from tqdm import tqdm from concurrent.futures import ProcessPoolExecutor, as_completed def parallel_process(array, function, n_jobs=16, use_kwargs=False, front_num=0): """ A parallel version of the map function with a progress bar. Args: array (array-like): An array to iterate over. function (function): A python function to apply to the elements of array n_jobs (int, default=16): The number of cores to use use_kwargs (boolean, default=False): Whether to consider the elements of array as dictionaries of keyword arguments to function front_num (int, default=3): The number of iterations to run serially before kicking off the parallel job. Useful for catching bugs Returns: [function(array[0]), function(array[1]), ...] """ # We run the first few iterations serially to catch bugs if front_num > 0: front = [ function(**a) if use_kwargs else function(a) for a in array[:front_num] ] else: front = [] # If we set n_jobs to 1, just run a list comprehension. This is useful for benchmarking and debugging. if n_jobs == 1: return front + [ function(**a) if use_kwargs else function(a) for a in tqdm(array[front_num:]) ] # Assemble the workers with ProcessPoolExecutor(max_workers=n_jobs) as pool: # Pass the elements of array into function if use_kwargs: futures = [pool.submit(function, **a) for a in array[front_num:]] else: futures = [pool.submit(function, a) for a in array[front_num:]] kwargs = { "total": len(futures), "unit": "it", "unit_scale": True, "leave": True, } # Print out the progress as tasks complete for f in tqdm(as_completed(futures), **kwargs): pass out = [] # Get the results from the futures. for i, future in tqdm(enumerate(futures)): try: out.append(future.result()) except Exception as e: out.append(e) return front + out