dlt.pipeline
pipeline
@overload
def pipeline(pipeline_name: str = None,
             pipelines_dir: str = None,
             pipeline_salt: TSecretStrValue = None,
             destination: TDestinationReferenceArg = None,
             staging: TDestinationReferenceArg = None,
             dataset_name: str = None,
             import_schema_path: str = None,
             export_schema_path: str = None,
             full_refresh: bool = None,
             dev_mode: bool = False,
             refresh: TRefreshMode = None,
             progress: TCollectorArg = _NULL_COLLECTOR,
             _impl_cls: Type[TPipeline] = Pipeline) -> TPipeline
Creates a new instance of a dlt pipeline, which moves data from a source (e.g. a REST API) to a destination (e.g. a database or a data lake).
Notes:
The pipeline function allows you to pass the name of the destination to which the data should be loaded, the name of the dataset, and several other options that govern loading of the data.
The created Pipeline object lets you load the data from any source with the run method, or gives you more granular control over the loading process with the extract, normalize and load methods.
Please refer to the following doc pages:
- Write your first pipeline walkthrough: https://dlthub.com/docs/walkthroughs/create-a-pipeline
- Pipeline architecture and data loading steps: https://dlthub.com/docs/reference
- List of supported destinations: https://dlthub.com/docs/dlt-ecosystem/destinations
Arguments:
- pipeline_name (str, optional): A name of the pipeline that will be used to identify it in monitoring events and to restore its state and data schemas on subsequent runs. Defaults to the file name of the pipeline script with a dlt_ prefix added.
- pipelines_dir (str, optional): A working directory in which pipeline state and temporary files will be stored. Defaults to the user home directory: ~/dlt/pipelines/.
- pipeline_salt (TSecretStrValue, optional): A random value used for deterministic hashing during data anonymization. Defaults to a value derived from the pipeline name. The default value should not be used for any cryptographic purposes.
- destination (TDestinationReferenceArg, optional): A name of the destination to which dlt will load the data, or a destination module imported from dlt.destination. May also be provided to the run method of the pipeline.
- staging (TDestinationReferenceArg, optional): A name of the destination where dlt will stage the data before final loading, or a destination module imported from dlt.destination. May also be provided to the run method of the pipeline.
- dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables, e.g. a schema in relational databases or a folder grouping many files. May also be provided later to the run or load methods of the Pipeline. If not provided at all, it defaults to the pipeline_name.
- import_schema_path (str, optional): A path from which the schema yaml file will be imported on each pipeline run. Defaults to None, which disables importing.
- export_schema_path (str, optional): A path where the schema yaml file will be exported after every schema change. Defaults to None, which disables exporting.
- full_refresh (bool, optional): Deprecated, use dev_mode instead.
- dev_mode (bool, optional): When set to True, each instance of the pipeline with the pipeline_name starts from scratch when run and loads the data to a separate dataset. The datasets are identified by dataset_name_ + datetime suffix. Use this setting whenever you experiment with your data to be sure you start fresh on each run. Defaults to False.
- refresh (TRefreshMode, optional): Fully or partially reset sources during the pipeline run. When set here, the refresh is applied on each run of the pipeline. To apply refresh only once, you can pass it to pipeline.run or extract instead. The following refresh modes are supported:
  - drop_sources: Drop tables and source and resource state for all sources currently being processed in the run or extract methods of the pipeline. (Note: schema history is erased.)
  - drop_resources: Drop tables and resource state for all resources being processed. Source-level state is not modified. (Note: schema history is erased.)
  - drop_data: Wipe all data and resource state for all resources being processed. The schema is not modified.
- progress (TCollectorArg): A progress monitor that shows progress bars, console or log messages with current information on sources, resources, data items etc. processed in the extract, normalize and load stages. Pass a string with a collector name or configure your own by choosing from the dlt.progress module. We support most of the progress libraries: try passing tqdm, enlighten, alive_progress or log to write to the console/log.
- _impl_cls (Type[TPipeline], optional): A class of the pipeline to use. Defaults to Pipeline.
Returns:
TPipeline: An instance of the Pipeline class or a subclass. Please check the documentation of the run method for information on what to do with it.
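For example, a minimal sketch of creating a pipeline (the duckdb destination and the pipeline/dataset names are illustrative choices, not defaults):

```python
import dlt

# A named pipeline that loads into a local DuckDB database. pipeline_name
# identifies state and schemas across runs, dataset_name is the target
# schema/folder, and progress="log" logs the progress of each stage.
pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="chess_data",
    progress="log",
)
```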
pipeline
@overload
def pipeline() -> Pipeline
When called without any arguments, returns the most recently created Pipeline instance.
If not found, it creates a new instance with all the pipeline options set to defaults.
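A short sketch, assuming a pipeline was already created earlier in the same script:

```python
import dlt

dlt.pipeline(pipeline_name="chess_pipeline", destination="duckdb")

# With no arguments, the most recently created pipeline in this process is
# returned; if there is none, a fresh one with default options is created.
same_pipeline = dlt.pipeline()
assert same_pipeline.pipeline_name == "chess_pipeline"
```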
attach
@with_config(
spec=PipelineConfiguration,
sections=(known_sections.PIPELINES, ),
section_arg_name="pipeline_name",
)
def attach(pipeline_name: str = None,
           pipelines_dir: str = None,
           pipeline_salt: TSecretStrValue = None,
           destination: TDestinationReferenceArg = None,
           staging: TDestinationReferenceArg = None,
           progress: TCollectorArg = _NULL_COLLECTOR,
           dataset_name: str = None,
           **injection_kwargs: Any) -> Pipeline
Attaches to the working folder of pipeline_name in pipelines_dir or in the default directory.
Pre-configured destination and staging factories may be provided. If not present, default factories are created from pipeline state.
If no local pipeline state is found, dlt will attempt to restore the pipeline from the provided destination and dataset.
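A sketch of attaching by name; chess_pipeline is an illustrative name of a pipeline that was run previously:

```python
import dlt

# Attach to the working folder of an existing pipeline by its name.
pipeline = dlt.attach(pipeline_name="chess_pipeline")

# The restored pipeline exposes its destination and dataset configuration.
print(pipeline.dataset_name, pipeline.destination)
```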
run
def run(data: Any,
        *,
        destination: TDestinationReferenceArg = None,
        staging: TDestinationReferenceArg = None,
        dataset_name: str = None,
        table_name: str = None,
        write_disposition: TWriteDispositionConfig = None,
        columns: Sequence[TColumnSchema] = None,
        schema: Schema = None,
        loader_file_format: TLoaderFileFormat = None,
        table_format: TTableFormat = None,
        schema_contract: TSchemaContract = None,
        refresh: TRefreshMode = None) -> LoadInfo
Loads the data in the data argument into the destination specified in destination and the dataset specified in dataset_name.
Notes:
This method will extract the data from the data argument, infer the schema, normalize the data into a load package (e.g. jsonl or parquet files representing tables) and then load such packages into the destination.
The data may be supplied in several forms:
- a list or Iterable of any JSON-serializable objects, e.g. dlt.run([1, 2, 3], table_name="numbers")
- any Iterator or a function that yields (a Generator), e.g. dlt.run(range(1, 10), table_name="range")
- a function or a list of functions decorated with @dlt.resource, e.g. dlt.run([chess_players(title="GM"), chess_games()])
- a function or a list of functions decorated with @dlt.source.
Please note that dlt deals with bytes, datetime, decimal and uuid objects so you are free to load binary data or documents containing dates.
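The sketch below illustrates these forms; the chess_players resource is hypothetical and the duckdb destination is an illustrative choice:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="forms_demo", destination="duckdb", dataset_name="demo"
)

# 1. A list of JSON-serializable objects.
pipeline.run([1, 2, 3], table_name="numbers")

# 2. Any iterator or generator.
pipeline.run(range(1, 10), table_name="range")

# 3. A hypothetical resource: the decorated function carries the table schema.
@dlt.resource
def chess_players(title: str = "GM"):
    yield {"name": "Magnus Carlsen", "title": title}

pipeline.run(chess_players())
```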
Execution:
The run method will first use the sync_destination method to synchronize pipeline state and schemas with the destination. You can disable this behavior with the restore_from_destination configuration option.
Next, it will make sure that data from the previous run is fully processed. If not, the run method normalizes and loads the pending data items.
Only then is the new data from the data argument extracted, normalized and loaded.
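As a hedged sketch, assuming dlt's standard environment-variable configuration resolution (the key may alternatively be scoped under a pipeline section in config.toml), the destination sync could be disabled like this:

```python
import os

import dlt

# Assumption: restore_from_destination is resolved from the environment;
# setting it to "false" skips synchronizing state/schemas from the destination.
os.environ["RESTORE_FROM_DESTINATION"] = "false"

pipeline = dlt.pipeline(pipeline_name="no_sync_demo", destination="duckdb")
```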
Arguments:
- data (Any): Data to be loaded to the destination.
- destination (TDestinationReferenceArg, optional): A name of the destination to which dlt will load the data, or a destination module imported from dlt.destination. If not provided, the value passed to dlt.pipeline will be used.
- staging (TDestinationReferenceArg, optional): A name of the destination where dlt will stage the data before final loading, or a destination module imported from dlt.destination. May also be provided to the run method of the pipeline.
- dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables, e.g. a schema in relational databases or a folder grouping many files. If not provided, the value passed to dlt.pipeline will be used. If not provided at all, it defaults to the pipeline_name.
- table_name (str, optional): The name of the table to which the data should be loaded within the dataset. This argument is required for data that is a list/Iterable or an Iterator without a __name__ attribute. The behavior of this argument depends on the type of the data:
  - generator functions: the function name is used as the table name; table_name overrides this default.
  - @dlt.resource: the resource contains the full table schema, and that includes the table name. table_name will override this property. Use with care!
  - @dlt.source: the source contains several resources, each with a table schema. table_name will override all table names within the source and load the data into a single table.
- write_disposition (TWriteDispositionConfig, optional): Controls how to write data to a table. Accepts a shorthand string literal or a configuration dictionary. Allowed shorthand string literals: append will always add new data at the end of the table, replace will replace existing data with new data, skip will prevent data from loading, and merge will deduplicate and merge data based on primary_key and merge_key hints. Defaults to append. Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table, provide write_disposition={"disposition": "merge", "strategy": "scd2"}. Please note that in the case of dlt.resource the table schema value will be overwritten, and in the case of dlt.source the values in all resources will be overwritten.
- columns (Sequence[TColumnSchema], optional): A list of column schemas. A typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
- schema (Schema, optional): An explicit Schema object in which all table schemas will be grouped. By default, dlt takes the schema from the source (if passed in the data argument) or creates a default one itself.
- loader_file_format (TLoaderFileFormat, optional): The file format the loader will use to create the load package. Not all file formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
- table_format (TTableFormat, optional): Can be "delta" or "iceberg". The table format used by the destination to store tables. Currently, you can select the table format on the filesystem and Athena destinations.
- schema_contract (TSchemaContract, optional): An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema. Defaults to None.
- refresh (TRefreshMode, optional): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
  - drop_sources: Drop tables and source and resource state for all sources currently being processed in the run or extract methods of the pipeline. (Note: schema history is erased.)
  - drop_resources: Drop tables and resource state for all resources being processed. Source-level state is not modified. (Note: schema history is erased.)
  - drop_data: Wipe all data and resource state for all resources being processed. The schema is not modified.
Raises:
PipelineStepFailed: when a problem happened during the extract, normalize or load steps.
Returns:
LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please note that dlt will not raise if a single job terminally fails. Such information is provided via LoadInfo.
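A sketch of a typical call and of inspecting the returned LoadInfo; the names and the replace write disposition are illustrative:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="tickets_pipeline", destination="duckdb", dataset_name="support"
)

# Replace the table contents on every run instead of appending.
load_info = pipeline.run(
    [{"id": 1, "status": "open"}, {"id": 2, "status": "closed"}],
    table_name="tickets",
    write_disposition="replace",
)

# A single terminally failed job does not raise by itself; inspect LoadInfo
# or raise explicitly if any job failed.
print(load_info)
load_info.raise_on_failed_jobs()
```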