Parallel Processor (pproc)

Written By: James Tay (james@2longbeans.net)
Start Date: 22nd Sep 2010
Last Revision: 20th Oct 2010

Description

This program reads data from standard input and processes the stream in parallel using multiple instances of "cmd", for example "gzip -9c". The outputs are then combined and written to standard output, which lets the caller use multiple CPUs on a single stream. It is very important to note that the output from this program is NOT the same as the output from simply running "cmd" by itself. That is to say,

    % gzip -9c < my-in-file > output1
    % pproc -e "gzip -9c" < my-in-file > output2

The contents of "output1" and "output2" are NOT the same. The reason is that this program chops the input stream into sets of "ncpu" chunks, each of "chunk size", then sends each chunk to a separate instance of "cmd" while collecting the output. The processed output from each chunk is then recombined and written to standard output. Thus, the format of this processed output is :

    <stream header><chunk header1><data1>[<chunk headerN><dataN>...]

Because processed output always begins with a stream header, it can be detected by examining the first few bytes of the stream. The program therefore switches to an "unprocess" mode automatically when its input begins with this header. Continuing the above example, the original contents of "my-in-file" can be recovered using :

    % pproc -e "gzip -cd" < output2 > my-in-file
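
A quick way to gain confidence in the round trip is to pipe the processed stream straight back through the decoding command and compare the result with the original. The helper below is only a sketch: the name "roundtrip_ok" is made up for illustration, and it assumes a POSIX shell with cmp(1) available.

```shell
# roundtrip_ok FILE ENCODE DECODE   (hypothetical helper, not part of pproc)
# Feeds FILE through the ENCODE command line, then through DECODE, and
# succeeds only if the final bytes match FILE exactly.
roundtrip_ok() {
    sh -c "$2" < "$1" | sh -c "$3" | cmp -s - "$1"
}
```

For the example above, this could be invoked from an sh-compatible shell as :

    roundtrip_ok my-in-file 'pproc -e "gzip -9c"' 'pproc -e "gzip -cd"' && echo same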

Note that the writing of chunk headers can be controlled by setting the "-b" flag. Not writing headers may be useful in some scenarios.
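
The chunk-and-recombine idea described in this section can be approximated with standard tools. The sketch below is NOT how pproc is implemented: it writes no stream or chunk headers, uses temporary files rather than memory buffers, and starts every piece at once instead of "ncpu" at a time. The name "chunked_run" is made up for illustration, and a POSIX shell with split(1) and mktemp(1) is assumed.

```shell
# chunked_run CHUNK_BYTES CMD [ARG...]   (hypothetical name, not part of pproc)
# The body runs in a subshell so variables and background jobs stay private.
chunked_run() (
    size=$1; shift
    dir=$(mktemp -d) || exit 1
    mkdir "$dir/in" "$dir/out"
    # Cut stdin into fixed-size pieces c.aa, c.ab, ... (sorted = input order).
    split -b "$size" - "$dir/in/c."
    for f in "$dir"/in/c.*; do
        [ -e "$f" ] || continue                  # empty input yields no pieces
        "$@" < "$f" > "$dir/out/${f##*/}" &      # one instance of CMD per piece
    done
    wait                                         # until every instance is done
    for f in "$dir"/out/c.*; do
        [ -e "$f" ] || continue
        cat "$f"                                 # recombine in original order
    done
    rm -rf "$dir"
)
```

Running "chunked_run 1048576 gzip -9c < my-in-file" compresses each piece independently, which illustrates why, for any input longer than one chunk, the combined result cannot match a single "gzip -9c" run byte for byte. Very long inputs may exhaust the default split(1) suffixes; the sketch does not handle that case.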

Memory Buffers

This program prepares a total of 2 input chunk sets and 2 output chunk sets, where each chunk set consists of "ncpu" buffers. If we're running across 4 CPUs, consider the following buffers :

   stdin -.   [ inbuf-A-1 ] -> cmd -> [ outbuf-A-1 ]
          |   [ inbuf-A-2 ] -> cmd -> [ outbuf-A-2 ]
          |   [ inbuf-A-3 ] -> cmd -> [ outbuf-A-3 ]
          |   [ inbuf-A-4 ] -> cmd -> [ outbuf-A-4 ]
          |
          `-> [ inbuf-B-1 ]           [ outbuf-B-1 ]
              [ inbuf-B-2 ]           [ outbuf-B-2 ]
              [ inbuf-B-3 ]           [ outbuf-B-3 ]
              [ inbuf-B-4 ]           [ outbuf-B-4 ] ---> stdout

When data arrives, this program fills up one input chunk set. Once it is filled, 4 instances of "cmd" are run and input continues into the second input chunk set. Once all 4 "cmd" instances complete, data is written out from that output chunk set. In the meantime, another set of 4 "cmd" instances is spawned, and their outputs accumulate in the other output chunk set.

Essentially, while "cmd" reads from inbuf A and writes to outbuf A, standard input flows into inbuf B and outbuf B is flushed to standard output. The buffers are swapped once all of the following conditions are met :

  1. current input buffers are totally filled (from stdin).
  2. current output buffers are totally flushed (to stdout).
  3. all instances of "cmd" have terminated.
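
Expressed as a predicate, the swap is allowed only when all three conditions hold at once. The function below merely restates the list above; the name "swap_ready" and its arguments are illustrative and do not correspond to anything in the pproc source.

```shell
# swap_ready IN_FILLED OUT_FLUSHED CMDS_RUNNING   (hypothetical helper)
#   IN_FILLED     1 if the current input buffers are totally filled, else 0
#   OUT_FLUSHED   1 if the current output buffers are totally flushed, else 0
#   CMDS_RUNNING  number of "cmd" instances that have not yet terminated
# Exit status 0 means the buffer sets may be switched around.
swap_ready() {
    [ "$1" -eq 1 ] && [ "$2" -eq 1 ] && [ "$3" -eq 0 ]
}
```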

Note that buffers in the input/output chunk sets may differ in length. For example, if "cmd" performs decompression, the output buffers will be automatically grown to accommodate the output from "cmd".
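
For a rough idea of memory use, the input side alone holds 2 chunk sets of "ncpu" buffers each. Assuming every input buffer is allocated at the full chunk size, and that the chunk size is expressed in bytes (both are assumptions, since the allocation strategy is not described here), that comes to 2 x ncpu x chunk size. The output buffers come on top of that and may grow further. The function name is illustrative.

```shell
# inbuf_bytes NCPU CHUNK_BYTES   (hypothetical helper)
# Prints the memory taken by the two input chunk sets, in bytes,
# assuming each of the 2 * NCPU input buffers holds one full chunk.
inbuf_bytes() {
    echo $(( 2 * $1 * $2 ))
}
```

For example, "inbuf_bytes 4 1048576" prints 8388608, i.e. 8 MiB for the input buffers when running across 4 CPUs with 1 MiB chunks.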

When spawning "cmd", the following environment variables are set. These can help the end user with further fine tuning and customization, e.g. taskset'ing to certain CPU cores.

    PPROC_NCPU        the value of "ncpu" as specified on the command line
    PPROC_BUF_ID      the chunk buffer (0 - ncpu)
    PPROC_CHUNK_LEN   actual number of data bytes in the chunk buffer
    PPROC_CHUNK_ID    the chunk ID number, starting from 0
    PPROC_STREAM      0=raw input, 1=stream input
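
As an illustration of the taskset idea, a wrapper could use PPROC_BUF_ID to choose a core. This is a sketch under several assumptions: that a wrapper script may be given as "cmd", that buffer IDs map sensibly onto core numbers on the machine in question, and that taskset(1) from util-linux is installed. The name "pin_and_run" is made up.

```shell
# pin_and_run CMD [ARG...]   (hypothetical wrapper, not part of pproc)
# Runs CMD pinned to the core numbered PPROC_BUF_ID when that is possible,
# and unpinned otherwise (taskset missing, or that core cannot be used).
pin_and_run() {
    core=${PPROC_BUF_ID:-0}
    if command -v taskset >/dev/null 2>&1 &&
       taskset -c "$core" true >/dev/null 2>&1; then
        taskset -c "$core" "$@"
    else
        "$@"
    fi
}
```

A script that defines this function and ends with "pin_and_run gzip -9c" could then stand in for plain "gzip -9c".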

Bugs

If the user-supplied command produces an unreasonably large amount of output, this program will keep growing the buffers to accommodate it until memory is exhausted. For example,

    % pproc -e "dd if=/dev/zero" ...

Standard error generated by child processes is all merged into this program's standard error stream. There is no way to distinguish which child process generated which message.
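
One partial workaround is to have each child label its own messages using PPROC_CHUNK_ID. The wrapper below is a sketch, not a feature of pproc: "tag_stderr" is a made-up name, and it assumes a wrapper script may be used as "cmd". Note that its exit status is that of sed, not of the wrapped command.

```shell
# tag_stderr CMD [ARG...]   (hypothetical wrapper, not part of pproc)
# Runs CMD with stdout untouched, while every line CMD writes to stderr
# is prefixed with the chunk ID taken from the environment.
tag_stderr() {
    id=${PPROC_CHUNK_ID:-?}
    # fd 3 keeps the real stdout; only CMD's stderr travels down the pipe.
    { "$@" 2>&1 1>&3 | sed "s/^/[chunk $id] /" >&2; } 3>&1
}
```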

Output generated on a machine with a large chunk size may not be "unprocessed" successfully on machines with limited memory.