SMP processing with multiprocessing

Thu 23 May 2013

Filed under Blog

Tags python gdal tips

This note details how to use parallel processing in python accessing shared memory. The usage case is when you have a process that reads in a lot of data, and where the data can be processed in parallel, such as when reading and processing images/stacks of images in a pixel by pixel basis.

Critically, we want to avoid making lots of copies of the data to distribute to the individual processes. To do this, we need to use the multiprocessing module and the sharedmem module. The latter just makes using shared memory much more user friendly. There seem to be a few versions of sharedmem around, but you can get the code from here.

The example

One possible example is the calculation of the typical maximum value composite from MODIS data. This means that the value of NDVI is calculated for each observation within a temporal period, and the maximum value within that time period is reported. The processing is clearly pixel-by-pixel, so we could think of running the processing on different cores working on different pixels or different windows of the image. The main loop looks like this

def worker ( red, nir, ndvi, chunk ):
    """Process a chunk of the image stack.

    This function first calculates a window of rows
    which is directed by the value of `chunk`. Each
    worker processes 20 at a time, and this is
    reflected in the `row_slice` variable. It then
    loops over the 37 *dekads* in a year, taking into
    account the last period is only 5 days, and defines
    a new slice object, `time_slice`. For each period and
    spatial chunk, the ndvi for each day is calculated,
    and the maximum value is then stored in the ndvi
    array for the appropriate period.

    Note how we don't return any results here, we are
    modifying ndvi **in-place** in memory"""

    # We do 20 lines each time
    row_slice = np.s_[ (20*chunk):((chunk+1)*20) ]
    # This is the time loop: 37 periods per year
    for i in xrange(37):
        # Start position
        istart = i*10
        # End position
        iend = (i+1)*10
        # Correct for last period
        if iend > nir.shape[1]:
            iend = nir.shape[1]
        # Define the temporal slice for easier coding
        time_slice=np.s_[istart:iend]
        # calculates NDVI and stores it in `the_val`
        the_val = nir[time_slice, :, row_slice] - \
                    red[time_slice, :, row_slice]
        the_val = the_val /(1.0e-12 + \
                    nir[time_slice, :, row_slice] + \
                    red[time_slice, :, row_slice] )
        the_val = np.where ( np.isnan( the_val ), -1, \
            the_val )
        # Take maximum value and store it
        ndvi[i, :, row_slice] = the_val.max(axis=0)
    return

The worker above will get the data, modify one of the arrays in place ndvi and return. The multiprocessing mechanism is the one in charge of distributing the load. Let's see how this is done.

First, we need to import some stuff

import time
import sys
import multiprocessing as mp

from osgeo import gdal
import sharedmem as shm
import numpy as np

Now, we need a data reader. We'll use some data lying around on UCL for simplicity

N=2400

print "[%s] Reading RED..." % time.asctime()
sys.stdout.flush()
red = shm.zeros((365, N, N),dtype=np.float32)
g = gdal.Open("brdf_2004_b01.vrt")
for i in xrange(365):
    red[i,:,:] = g.GetRasterBand(2*i+1).ReadAsArray()/10000.
print "[%s] Read RED..." % time.asctime()
print "[%s] Reading NIR..." % time.asctime()
sys.stdout.flush()
nir = shm.zeros((365, N, N),dtype=np.float32)
g = gdal.Open("brdf_2004_b02.vrt")
for i in xrange(365):
    nir[i,:,:] = g.GetRasterBand(2*i+1).ReadAsArray()/10000.
print "[%s] Read NIR..." % time.asctime()
sys.stdout.flush()
# Output data
ndvi=shm.zeros((37, N, N),dtype=np.float32)
jobs = []
print "[%s] Spreading chunks!" % time.asctime()
sys.stdout.flush()

The previous code basically just sets to sharedmem arrays, red and nir, and reads data into them. Note that we specify the datatype, but otherwise, they could just be np.zeros incantations. The reading process can take several minutes if started from cold (i.e. data not on disk cache).

The next step is to actually spread the load over the different cores. We have assumed that we'd be doing 20 rows/cols at a time, so we'll have 2400/20 = 120 windows to process. The code is straightforward

jobs = []
for i in range(2400/20):
    p = mp.Process(target=worker, args=(red, nir, ndvi,i,))
    jobs.append(p)
    p.start()

for j in jobs:
    j.join()
print "[%s] Done chunking" % time.asctime()
sys.stdout.flush()

The only weird bit is the join: this is done to wait for the processes that might not have finished. After this is done, we get the processed data in the ndvi array. During this process, you can look at the output from htop to see how bad the CPU banging is. Running the script in a 24-core machine shows the following output:

[Thu May 23 18:18:04 2013] Reading RED...
[Thu May 23 18:42:53 2013] Read RED...
[Thu May 23 18:42:53 2013] Reading NIR...
[Thu May 23 18:47:24 2013] Read NIR...
[Thu May 23 18:47:25 2013] Spreading chunks!
[Thu May 23 18:47:42 2013] Done chunking

Note how most of the time is spent reading in the data. Notice that reading the red band is very slow but subsequently reading the NIR band is fairly fast. This is because the data are stored in an NFS share. During the first read the files will be cached locally (this particular server has lots of memory), and hence subsequently reading the NIR data is fast as little use of the network is done.

Finally...

So, in a nutshell, what one needs to do is:

  1. Create the sharedmem arrays and populate them.
  2. Create a worker function that does the require processing. Just assume all data is there, and use slicing!
  3. Use multiprocessing to distribute the job, creating a Process. that passes the shared memory arrays, as well as the "chunk" indicator.
  4. Wait for multiprocessing and use the data!

Comments


EO & DA Ramblings © J Gomez-Dans Powered by Pelican and Twitter Bootstrap. Icons by Font Awesome and Font Awesome More