datastep package
Submodules
datastep.constants module
datastep.exceptions module
datastep.file_utils module
-
datastep.file_utils.
create_unique_logical_key
(physical_key: Union[str, pathlib.Path]) → str
-
datastep.file_utils.
make_json_serializable
(value: Any, context: Optional[str] = None) → Union[bool, float, int, str, List, Dict]
-
datastep.file_utils.
manifest_filepaths_abs2rel
(manifest: pandas.core.frame.DataFrame, filepath_columns: List[str], relative_dir: pathlib.Path)
-
datastep.file_utils.
manifest_filepaths_rel2abs
(manifest: pandas.core.frame.DataFrame, filepath_columns: List[str], relative_dir: pathlib.Path)
datastep.quilt_utils module
-
class
datastep.quilt_utils.
ValidationDetails
(value, index, origin_column, details_type) Bases:
tuple
Create new instance of ValidationDetails(value, index, origin_column, details_type)
-
property
details_type
Alias for field number 3
-
property
index
Alias for field number 1
-
property
origin_column
Alias for field number 2
-
property
value
Alias for field number 0
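The field positions listed above can be illustrated with a standard-library stand-in. This is a minimal sketch, not datastep's source: the tuple below only mirrors the documented field names and order, and the sample values are invented for illustration.

```python
from collections import namedtuple

# Stand-in that mirrors the documented field order; it is NOT imported from datastep.
ValidationDetails = namedtuple(
    "ValidationDetails", ["value", "index", "origin_column", "details_type"]
)

# Sample values are hypothetical.
details = ValidationDetails(
    value=" data/image_0.tiff ",
    index=0,
    origin_column="filepath",
    details_type="path",
)

# Each field is reachable by name or by its documented position.
assert details.value == details[0]
assert details.index == details[1]
assert details.origin_column == details[2]
assert details.details_type == details[3]

# Tuples are immutable, so a changed copy is produced with _replace.
cleaned = details._replace(value=details.value.strip())
```

Because the object is a tuple, functions that take a ValidationDetails and return one (as clean_metadata and route_validator are documented to do) would have to return a new instance rather than mutate the input.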
-
datastep.quilt_utils.
clean_metadata
(details: datastep.quilt_utils.ValidationDetails) → datastep.quilt_utils.ValidationDetails
-
datastep.quilt_utils.
create_package
(manifest: pandas.core.frame.DataFrame, step_pkg_root: pathlib.Path, filepath_columns: List[str] = ['filepath'], metadata_columns: List[str] = []) → Tuple[quilt3.packages.Package, pandas.core.frame.DataFrame]
-
datastep.quilt_utils.
route_validator
(details: datastep.quilt_utils.ValidationDetails, manifest: pandas.core.frame.DataFrame, progress_bar) → datastep.quilt_utils.ValidationDetails
datastep.step module
-
class
datastep.step.
Step
(step_name: Optional[str] = None, filepath_columns: List[str] = ['filepath'], metadata_columns: List[str] = [], direct_upstream_tasks: List[Step] = [], config: Union[str, pathlib.Path, Dict[str, str], None] = None) Bases:
prefect.core.task.Task
A class for creating “pure function” steps in a DAG.
This object’s sole purpose is to handle and enforce data logging tied to code using Quilt.
It does this by relying heavily on a local staging directory and supporting files, such as the initialization parameters and a manifest CSV / Parquet in which you record the files you want to send to Quilt.
Part of the problem with stepwise workflows is that their dependence on upstream data is hard to manage; the more you rely on this object, the easier those upstream dependencies are to handle. If your upstream data dependencies are generated by other Step modules, you can pass them to the downstream Step as “direct_upstream_tasks” and use the Step.pull function to retrieve their data.
- Parameters
step_name (Optional[str]) – A name for this step. Default: the lowercased version of the inheriting object name
filepath_columns (List[str]) – In the final manifest CSV / Parquet you generate, which columns store filepaths. Default: [“filepath”]
metadata_columns (List[str]) – In the final manifest CSV / Parquet you generate, which columns store metadata. Default: []
direct_upstream_tasks (List[Step]) – If this task needs data generated by another Step object in order to run, place references to those objects here; during the pull method, this Step will retrieve the required data.
config (Optional[Union[str, Path, Dict[str, str]]]) – A path or dictionary detailing the entire workflow config. Refer to datastep.constants for details on workflow config defaults.
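The documented defaults can be sketched with a small stand-in class. This is an illustration under stated assumptions, not datastep.Step itself: `SketchStep`, `Raw`, and `Normalize` are hypothetical names, the stand-in does not inherit from prefect's Task, and storing `direct_upstream_tasks` on an `upstream_tasks` attribute is a guess at how the two relate.

```python
from typing import List, Optional


class SketchStep:
    """Hypothetical stand-in for the documented constructor defaults."""

    def __init__(
        self,
        step_name: Optional[str] = None,
        filepath_columns: Optional[List[str]] = None,
        metadata_columns: Optional[List[str]] = None,
        direct_upstream_tasks: Optional[List["SketchStep"]] = None,
    ) -> None:
        # Documented default: the lowercased name of the inheriting class.
        if step_name is None:
            step_name = type(self).__name__.lower()
        self.step_name = step_name
        # Documented defaults: ["filepath"] and [] respectively.
        self.filepath_columns = (
            ["filepath"] if filepath_columns is None else list(filepath_columns)
        )
        self.metadata_columns = (
            [] if metadata_columns is None else list(metadata_columns)
        )
        # Assumption: upstream steps are simply kept for a later pull.
        self.upstream_tasks = (
            [] if direct_upstream_tasks is None else list(direct_upstream_tasks)
        )


class Raw(SketchStep):
    pass


class Normalize(SketchStep):
    pass


raw = Raw()
normalize = Normalize(direct_upstream_tasks=[raw], metadata_columns=["channel"])
print(raw.step_name, normalize.step_name, [s.step_name for s in normalize.upstream_tasks])
```

The stand-in uses `None` sentinels and copies each list so that instances never share a default list; the documented signature shows literal list defaults instead.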
-
checkout
(data_version: Optional[str] = None, bucket: Optional[str] = None) Pull data previously generated by a run of this step.
- Parameters
data_version (Optional[str]) – Request a specific version of the previously generated data. Default: ‘latest’
bucket (Optional[str]) – Request data from a specific bucket different from the bucket defined by your workflow_config.json or the defaulted bucket.
-
clean
() → str Completely reset this step’s local staging directory by removing all previously generated files.
-
get_result
(state: prefect.engine.state.State, flow: prefect.core.flow.Flow) → Any Get the result of this step.
- Parameters
state (prefect.engine.state.State) – The final state object of a prefect flow produced by running the flow.
flow (prefect.core.flow.Flow) – The flow that ran this step.
- Returns
result – The resulting object from running this step in a flow.
- Return type
Any
Notes
This always returns the first item that matches this step. If this step was used in a mapped task, you will therefore only receive the result of the first iteration of that map.
Generally, though, you shouldn’t use these steps in mapped tasks. (It’s on our to-do list…)
-
manifest_filepaths_abs2rel
() Convert manifest filepaths to relative paths.
Useful for when you are ready to upload to a remote bucket.
-
manifest_filepaths_rel2abs
() Convert manifest filepaths to absolute paths.
Useful for after you pull data from a remote bucket.
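The two conversions are inverses of each other. A minimal sketch of the idea follows, using pure paths and plain dictionaries as a stand-in for the pandas manifest. The helper names `abs2rel` and `rel2abs`, the sample rows, and the base directory are assumptions made for illustration; this is not datastep's implementation.

```python
from pathlib import PurePosixPath
from typing import Dict, List

Row = Dict[str, str]


def abs2rel(rows: List[Row], filepath_columns: List[str], relative_dir: PurePosixPath) -> List[Row]:
    """Return copies of the rows with each filepath column made relative to relative_dir."""
    converted = []
    for row in rows:
        new_row = dict(row)
        for col in filepath_columns:
            # relative_to raises ValueError if the path is not under relative_dir.
            new_row[col] = str(PurePosixPath(row[col]).relative_to(relative_dir))
        converted.append(new_row)
    return converted


def rel2abs(rows: List[Row], filepath_columns: List[str], relative_dir: PurePosixPath) -> List[Row]:
    """Return copies of the rows with each filepath column anchored at relative_dir."""
    converted = []
    for row in rows:
        new_row = dict(row)
        for col in filepath_columns:
            new_row[col] = str(relative_dir / row[col])
        converted.append(new_row)
    return converted


# Hypothetical staging directory and manifest rows.
staging = PurePosixPath("/tmp/staging/raw")
manifest = [
    {"filepath": "/tmp/staging/raw/images/0.tiff", "label": "a"},
    {"filepath": "/tmp/staging/raw/images/1.tiff", "label": "b"},
]

relative = abs2rel(manifest, ["filepath"], staging)
restored = rel2abs(relative, ["filepath"], staging)
print(relative[0]["filepath"], restored[0]["filepath"])
```

Only the columns named in `filepath_columns` are touched, so metadata columns such as `label` pass through unchanged.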
-
property
project_local_staging_dir
-
pull
(data_version: Optional[str] = None, bucket: Optional[str] = None) Pull all upstream data dependencies using the list of upstream steps.
- Parameters
data_version (Optional[str]) – Request a specific version of the upstream data. Default: ‘latest’ for all upstreams
bucket (Optional[str]) – Request data from a specific bucket different from the bucket defined by your workflow_config.json or the defaulted bucket.
-
push
(bucket: Optional[str] = None) Push the most recently generated data.
- Parameters
bucket (Optional[str]) – Push data to a specific bucket different from the bucket defined by your workflow_config.json or the defaulted bucket.
Notes
If your git status isn’t clean, or you haven’t committed and pushed to origin, any attempt to push data will be rejected.
-
property
quilt_package_name
-
property
quilt_package_owner
-
run
(distributed_executor_address: Optional[str] = None, clean: bool = False, debug: bool = False, **kwargs) → Any Run a pure function.
The following parameters are “protected”:
- Parameters
distributed_executor_address (Optional[str]) – An optional executor address to pass to some computation engine.
clean (bool) – Whether to clean the local staging directory prior to this run. Default: False (Do not clean)
debug (bool) – A debug flag for the developer to use to manipulate how much data runs, how it is processed, etc. Default: False (Do not debug)
- Returns
result – A picklable object or value that is the result of any processing you do.
- Return type
Any
-
property
step_local_staging_dir
A preconfigured directory for you to store output files in. Can be specifically set using a workflow_config.json file.
-
property
step_name
Return the name of this step as a string.
-
property
storage_bucket
-
property
upstream_tasks