dlt.extract.incremental
Incremental Objects
@configspec
class Incremental(ItemTransform[TDataItem], BaseConfiguration,
Generic[TCursorValue])
Adds incremental extraction for a resource by storing a cursor value in persistent state.
The cursor could for example be a timestamp for when the record was created and you can use this to load only new records created since the last run of the pipeline.
To use this the resource function should have an argument either type annotated with Incremental or a default Incremental instance.
For example:
When the resource has a primary_key specified this is used to deduplicate overlapping items with the same cursor value.
Alternatively you can use this class as transform step and add it to any resource. For example:
@dlt.resource(primary_key='id')
def some_data(created_at=dlt.sources.incremental('created_at', '2023-01-01T00:00:00Z'):
yield from request_data(created_after=created_at.last_value)
@dlt.resource
def some_data():
last_value = dlt.sources.incremental.from_existing_state("some_data", "item.ts")
...
r = some_data().add_step(dlt.sources.incremental("item.ts", initial_value=now, primary_key="delta"))
info = p.run(r, destination="duckdb")
Arguments:
cursor_path- The name or a JSON path to a cursor field. Uses the same names of fields as in your JSON document, before they are normalized to store in the database.initial_value- Optional value used forlast_valuewhen no state is available, e.g. on the first run of the pipeline. If not providedlast_valuewill beNoneon the first run.last_value_func- Callable used to determine which cursor value to save in state. It is called with a list of the stored state value and all cursor vals from currently processing items. Default ismaxprimary_key- Optional primary key used to deduplicate data. If not provided, a primary key defined by the resource will be used. Pass a tuple to define a compound key. Pass empty tuple to disable unique checksend_value- Optional value used to load a limited range of records betweeninitial_valueandend_value. Use in conjunction withinitial_value, e.g. load records from given monthincremental(initial_value="2022-01-01T00:00:00Z", end_value="2022-02-01T00:00:00Z")Note, when this is set the incremental filtering is stateless andinitial_valuealways supersedes any previous incremental value in state.row_order- Declares that data source returns rows in descending (desc) or ascending (asc) order as defined bylast_value_func. If row order is know, Incremental class is able to stop requesting new rows by closing pipe generator. This prevents getting more data from the source. Defaults to None, which means that row order is not known.allow_external_schedulers- If set to True, allows dlt to look for external schedulers from which it will take "initial_value" and "end_value" resulting in loading only specified range of data. Currently Airflow scheduler is detected: "data_interval_start" and "data_interval_end" are taken from the context and passed Incremental class. The values passed explicitly to Incremental will be ignored. Note that if logical "end date" is present then also "end_value" will be set which means that resource state is not used and exactly this range of date will be loadedon_cursor_value_missing- Specify what happens when the cursor_path does not exist in a record or a record hasNoneat the cursor_path: raise, include, excludelag- Optional value used to define a lag or attribution window. For datetime cursors, this is interpreted as seconds. For other types, it uses the + or - operator depending on the last_value_func.range_start- Decide whether the incremental filtering range isopenorclosedon the start value side. Default isclosed. Setting this toopenmeans that items with the same cursor value as the last value from the previous run (orinitial_value) are excluded from the result. Theopenrange disables deduplication logic so it can serve as an optimization when you know cursors don't overlap between pipeline runs.range_end- Decide whether the incremental filtering range isopenorclosedon the end value side. Default isopen(exactend_valueis excluded). Setting this toclosedmeans that items with the exact same cursor value as theend_valueare included in the result.
placement_affinity
stick to end
from_existing_state
@classmethod
def from_existing_state(cls, resource_name: str,
cursor_path: str) -> "Incremental[TCursorValue]"
Create Incremental instance from existing state.
merge
def merge(other: "Incremental[TCursorValue]") -> "Incremental[TCursorValue]"
Create a new incremental instance which merges the two instances.
Only properties which are not None from other override the current instance properties.
This supports use cases with partial overrides, such as:
def my_resource(updated=incremental('updated', initial_value='1970-01-01'))
...
my_resource(updated=incremental(initial_value='2023-01-01', end_value='2023-02-01'))
get_cursor_column_name
def get_cursor_column_name() -> Optional[str]
Return the name of the cursor column if the cursor path resolves to a single column
get_state
def get_state() -> IncrementalColumnState
Returns or creates an Incremental state for a particular cursor column
If end_value is set, a mock state is created that will be discarded after extract step Otherwise state is taken from current pipeline and will be persisted in it
get_incremental_value_type
def get_incremental_value_type() -> Type[Any]
Infers the type of incremental value from a class of an instance if those preserve the Generic arguments information.
bind
def bind(pipe: SupportsPipe) -> "Incremental[TCursorValue]"
Called by pipe just before evaluation
can_close
def can_close() -> bool
Checks if incremental is out of range and can be closed.
Returns true only when row_order was set and
- results are ordered ascending and are above upper bound (end_value)
- results are ordered descending and are below or equal lower bound (start_value)
IncrementalResourceWrapper Objects
class IncrementalResourceWrapper(ItemTransform[TDataItem])
placement_affinity
stick to end
__init__
def __init__(
primary_key: Optional[TTableHintTemplate[TColumnNames]] = None
) -> None
Creates a wrapper over a resource function that accepts Incremental instance in its argument to perform incremental loading.
The wrapper delays instantiation of the Incremental to the moment of actual execution and is currently used by dlt.resource decorator.
The wrapper explicitly (via resource_name) parameter binds the Incremental state to a resource state.
Note that wrapper implements FilterItem transform interface and functions as a processing step in the before-mentioned resource pipe.
Arguments:
primary_keyTTableHintTemplate[TColumnKey], optional - A primary key to be passed to Incremental Instance at execution. Defaults to None.
inject_implicit_incremental_arg
@staticmethod
def inject_implicit_incremental_arg(
incremental: Optional[Union[Incremental[Any],
"IncrementalResourceWrapper"]],
sig: inspect.Signature,
func_args: Tuple[Any],
func_kwargs: Dict[str, Any],
fallback: Optional[Incremental[Any]] = None
) -> Tuple[Tuple[Any], Dict[str, Any], Optional[Incremental[Any]]]
Inject the incremental instance into function arguments if the function has an incremental argument without default in its signature and it is not already set in the arguments.
Returns:
Tuple of the new args, kwargs and the incremental instance that was injected (if any)
wrap
def wrap(sig: inspect.Signature, func: TFun) -> TFun
Wrap the callable to inject an Incremental object configured for the resource.
set_incremental
def set_incremental(incremental: Optional[TIncrementalConfig],
from_hints: bool = False) -> None
Sets the incremental. If incremental was set from_hints, it can only be changed in the same manner
allow_external_schedulers
@property
def allow_external_schedulers() -> bool
Allows the Incremental instance to get its initial and end values from external schedulers like Airflow