aics_dask_utils package¶
Submodules¶
aics_dask_utils.distributed_handler module¶
class aics_dask_utils.distributed_handler.DistributedHandler(address: Optional[str] = None)[source]¶
Bases: object
A wrapper around concurrent.futures.ThreadPoolExecutor and distributed.Client that makes moving from debugging to dask distribution easier and includes additional utility functions for managing large iterable mapping.
- Parameters
address (Optional[str]) – A scheduler address to connect to. Default: None (use ThreadPoolExecutor)
Examples
Use local machine threads for concurrency
>>> from aics_dask_utils import DistributedHandler
... # `None` address provided means use local machine threads
... with DistributedHandler(None) as handler:
...     futures = handler.client.map(
...         lambda x: x + 1,
...         [1, 2, 3]
...     )
...
...     results = handler.gather(futures)
Use some distributed cluster for concurrency
>>> from distributed import LocalCluster
... cluster = LocalCluster()
...
... # Actual address provided means use the dask scheduler
... with DistributedHandler(cluster.scheduler_address) as handler:
...     futures = handler.client.map(
...         lambda x: x + 1,
...         [1, 2, 3]
...     )
...
...     results = handler.gather(futures)
batched_map(func, *iterables, batch_size: Optional[int] = None, **kwargs) → List[Any][source]¶
Map a function across iterables in a batched fashion.
If the iterables are of length 1000 but the batch size is 10, this will process and _complete_ 10 items at a time. Other batch implementations relate only to how tasks are submitted to the scheduler itself; see the batch_size parameter on distributed.Client.map: https://distributed.dask.org/en/latest/api.html#distributed.Client.map
This function should be used over DistributedHandler.client.map if the map operation would result in more than hundreds of thousands of tasks being placed on the scheduler.
See: https://github.com/dask/distributed/issues/2181 for more details
- Parameters
func (Callable) – A serializable callable function to run across each iterable set.
iterables (Iterables) – List-like objects to map over. They should have the same length.
batch_size (Optional[int]) – Number of items to process and _complete_ in a single batch. Default: number of available workers or threads.
**kwargs (dict) – Other keyword arguments to pass down to this handler’s client.
- Returns
results – The complete results of all items after they have been fully processed and gathered.
- Return type
Iterable[Any]
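For example, a minimal sketch assuming local machine threads; the workload and batch_size value below are purely illustrative:
>>> from aics_dask_utils import DistributedHandler
... # `None` address means use local machine threads
... with DistributedHandler(None) as handler:
...     # Complete at most 100 tasks at a time rather than placing
...     # all 100,000 on the scheduler at once
...     results = handler.batched_map(
...         lambda x: x ** 2,
...         range(100_000),
...         batch_size=100,
...     )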
property client¶
A pointer to the ThreadPoolExecutor or the Distributed Client.
close()[source]¶
Close whichever client this handler is holding open.
Note: If connected to a distributed.Client, this will close the client connection but will not shut down the cluster.
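As a sketch of manual lifecycle management (as opposed to the context-manager usage shown above; the LocalCluster here is only illustrative), the cluster must be shut down separately:
>>> from distributed import LocalCluster
... from aics_dask_utils import DistributedHandler
... cluster = LocalCluster()
... handler = DistributedHandler(cluster.scheduler_address)
... results = handler.gather(handler.client.map(lambda x: x + 1, [1, 2, 3]))
... # Closes the handler's client connection only
... handler.close()
... # The cluster keeps running; shut it down explicitly if desired
... cluster.close()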
gather(futures: Iterable[Union[concurrent.futures._base.Future, distributed.client.Future]]) → List[Any][source]¶
Block until all futures are complete and return their results as a list.
- Parameters
futures (Iterable[Union[ThreadFuture, DaskFuture]]) – An iterable of future objects returned from DistributedHandler.client.map.
- Returns
results – The result of each future in a list.
- Return type
List[Any]
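For instance, a minimal sketch using local machine threads:
>>> from aics_dask_utils import DistributedHandler
... with DistributedHandler(None) as handler:
...     futures = handler.client.map(str.upper, ["a", "b", "c"])
...     # gather blocks until every future has resolved
...     results = handler.gather(futures)  # results == ['A', 'B', 'C']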