aics_dask_utils package

Submodules

aics_dask_utils.distributed_handler module

class aics_dask_utils.distributed_handler.DistributedHandler(address: Optional[str] = None)

Bases: object

A wrapper around concurrent.futures.ThreadPoolExecutor and distributed.Client that makes it easier to move from local debugging to Dask distributed execution. It also includes utility functions for mapping over large iterables.

Parameters

address (Optional[str]) – A scheduler address to connect to. Default: None (use a ThreadPoolExecutor)

Examples

Use local machine threads for concurrency

>>> from aics_dask_utils import DistributedHandler
>>> # `None` address provided means use local machine threads
>>> with DistributedHandler(None) as handler:
...     futures = handler.client.map(
...         lambda x: x + 1,
...         [1, 2, 3]
...     )
...     results = handler.gather(futures)
...

Use a Dask distributed cluster for concurrency

>>> from aics_dask_utils import DistributedHandler
>>> from distributed import LocalCluster
>>> cluster = LocalCluster()
>>> # A real scheduler address means use the Dask scheduler
>>> with DistributedHandler(cluster.scheduler_address) as handler:
...     futures = handler.client.map(
...         lambda x: x + 1,
...         [1, 2, 3]
...     )
...     results = handler.gather(futures)
...
>>> # Closing the handler does not shut down the cluster, so close it explicitly
>>> cluster.close()
batched_map(func, *iterables, batch_size: Optional[int] = None, **kwargs) → List[Any]

Map a function across iterables in a batched fashion.

If the iterables have length 1000 and the batch size is 10, it processes and _completes_ 10 items at a time, waiting for each batch to finish before submitting the next one. This differs from the batch_size parameter of distributed.Client.map, which only controls how tasks are submitted to the scheduler: https://distributed.dask.org/en/latest/api.html#distributed.Client.map

Prefer this function over DistributedHandler.client.map when a single map operation would place hundreds of thousands of tasks (or more) on the scheduler.

See https://github.com/dask/distributed/issues/2181 for more details.
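The batching behavior described above can be sketched with a plain ThreadPoolExecutor. This is a simplified illustration, not the package's implementation: batched_map_sketch is a hypothetical name, the iterables are assumed to be sliceable sequences of equal length, and falling back to os.cpu_count() is an assumption standing in for "number of available workers or threads".

```python
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence


def batched_map_sketch(
    executor: ThreadPoolExecutor,
    func: Callable[..., Any],
    *iterables: Sequence[Any],
    batch_size: Optional[int] = None,
) -> List[Any]:
    """Hypothetical helper: map `func` over `iterables`, one batch at a time."""
    # Assumption: use the machine's CPU count when no batch size is given.
    if batch_size is None:
        batch_size = os.cpu_count() or 1

    total = len(iterables[0])
    results: List[Any] = []
    for start in range(0, total, batch_size):
        # Slice the same window out of every iterable so arguments stay aligned.
        batch = [iterable[start : start + batch_size] for iterable in iterables]
        # executor.map submits only this batch; consuming its results blocks
        # until every item in the batch has completed, before the next batch.
        results.extend(executor.map(func, *batch))
    return results


with ThreadPoolExecutor() as executor:
    products = batched_map_sketch(
        executor,
        lambda a, b: a * b,
        list(range(1000)),
        list(range(1000)),
        batch_size=10,
    )
print(len(products), products[:4])  # 1000 [0, 1, 4, 9]
```

Because each batch is fully gathered before the next is submitted, at most batch_size tasks are in flight at once, which is the property that keeps the scheduler's task count bounded.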

Parameters
  • func (Callable) – A serializable callable function to run across each iterable set.

  • iterables (Iterables) – List-like objects to map over. They should have the same length.

  • batch_size (Optional[int]) – Number of items to process and _complete_ in a single batch. Default: number of available workers or threads.

  • **kwargs (dict) – Other keyword arguments passed to this handler’s client.map call.

Returns

results – The complete results of all items after they have been fully processed and gathered.

Return type

List[Any]

property client

The underlying ThreadPoolExecutor or distributed.Client instance.
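To make the client property concrete, here is a hedged sketch of how a handler could pick its backend from the address argument. make_client_sketch is a hypothetical name and this is not the package's code; it relies only on the fact that ThreadPoolExecutor and distributed.Client both expose submit and map, and that the futures they return both provide result().

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Optional


def make_client_sketch(address: Optional[str] = None) -> Any:
    """Hypothetical helper: choose a backend the way the `address` parameter describes."""
    if address is None:
        # No scheduler address: run tasks on local machine threads.
        return ThreadPoolExecutor()
    # A scheduler address was given: connect to that Dask scheduler.
    # Imported lazily so the thread-only path does not require `distributed`.
    from distributed import Client

    return Client(address)


client = make_client_sketch(None)
futures = [client.submit(pow, 2, n) for n in range(4)]
print([f.result() for f in futures])  # [1, 2, 4, 8]
client.shutdown()
```

Code written against the shared submit/map/result surface runs unchanged on either backend, which is what makes switching from debugging to distribution a one-argument change.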

close()

Close whichever client this handler is holding open.

Note: If connected to a distributed.Client, this closes the client connection but does not shut down the cluster.

gather(futures: Iterable[Union[concurrent.futures._base.Future, distributed.client.Future]]) → List[Any]

Block until all futures are complete and return their results in a list.

Parameters

futures (Iterable[Union[ThreadFuture, DaskFuture]]) – An iterable of future objects returned from DistributedHandler.client.map.

Returns

results – The result of each future in a list.

Return type

List[Any]
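For concurrent.futures futures, gathering amounts to waiting on each one and collecting the results in order. The sketch below shows that idea with a hypothetical gather_sketch helper; it is not the package's implementation, and for distributed futures distributed.Client.gather, which fetches results in bulk, is the usual choice.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Iterable, List


def gather_sketch(futures: Iterable[Future]) -> List[Any]:
    """Hypothetical helper: block on each future in order and collect its result."""
    # Future.result() blocks until that future finishes and re-raises any
    # exception raised by the task, so a failure surfaces here.
    return [future.result() for future in futures]


with ThreadPoolExecutor() as executor:
    futures = [executor.submit(lambda x: x + 1, x) for x in [1, 2, 3]]
    print(gather_sketch(futures))  # [2, 3, 4]
```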

Module contents

Top-level package for AICS Dask Utils.

aics_dask_utils.get_module_version()