GEOG 489
Advanced Python Programming for GIS


1.6.5.3 Multiprocessing Variants


Python includes several different methods for executing processes in parallel. Each method behaves a little differently, and it is important to know some of these differences in order to get the most performance gain out of multiprocessing and to avoid inadvertently introducing logical errors into your code. The table below compares the available methods to help summarize their abilities. Some things to think about while choosing an appropriate method for your task are whether the method is blocking, whether it accepts single- or multiple-argument functions, and how the order of results is returned.

Multiprocessing variants

Variant | Blocking | Ordered | Iterative | Accepts Multiple Arguments | Description
Pool.map | Yes | Yes | No | No | Applies a function to all items in the input iterable, returning results in order.
Pool.map_async | No | Yes | No | No | Similar to Pool.map, but returns a result object that can be checked later.
Pool.imap | No | Yes | Yes | No | Returns an iterator that yields results in order as they become available.
Pool.imap_unordered | No | No | Yes | No | Returns an iterator that yields results as they become available, order not guaranteed.
Pool.starmap | Yes | Yes | No | Yes | Applies a function to arguments provided as tuples, returning results in order.
Pool.starmap_async | No | Yes | No | Yes | Similar to Pool.starmap, but returns a result object that can be checked later.
Pool.apply | Yes | Yes | No | Yes | Runs a single callable function and blocks until the result is available.
Pool.apply_async | No | Yes | No | Yes | Runs a single callable function asynchronously and returns a result object.

For this class, we will focus on pool.starmap() and also describe pool.apply_async() to highlight some of their capabilities and implementations.
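To make the blocking distinction in the table concrete, here is a minimal sketch contrasting pool.map (blocking) with pool.map_async (non-blocking, collected later with .get()). The square function is a hypothetical stand-in for any single-argument task:

```python
import multiprocessing

def square(x):
    # hypothetical worker function standing in for a real task
    return x * x

with multiprocessing.Pool(processes=2) as pool:
    # pool.map blocks until every item has been processed; results are ordered
    squares = pool.map(square, [1, 2, 3, 4])

    # pool.map_async returns immediately with a result object...
    async_result = pool.map_async(square, [5, 6])
    # ...so other work could happen here while the workers run.
    more_squares = async_result.get()  # now block and collect the results

print(squares)       # [1, 4, 9, 16]
print(more_squares)  # [25, 36]
```

Note that .get() is called inside the with block: once the pool's context exits, its worker processes are terminated.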

map

The method of multiprocessing that we will be using builds on the map method that we covered earlier in the lesson, as pool.starmap(). The "star" refers to Python's * unpacking operator: each tuple of arguments in the input iterable is unpacked into the function's parameters, much like itertools.starmap(). The method distributes the items in the list across a pool of worker processes and collects the results from each process in a list.

  • Syntax: pool.starmap(func, iterable), where iterable is a sequence of argument tuples, e.g., [(1, 2), (3, 4)]
  • Purpose: Applies a function to each tuple of arguments in the iterable and returns a list of results.
  • Blocking: This method is blocking, meaning it waits for all processes to complete before moving on to the next line of code.
  • Synchronous: Processes tasks synchronously and returns results in input order.
  • Multiple Arguments: Each tuple is unpacked into the function's parameters, so functions that take multiple arguments are supported (unlike pool.map, which only handles single-argument functions).
  • Usage: Often used when you have a list of tasks to perform and want the results in the same order as the input.
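The bullet points above can be sketched in a short, self-contained example. The buffer_distance function and the street names are hypothetical placeholders for a real geoprocessing task; the point is how each tuple in the input list is unpacked into the function's two parameters:

```python
import multiprocessing

def buffer_distance(name, length):
    # hypothetical stand-in for a two-argument geoprocessing task
    return f"{name}: {length * 2}"

# each tuple holds one full set of arguments for buffer_distance
args = [("Main St", 10), ("Oak Ave", 25), ("Elm Rd", 5)]

with multiprocessing.Pool() as pool:
    # starmap unpacks each tuple into (name, length) and blocks
    # until all results are back, in input order
    results = pool.starmap(buffer_distance, args)

print(results)  # ['Main St: 20', 'Oak Ave: 50', 'Elm Rd: 10']
```

Had we used pool.map here instead, each worker would have received the whole tuple as a single argument and the call would fail.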

What if you wanted to run different functions in parallel? You can be more explicit by using the pool.apply_async() method to execute different functions in parallel. This multiprocessing variant is useful when performing various tasks that do not depend on maintaining return order, do not interfere with each other, and are not dependent on results from other tasks. Some examples are copying a feature class to multiple places, performing data operations, and executing maintenance routines.

apply_async

Instead of using the map construct, you assign each process to a variable, start the task, and then call .get() when you are ready for the results.

  • Syntax: pool.apply_async(func, args=(), kwds={})
  • Purpose: Schedules a single function to be executed asynchronously.
  • Non-Blocking: This method is non-blocking, meaning it returns immediately with an ApplyResult object. It schedules the function to be executed and allows the main program to continue executing without waiting for the function to complete.
  • Asynchronous: Processes tasks asynchronously, allowing for other operations to continue while waiting for the result.
  • Multiple Arguments: Handles functions with a single argument or multiple arguments.
  • Usage: It is used when you need to execute a function asynchronously and do not need to wait for the result immediately. You can collect the results later using the .get() method on the ApplyResult object. The .get() method retrieves the result of the function call that was executed asynchronously. If the function has already completed, it returns the result immediately. If the function is still running, it waits until the function completes and then returns the result.

For example, this is how you can have three different functions working at the same time while you execute other processes. You can call .get() on any of the tasks when you need the result of that process, or you can put all of the tasks into a list. When you put them in a list, the time it takes to complete is based on the longest-running process. Note here that the parameters need to be passed as a tuple. A single parameter needs to be passed as (arg, ), but if you need to pass more than one parameter to the function, the tuple is (arg1, arg2, arg3).

with multiprocessing.Pool() as pool:
    p1 = pool.apply_async(functionA, (param1,))         # starts the functionA process
    p2 = pool.apply_async(functionB, (param1, param2))  # starts the functionB process
    p3 = pool.apply_async(functionC, (param1,))         # starts the functionC process

    # run other code if desired while p1, p2, and p3 execute

    # we need the result from p3, so block further execution and wait for it to finish
    functionC_result = p3.get()
    ...

    # collect the results from the processes as an ordered list.
    # each call to .get() is blocking, so this line waits until every
    # process in the list is done executing.
    order_results = [p1.get(), p2.get(), p3.get()]

When the list assigned to order_results is created, .get() is called for each process; the results are stored in the list and can be retrieved by indexing or with a loop.

# After the processes are complete, iterate over the results and check for errors.
for r in order_results:
    if r['errorMsg'] is not None:
        print(f'Task {r["name"]} failed with: {r["errorMsg"]}')
    else:
        ...

What if the process run time is directly related to the amount of data being processed? For example, consider performing a calculation on streets at 20-foot intervals for an entire county. Most likely, the dataset will have a wide range of street lengths. Short street segments will take milliseconds to compute, but the longer streets (those that are miles long) may take several minutes to calculate. You may not want to wait until the longest-running process is complete to start working with the results, since your short streets will be waiting and a number of your processors could sit idle until the last long calculation is done. By using the pool.imap_unordered or pool.imap methods, you can get the best performance gain, since they work as iterators and return completed processes as they finish. Until the job list is completed, the iterator ensures that no processor sits idle, allowing many of the quicker processes to complete and return while the longer processes continue to calculate. The syntax should look familiar since it is a simple for loop:

with multiprocessing.Pool() as pool:
    for i in pool.imap_unordered(split_street_function, range(10)):
        print(i)
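To see the behavior described above in a runnable form, here is a self-contained sketch. The process_segment function and its sleep-based workload are hypothetical stand-ins for a real calculation whose run time grows with the data; the key point is that results arrive as each worker finishes, not in input order:

```python
import multiprocessing
import time

def process_segment(length):
    # hypothetical worker: simulate run time proportional to segment length
    time.sleep(length * 0.01)
    return length

# a mix of "short" and "long" street segments
lengths = [5, 1, 3, 2, 4]

finished = []
with multiprocessing.Pool(processes=4) as pool:
    # imap_unordered yields each result as soon as its worker finishes,
    # so short segments can be handled while long ones are still running
    for result in pool.imap_unordered(process_segment, lengths):
        finished.append(result)

# every input was processed, though arrival order is not guaranteed
print(sorted(finished))  # [1, 2, 3, 4, 5]
```

Swapping in pool.imap instead would yield the same results but in input order, holding back any finished result until all earlier ones have been delivered.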

We will focus on the pool.starmap() method for the examples and for the assignment, since we will simply be applying the same function to a list of rasters or vectors. The various multiprocessing methods are further described in the Python documentation, and it is worth reviewing and comparing them further if you have extra time at the end of the lesson.