datastep package

Submodules

datastep.constants module

datastep.exceptions module

exception datastep.exceptions.DirectoryNotFoundError[source]

Bases: Exception

exception datastep.exceptions.InvalidGitStatus[source]

Bases: Exception

exception datastep.exceptions.PackagingError[source]

Bases: Exception

datastep.file_utils module

datastep.file_utils.create_unique_logical_key(physical_key: Union[str, pathlib.Path]) → str[source]
datastep.file_utils.make_json_serializable(value: Any, context: Optional[str] = None) → Union[bool, float, int, str, List, Dict][source]
datastep.file_utils.manifest_filepaths_abs2rel(manifest: pandas.core.frame.DataFrame, filepath_columns: List[str], relative_dir: pathlib.Path)[source]
datastep.file_utils.manifest_filepaths_rel2abs(manifest: pandas.core.frame.DataFrame, filepath_columns: List[str], relative_dir: pathlib.Path)[source]
datastep.file_utils.resolve_directory(d: Union[str, pathlib.Path], make: bool = False, strict: bool = True) → pathlib.Path[source]
datastep.file_utils.resolve_filepath(f: Union[str, pathlib.Path], strict: bool = True) → pathlib.Path[source]
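
Example

A minimal usage sketch for the path helpers above. The exact behavior is inferred from the signatures, not documented here: make=True is assumed to create a missing directory, and strict=True is assumed to raise on a missing path (e.g. datastep.exceptions.DirectoryNotFoundError for directories).

    >>> from datastep.file_utils import resolve_directory, resolve_filepath
    >>> staging = resolve_directory("local_staging/raw", make=True)
    >>> manifest_path = resolve_filepath(staging / "manifest.csv", strict=False)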

datastep.quilt_utils module

class datastep.quilt_utils.ValidationDetails(value, index, origin_column, details_type)[source]

Bases: tuple

Create new instance of ValidationDetails(value, index, origin_column, details_type)

property details_type

Alias for field number 3

property index

Alias for field number 1

property origin_column

Alias for field number 2

property value

Alias for field number 0
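
Example

ValidationDetails is a plain namedtuple, so the properties above simply alias tuple positions. The field values used here are illustrative, not prescribed by the API:

    >>> from datastep.quilt_utils import ValidationDetails
    >>> details = ValidationDetails(
    ...     value="images/a.tiff", index=0, origin_column="filepath", details_type="filepath"
    ... )
    >>> details.value == details[0]
    True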

datastep.quilt_utils.clean_metadata(details: datastep.quilt_utils.ValidationDetails) → datastep.quilt_utils.ValidationDetails[source]
datastep.quilt_utils.create_package(manifest: pandas.core.frame.DataFrame, step_pkg_root: pathlib.Path, filepath_columns: List[str] = ['filepath'], metadata_columns: List[str] = []) → Tuple[quilt3.packages.Package, pandas.core.frame.DataFrame][source]
datastep.quilt_utils.route_validator(details: datastep.quilt_utils.ValidationDetails, manifest: pandas.core.frame.DataFrame, progress_bar) → datastep.quilt_utils.ValidationDetails[source]
datastep.quilt_utils.validate_filepath(details: datastep.quilt_utils.ValidationDetails) → datastep.quilt_utils.ValidationDetails[source]
datastep.quilt_utils.validate_manifest(manifest: pandas.core.frame.DataFrame, filepath_columns: List[str], metadata_columns: List[str])[source]
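
Example

A hedged sketch of building a package from a manifest with create_package. The manifest columns, file layout, and step_pkg_root path are illustrative; the returned pair follows the signature above:

    >>> from pathlib import Path
    >>> import pandas as pd
    >>> from datastep.quilt_utils import create_package
    >>> manifest = pd.DataFrame({"filepath": ["images/a.tiff"], "cell_id": [1]})
    >>> pkg, validated_manifest = create_package(
    ...     manifest=manifest,
    ...     step_pkg_root=Path("local_staging/examplestep"),
    ...     metadata_columns=["cell_id"],
    ... )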

datastep.step module

class datastep.step.Step(step_name: Optional[str] = None, filepath_columns: List[str] = ['filepath'], metadata_columns: List[str] = [], direct_upstream_tasks: List[Step] = [], config: Union[str, pathlib.Path, Dict[str, str], None] = None)[source]

Bases: prefect.core.task.Task

A class for creating “pure function” steps in a DAG.

This object’s sole purpose is to handle and enforce data logging tied to code using Quilt.

It manages this data logging through heavy use of a local staging directory and supporting files, such as initialization parameters and a manifest CSV / Parquet that records the files you will want to send to Quilt.

However, because part of the problem with stepwise workflows is that their dependencies on upstream data are hard to manage, the more you rely on this object, the easier those upstream dependencies become to handle. If your upstream data dependencies are generated by other Step modules, you can list them in the downstream Step as “direct_upstream_tasks” and use the Step.pull function to retrieve their data (see the example following the parameter list).

Parameters
  • step_name (Optional[str]) – A name for this step. Default: the lowercased version of the inheriting object name

  • filepath_columns (List[str]) – In the final manifest CSV / Parquet you generate, which columns store filepaths. Default: [“filepath”]

  • metadata_columns (List[str]) – In the final manifest CSV / Parquet you generate, which columns store metadata. Default: []

  • direct_upstream_tasks (List[Step]) – If you need data for this task to run, and that data was generated by another Step object, you can place references to those objects here; during the pull method, this Step will retrieve the required data.

  • config (Optional[Union[str, Path, Dict[str, str]]]) – A path or dictionary detailing the entire workflow config. Refer to datastep.constants for details on workflow config defaults.
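
Example

A minimal sketch of an inheriting step, assuming the common pattern of writing outputs into step_local_staging_dir and recording them in a manifest whose “filepath” column matches the default filepath_columns. The step name, file contents, and manifest filename are illustrative, not prescribed by the API:

    >>> import pandas as pd
    >>> from datastep.step import Step
    >>>
    >>> class ExampleStep(Step):
    ...     def run(self, **kwargs):
    ...         # Write an output file into the preconfigured staging directory
    ...         out_path = self.step_local_staging_dir / "table.csv"
    ...         pd.DataFrame({"a": [1, 2]}).to_csv(out_path, index=False)
    ...         # Record the file so it can be packaged and pushed later
    ...         manifest = pd.DataFrame({"filepath": [out_path]})
    ...         manifest.to_csv(self.step_local_staging_dir / "manifest.csv", index=False)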

checkout(data_version: Optional[str] = None, bucket: Optional[str] = None)[source]

Pull data previously generated by a run of this step.

Parameters
  • data_version (Optional[str]) – Request a specific version of the previously generated data. Default: ‘latest’

  • bucket (Optional[str]) – Request data from a specific bucket different from the bucket defined by your workflow_config.json or the default bucket.
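
Example

A usage sketch; ExampleStep is the illustrative subclass from the class-level example, and the bucket URI is hypothetical:

    >>> step = ExampleStep()
    >>> step.checkout()                            # 'latest' from the configured bucket
    >>> step.checkout(bucket="s3://other-bucket")  # hypothetical bucket override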

clean() → str[source]

Completely reset this step’s local staging directory by removing all previously generated files.

get_result(state: prefect.engine.state.State, flow: prefect.core.flow.Flow) → Any[source]

Get the result of this step.

Parameters
  • state (prefect.engine.state.State) – The final state object of a prefect flow produced by running the flow.

  • flow (prefect.core.flow.Flow) – The flow that ran this step.

Returns

result – The resulting object from running this step in a flow.

Return type

Any

Notes

This will always return the first item that matches this step. What this means for the user is that if this step was used in a mapped task, you would only receive the result of the first iteration of that map.

Generally though, you shouldn’t be using these steps in mapped tasks. (It’s on our to-do list…)
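
Example

A sketch using the prefect objects named in the signature above; ExampleStep is the illustrative subclass from earlier:

    >>> from prefect import Flow
    >>> step = ExampleStep()
    >>> with Flow("example") as flow:
    ...     step()
    >>> state = flow.run()
    >>> result = step.get_result(state, flow)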

manifest_filepaths_abs2rel()[source]

Convert manifest filepaths to relative paths.

Useful for when you are ready to upload to a remote bucket.

manifest_filepaths_rel2abs()[source]

Convert manifest filepaths to absolute paths.

Useful for after you pull data from a remote bucket.

property project_local_staging_dir
pull(data_version: Optional[str] = None, bucket: Optional[str] = None)[source]

Pull all upstream data dependencies using the list of upstream steps.

Parameters
  • data_version (Optional[str]) – Request a specific version of the upstream data. Default: ‘latest’ for all upstreams

  • bucket (Optional[str]) – Request data from a specific bucket different from the bucket defined by your workflow_config.json or the default bucket.
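
Example

A usage sketch, assuming this step’s direct_upstream_tasks were configured at construction as described in the class-level parameters:

    >>> step = ExampleStep()  # upstream Step references set at construction
    >>> step.pull()           # fetch 'latest' for every upstream step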

push(bucket: Optional[str] = None)[source]

Push the most recently generated data.

Parameters

bucket (Optional[str]) – Push data to a specific bucket different from the bucket defined by your workflow_config.json or the default bucket.

Notes

If your git status isn’t clean, or you haven’t committed and pushed to origin, any attempt to push data will be rejected.
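
Example

A sketch of the push flow; per the note above, the git tree must be clean and pushed to origin or the call will be rejected. The bucket URI is hypothetical:

    >>> step = ExampleStep()
    >>> step.run()
    >>> step.push()                            # configured storage bucket
    >>> step.push(bucket="s3://other-bucket")  # hypothetical bucket override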

property quilt_package_name
property quilt_package_owner
run(distributed_executor_address: Optional[str] = None, clean: bool = False, debug: bool = False, **kwargs) → Any[source]

Run a pure function.

There are a few “protected” parameters, which are the following:

Parameters
  • distributed_executor_address (Optional[str]) – An optional executor address to pass to some computation engine.

  • clean (bool) – Whether the local staging directory should be cleaned prior to this run. Default: False (do not clean)

  • debug (bool) – A debug flag the developer can use to control how much data is run, how it is processed, etc. Default: False (do not debug)

Returns

result – A picklable object or value that is the result of any processing you do.

Return type

Any
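
Example

A call sketch, assuming per the parameter list above that the protected parameters are consumed by Step itself while any remaining keyword arguments reach your implementation:

    >>> step = ExampleStep()
    >>> result = step.run(clean=True, debug=False)  # clean staging first, no debug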

property step_local_staging_dir

A preconfigured directory for you to store output files in. Can be specifically set using a workflow_config.json file.

property step_name

Return the name of this step as a string.

property storage_bucket
property upstream_tasks
datastep.step.log_run_params(func)[source]

Module contents

Top-level package for datastep.

datastep.get_module_version()[source]