dlt.extract.source
DltResourceDict Objects
class DltResourceDict(Dict[str, DltResource])
selected
@property
def selected() -> Dict[str, DltResource]
Returns a subset of all resources that will be extracted and loaded to the destination.
extracted
@property
def extracted() -> List[DltResource]
Returns all resources that will be extracted. This includes the selected resources and all their parents. Note that several resources with the same name may be included if there are several parent instances of the same resource.
selected_dag
@property
def selected_dag() -> List[Tuple[str, str]]
Returns a list of edges of the directed acyclic graph of pipes and their parents in the selected resources.
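To make the relationship between selected resources, extracted resources, and DAG edges concrete, here is a minimal sketch in plain Python. It is a model of the behavior described above, not dlt's implementation: the `PARENTS` mapping, the helper names, and the `(parent, child)` edge orientation are all assumptions made for illustration, and the model assumes unique resource names.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in: each resource name maps to its parent (None for a root).
PARENTS: Dict[str, Optional[str]] = {
    "users": None,
    "user_details": "users",  # a transformer fed by "users"
    "orders": None,
}


def extracted_names(selected: List[str]) -> List[str]:
    """Selected resources plus all their ancestors, parents listed first."""
    result: List[str] = []
    for name in selected:
        # Walk up from the selected resource to its root.
        chain: List[str] = []
        current: Optional[str] = name
        while current is not None:
            chain.append(current)
            current = PARENTS[current]
        # Emit the root first; skip names that were already collected.
        for item in reversed(chain):
            if item not in result:
                result.append(item)
    return result


def dag_edges(selected: List[str]) -> List[Tuple[str, str]]:
    """(parent, child) edges reachable from the selected resources."""
    edges: List[Tuple[str, str]] = []
    for name in selected:
        child = name
        parent = PARENTS[child]
        while parent is not None:
            if (parent, child) not in edges:
                edges.append((parent, child))
            child, parent = parent, PARENTS[parent]
    return edges


selected = ["user_details", "orders"]
extracted = extracted_names(selected)
edges = dag_edges(selected)
print(extracted)  # ['users', 'user_details', 'orders']
print(edges)      # [('users', 'user_details')]
```

In this model, selecting only the transformer still pulls in its parent, which is why the extracted list can be larger than the selected one.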
extracted_pipes
@property
def extracted_pipes() -> Sequence[Pipe]
Returns all execution pipes in the source, including all parent pipes.
select
def select(*resource_names: str) -> Dict[str, DltResource]
Selects the resources named in resource_names to be extracted and unselects the remaining resources.
with_pipe
def with_pipe(pipe: SupportsPipe) -> DltResource
Gets the resource with the given execution pipe, matched by pipe instance id. Names are not used to match a pipe to a resource because many resources with the same name may be extracted at the same time, e.g. when transformers with different names depend on several instances of the same resource. Note that ad hoc resources are not created for parent pipes that have no resource.
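The reason for matching on instance id rather than name can be sketched in plain Python. The classes below are hypothetical stand-ins, not dlt types, and returning None for an unknown pipe is an assumption of this sketch rather than documented behavior.

```python
from itertools import count
from typing import Dict, Optional

_next_id = count(1)


class FakePipe:
    """Hypothetical stand-in for an execution pipe: a name plus a unique id."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.pipe_id = next(_next_id)


class FakeResource:
    """Hypothetical stand-in for a resource that owns exactly one pipe."""

    def __init__(self, pipe: FakePipe) -> None:
        self.pipe = pipe


def with_pipe(resources: Dict[str, FakeResource], pipe: FakePipe) -> Optional[FakeResource]:
    """Return the resource whose pipe has the same instance id, else None."""
    for resource in resources.values():
        if resource.pipe.pipe_id == pipe.pipe_id:
            return resource
    return None


# Two pipes share the name "users" but are distinct instances.
first = FakePipe("users")
second = FakePipe("users")
resources = {"users": FakeResource(first)}

found = with_pipe(resources, first)
missing = with_pipe(resources, second)
print(found is resources["users"])  # True
print(missing)                      # None
```

A name-based lookup would have returned the same resource for both pipes, which is the ambiguity the id-based match avoids.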
add
def add(*resources: DltResource) -> None
Add resources to the source. Adding multiple resources with the same name is not
supported.
detach
def detach(resource_name: str = None) -> DltResource
Clones resource_name (including parent resource pipes) and removes source contexts.
Defaults to the first resource in the source if resource_name is None.
DltSource Objects
class DltSource(Iterable[TDataItem])
Groups several dlt resources under a single schema and allows you to perform operations on them.
An instance of this class is created whenever you call a dlt.source decorated function. It automates several functions for you:
- You can pass this instance to dlt run method in order to load all data present in the dlt resources.
- You can select and deselect resources that you want to load via with_resources method
- You can access the resources (which are DltResource instances) as source attributes
- It implements Iterable interface so you can get all the data from the resources yourself and without dlt pipeline present.
- It will create a DAG from resources and transformers and optimize the extraction so parent resources are extracted only once
- You can get the schema for the source and all the resources within it.
- You can use a run method to load the data with a default instance of dlt pipeline.
- You can get source read only state for the currently active Pipeline instance
from_data
@classmethod
def from_data(cls, schema: Schema, section: str, data: Any) -> Self
Converts any data supported by the dlt run method into a dlt source with the name section.name and the schema schema.
max_table_nesting
@property
def max_table_nesting() -> int
A schema hint that sets the maximum depth of nested tables, above which the remaining nodes are loaded as structs or JSON.
root_key
@property
def root_key() -> Optional[bool]
Enables merging on all resources by propagating the root foreign key to nested tables. This option is most useful if you plan to change the write disposition of a resource to disable/enable merge.
exhausted
@property
def exhausted() -> bool
Checks whether any of the selected pipes has started. If so, the source is exhausted.
resources
@property
def resources() -> DltResourceDict
A dictionary of all resources present in the source, where the key is a resource name.
Returns:
DltResourceDict - A dictionary of all resources present in the source, where the key is a resource name.
selected_resources
@property
def selected_resources() -> Dict[str, DltResource]
A dictionary of all the resources that are selected to be loaded.
Returns:
Dict[str, DltResource]: A dictionary of all the resources that are selected to be loaded.
discover_schema
def discover_schema(item: TDataItem = None, meta: Any = None) -> Schema
Computes table schemas for all selected resources in the source and merges them with a copy of current source schema. If item is provided,
dynamic tables will be evaluated, otherwise those tables will be ignored.
with_resources
def with_resources(*resource_names: str) -> "DltSource"
A convenience method to select one or more resources to be loaded. Returns a clone of the original source with the specified resources selected.
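The clone-then-select behavior described here can be modeled in a few lines of plain Python. `FakeSource` is a hypothetical stand-in, not the dlt class; the sketch also assumes that every resource starts out selected and that unknown names raise an error, neither of which is stated above.

```python
import copy
from typing import Dict, List


class FakeSource:
    """Hypothetical stand-in for a source: resource names with a 'selected' flag."""

    def __init__(self, names: List[str]) -> None:
        # Assumption of this sketch: every resource is selected initially.
        self.flags: Dict[str, bool] = {name: True for name in names}

    @property
    def selected(self) -> List[str]:
        return [name for name, flag in self.flags.items() if flag]

    def select(self, *resource_names: str) -> List[str]:
        """Select the named resources and unselect all the others."""
        unknown = set(resource_names) - set(self.flags)
        if unknown:
            raise KeyError(f"unknown resources: {sorted(unknown)}")
        for name in self.flags:
            self.flags[name] = name in resource_names
        return self.selected

    def with_resources(self, *resource_names: str) -> "FakeSource":
        """Return a clone with only the named resources selected."""
        clone = copy.deepcopy(self)
        clone.select(*resource_names)
        return clone


source = FakeSource(["users", "orders", "payments"])
subset = source.with_resources("users", "orders")
print(subset.selected)  # ['users', 'orders']
print(source.selected)  # ['users', 'orders', 'payments']
```

Because the selection is applied to a clone, the original source keeps its own selection and can be reused with a different subset.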
decompose
def decompose(strategy: TDecompositionStrategy) -> List["DltSource"]
Decomposes source into a list of sources with a given strategy.
- "none" will return source as is
- "scc" will decompose the dag of selected pipes and their parent into strongly connected components
add_limit
def add_limit(max_items: Optional[int] = None,
max_time: Optional[float] = None,
count_rows: Optional[bool] = False) -> "DltSource"
Limits the items processed, by count or by time, in all selected resources in the source that are not transformers.
This is useful for testing, debugging and generating sample datasets for experimentation. You can easily get your test dataset in a few minutes, when otherwise you'd need to wait hours for the full loading to complete.
For incremental resources that return rows in deterministic order, you can use this function to process a large load in batches.
Notes:
- Transformer resources won't be limited. They should fully process all the data they receive to avoid inconsistencies in generated datasets.
- Each yielded item may contain several records. add_limit only limits the "number of yields", not the total number of records.
- Empty pages/yields are also counted. Use count_rows to skip empty pages.
Arguments:
- max_items Optional[int] - The maximum number of items (not rows!) to yield, set to None for no limit
- max_time Optional[float] - The maximum number of seconds for this generator to run after it was opened, set to None for no limit
- count_rows Optional[bool] - Default: false. Count rows instead of pages. Note that if the resource yields pages of rows, the last page will not be trimmed and more rows than expected will be received.
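The difference between counting yields and counting rows is easy to misread, so here is a minimal plain-Python model of the semantics described in the notes and arguments. The `limited` function is a hypothetical helper, not dlt's implementation; in particular, treating a non-positive max_items as "yield nothing" is an assumption of this sketch.

```python
import time
from typing import Iterable, Iterator, List, Optional


def limited(pages: Iterable[List[int]],
            max_items: Optional[int] = None,
            max_time: Optional[float] = None,
            count_rows: bool = False) -> Iterator[List[int]]:
    """Yield pages until the item budget or the time budget is used up."""
    if max_items is not None and max_items <= 0:
        return
    started = time.monotonic()
    seen = 0
    for page in pages:
        # Pages are passed through whole; the last one is never trimmed.
        yield page
        # Count the rows in the page, or count the page itself as one yield.
        seen += len(page) if count_rows else 1
        if max_items is not None and seen >= max_items:
            return
        if max_time is not None and time.monotonic() - started >= max_time:
            return


pages = [[1, 2, 3], [], [4, 5, 6], [7, 8, 9]]

# Counting yields: the empty page uses up one of the two allowed items.
by_yields = list(limited(pages, max_items=2))
print(by_yields)  # [[1, 2, 3], []]

# Counting rows: the empty page costs nothing, and the untrimmed last page
# pushes the total to 6 rows even though the limit is 4.
by_rows = list(limited(pages, max_items=4, count_rows=True))
print(by_rows)  # [[1, 2, 3], [], [4, 5, 6]]
```

The second call shows why row counting can return more rows than requested: the limit is only checked between pages.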
Returns:
"DltSource"- returns self
parallelize
def parallelize() -> "DltSource"
Mark all resources in the source to run in parallel.
Only transformers and resources based on generators and generator functions are supported; unsupported resources will be skipped.
run
@property
def run() -> SupportsPipelineRun
A convenience method that will call run on the currently active dlt pipeline. If a pipeline instance is not found, one with default settings will be created.
state
@property
def state() -> StrAny
Gets source-scoped state from the active pipeline. PipelineStateNotAvailable is raised if no pipeline is active.
clone
def clone(with_name: str = None) -> "DltSource"
Creates a deep copy of the source where copies of schema, resources and pipes are created.
If with_name is provided, the schema is cloned with a changed name.
__iter__
def __iter__() -> Generator[TDataItem, None, None]
Opens an iterator that yields the data items from all the resources within the source, in the same order as in the Pipeline class.
A read-only state is provided, initialized from active pipeline state. The state is discarded after the iterator is closed.
A source config section is injected to allow secrets/config injection as during regular extraction.
SourceSchemaInjectableContext Objects
@configspec
class SourceSchemaInjectableContext(ContainerInjectableContext)
A context containing the source schema, present when a dlt.source/resource decorated function is executed.
SourceInjectableContext Objects
@configspec
class SourceInjectableContext(ContainerInjectableContext)
A context containing the source, present when a dlt.resource decorated function is executed.