A Fork-Join framework on a budget

And by "on a budget" I mean in under 150 lines of code. Commented, no less. Of course this comes with some limitations: I restrict myself to the embarrassingly parallel case where the client code wants to partition a computation equally among identical workers. I also restrict myself to pure OCaml without external dependencies. Furthermore, I have not been entirely diligent with either my error checking or my portability: this code is tested under Mac OS X, although I expect it should work on any Unix-like operating system. With those caveats in place, the framework responds to the following signature:

type ('a, 'b, 'c, 'd) worker =
    int -> ('a, 'b, 'c) Bigarray.Array1.t -> 'd -> unit

type ('a, 'b, 'c, 'd) pool

val create : int -> ('a, 'b, 'c, 'd) worker ->
    ('a, 'b, 'c) Bigarray.Array1.t -> ('a, 'b, 'c, 'd) pool

val close : ('a, 'b, 'c, 'd) pool -> unit

val iter : ('a, 'b, 'c, 'd) pool -> 'd array -> unit

A worker represents the code identified by id that will compute on shmem by taking args. A pool is set up by a call to create k worker array and will contain k such workers whose results will be delivered on a client array. The work on N arguments is distributed by iter pool arguments in an N-k fashion, such that as soon as a worker is finished it can be rescheduled with another work unit. The types are complicated by the parametricity over the Bigarrays used for input-output, but this affords type safety and computational flexibility.

Note: I present the following code in a standalone fashion, but it is imperative to hide it behind the given signature to ensure its type safety. Essentially the type variables are phantom types that ensure that all applications are sound; in particular it is the only way to ensure the type-safe use of marshaling.

The computation is organized with the main program as the leader of k child processes. The communication is arranged over a mailbox abstraction:

type mbox = {
  pid  : int;
  infd : Unix.file_descr;
  inch : in_channel;
  otch : out_channel;
}

An mbox records the process ID of the worker process (or 0 for the controller) and a pair of input-output channels used for communication. The mbox also keeps the underlying descriptor for the input channel as an optimization. Creating an mbox is straightforward:

let make_mbox pid infd otfd =
  { pid  = pid;
    infd = infd;
    inch = Unix.in_channel_of_descr  infd;
    otch = Unix.out_channel_of_descr otfd; }

Closing an mbox abstracts away waiting on the child process to clean up properly all resources associated with the forking:

let close_mbox mbox =
  if mbox.pid != 0 then
    ignore (Unix.waitpid [Unix.WNOHANG] mbox.pid);
  (* The file descriptor is closed through its channel *)
  close_in  mbox.inch;
  close_out mbox.otch

Messages are sent and received over mboxes using marshaling:

let sendval mbox v =
  Marshal.to_channel mbox.otch v [Marshal.No_sharing];
  flush mbox.otch

and recvval mbox = Marshal.from_channel mbox.inch
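To see the marshaling round trip in isolation, here is a minimal sketch that pushes a value through a pipe inside a single process. The name roundtrip is hypothetical and not part of the framework, and the sketch assumes the marshaled value is small enough to fit in the pipe's buffer, since nobody reads until the write has finished.

```ocaml
(* Standalone sketch, link against the unix library.
   [roundtrip] is a hypothetical helper, not part of the framework. *)
let roundtrip (v : 'a) : 'a =
  let rd, wr = Unix.pipe () in
  let inch = Unix.in_channel_of_descr rd
  and otch = Unix.out_channel_of_descr wr in
  Marshal.to_channel otch v [Marshal.No_sharing];
  (* Without this flush the bytes stay in the channel buffer *)
  flush otch;
  let (r : 'a) = Marshal.from_channel inch in
  (* Closing the channels also closes the underlying descriptors *)
  close_out otch;
  close_in inch;
  r
```

The type annotation on the result is what makes the unmarshaled value usable; Marshal itself checks nothing, so the sender and the receiver must agree on the type by other means.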

(Note a potential pitfall here: it is imperative to flush the output channel if the other end is to receive anything). The meat of the abstraction is the ability to wait on one or more mboxes for readiness to receive messages:

let select_mbox mboxes =
  let inset = List.map (fun mbox -> mbox.infd) mboxes in
  let ready, _, _ = Unix.select inset [] [] (-1.) in
  List.partition (fun mbox -> List.memq mbox.infd ready) mboxes
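The same partitioning can be tried on bare descriptors. The following is a sketch under the assumption of a Unix-like system; ready_fds and demo are hypothetical names, and the timeout is 0 so that select polls instead of blocking forever as select_mbox does.

```ocaml
(* Standalone sketch, link against the unix library. *)
let ready_fds fds =
  (* A timeout of 0.0 polls; a negative timeout would wait forever *)
  let ready, _, _ = Unix.select fds [] [] 0.0 in
  List.partition (fun fd -> List.mem fd ready) fds

let demo () =
  let r1, w1 = Unix.pipe () and r2, w2 = Unix.pipe () in
  (* Only the first pipe has something to read *)
  ignore (Unix.write_substring w1 "x" 0 1);
  let ready, busy = ready_fds [r1; r2] in
  let ok = (ready = [r1] && busy = [r2]) in
  List.iter Unix.close [r1; w1; r2; w2];
  ok
```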

This is where the optimization of having the file descriptor at hand comes into play: select waits forever on the passed-in mboxes until some can be read from, and partitions them into a ready-busy pair of lists. This completes the communications framework. Now, a worker is concrete, as explained above:

type ('a, 'b, 'c, 'd) worker =
    int -> ('a, 'b, 'c) Bigarray.Array1.t -> 'd -> unit

These workers are structured as actors. They receive a work unit or a command to stop, and reply with a result or with an exception:

type 'a cmd = Work of 'a | Stop
 and 'a res = Done of 'a | Fail of exn
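The protocol can be exercised without any processes at all. As a rough sketch, the hypothetical function handle below computes the reply a worker would send for one command, or None when it is told to stop; the types are repeated so that the fragment stands alone.

```ocaml
type 'a cmd = Work of 'a | Stop
 and 'a res = Done of 'a | Fail of exn

(* [handle] is a hypothetical helper: one step of the actor loop,
   with the exception captured into the reply instead of escaping. *)
let handle f = function
  | Work a -> Some (try Done (f a) with e -> Fail e)
  | Stop   -> None
```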

A worker must react to Work requests by performing the intended computation until it is told to Stop:

let make_worker worker i shmem =
  let rec react mbox = match recvval mbox with
  | Work a ->
    let res = try Done (worker i shmem a) with e -> Fail e in
    sendval mbox res;
    react mbox
  | Stop   -> close_mbox mbox; exit 0
  in (* … *)

Of course, before it can do anything useful a child and its parent are to part:

  (* … *)
  let in0, ot0 = Unix.pipe () (* parent read , child write *)
  and in1, ot1 = Unix.pipe () (* parent write, child read  *) in
  match Unix.fork () with
  |  -1 -> List.iter Unix.close [in0; in1; ot0; ot1]; failwith "fork"
  |   0 ->
    List.iter Unix.close [in0; ot1];
    react (make_mbox 0 in1 ot0)
  | pid ->
    List.iter Unix.close [ot0; in1];
    make_mbox pid in0 ot1
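Stripped of the mailbox machinery, the fork-and-pipe dance looks like the following sketch, which uses a single pipe because the child only has to report back. The function square_in_child is a hypothetical example, not part of the framework, and it assumes a Unix-like system where Unix.fork is available.

```ocaml
(* Standalone sketch, link against the unix library. *)
let square_in_child (n : int) : int =
  let in0, ot0 = Unix.pipe () in (* parent read, child write *)
  match Unix.fork () with
  | 0 ->
    (* Child: keep only the write end, send the result and leave *)
    Unix.close in0;
    let otch = Unix.out_channel_of_descr ot0 in
    Marshal.to_channel otch (n * n) [];
    flush otch;
    exit 0
  | pid ->
    (* Parent: keep only the read end, then reap the child *)
    Unix.close ot0;
    let inch = Unix.in_channel_of_descr in0 in
    let (r : int) = Marshal.from_channel inch in
    close_in inch;
    ignore (Unix.waitpid [] pid);
    r
```

Each side closes the end of the pipe it does not use; otherwise the reader would never see end-of-file if the writer died early.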

This is the classic popen Unix pattern. Another traditional Unix-ism is for the parent to catch up with all prodigal children to clean up after them:

let rec wait_all () =
  try
    let pid, _ = Unix.waitpid [] 0 in
    if pid != 0 then wait_all ()
  with Unix.Unix_error (Unix.ECHILD, _, _) -> ()

(I know, I know: the function is not tail-recursive). Yet another traditional idiom is unlinking an open file to make it exist as long as it is kept open:

let tempfd () =
  let name = Filename.temp_file Sys.executable_name "TMP" in
  try
    let fd = Unix.openfile name [Unix.O_RDWR; Unix.O_CREAT] 0o600 in
    Unix.unlink name;
    fd
  with e -> Unix.unlink name; raise e
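The unlink idiom is easy to check by hand. In this sketch, scratch_roundtrip is a hypothetical helper that writes a string to an already unlinked file and reads it back, also reporting whether the name had indeed vanished; it assumes Unix semantics for unlinking open files.

```ocaml
(* Standalone sketch, link against the unix library. *)
let scratch_roundtrip (s : string) : bool * string =
  let name = Filename.temp_file "scratch" ".tmp" in
  let fd = Unix.openfile name [Unix.O_RDWR] 0o600 in
  (* The name goes away, the open file does not *)
  Unix.unlink name;
  let gone = not (Sys.file_exists name) in
  ignore (Unix.write_substring fd s 0 (String.length s));
  ignore (Unix.lseek fd 0 Unix.SEEK_SET);
  let buf = Bytes.create (String.length s) in
  let n = Unix.read fd buf 0 (Bytes.length buf) in
  (* Closing the descriptor finally releases the storage *)
  Unix.close fd;
  (gone, Bytes.sub_string buf 0 n)
```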

Now I can fill in the pool:

type ('a, 'b, 'c, 'd) pool = {
  mboxes : mbox list;
  input  : ('a, 'b, 'c) Bigarray.Array1.t;
  output : ('a, 'b, 'c) Bigarray.Array1.t;
  mmapfd : Unix.file_descr;
}

It contains the list of mboxes to communicate with its children, the client input array, the shared output array, and the descriptor of the file used to map it in memory. Creating the pool involves setting up that shared memory to mirror the input array:

let create num_workers worker array =
  let kind   = Bigarray.Array1.kind   array
  and layout = Bigarray.Array1.layout array
  and dim    = Bigarray.Array1.dim    array in
  let mmapfd = tempfd () in
  let shmem  = Bigarray.Array1.map_file mmapfd kind layout true dim in
  let mboxes = ref [] in
  (* … *)

This is a robustness pitfall: I should really test that all system calls succeed and clean up after them if not. This also is a potential portability pitfall: Mac OS X cannot map character devices, so the Linux-ism of mmap-ing over /dev/zero must be worked around by explicitly creating a temporary file, which of course means that it must be disposed of afterwards. Now the workers can be created and their mailboxes registered:

  (* … *)
  try
    for i = 0 to num_workers - 1 do
      mboxes := make_worker worker i shmem :: !mboxes
    done;
    { mboxes = !mboxes;
      input  = array;
      output = shmem;
      mmapfd = mmapfd; }
  with e ->
    List.iter (fun mbox ->
      Unix.kill mbox.pid Sys.sigkill;
      close_mbox mbox;
    ) !mboxes;
    wait_all ();
    Unix.close mmapfd;
    raise e

If anything goes wrong I try to clean up the partially spawned pool. Technically it is not necessary to do a wait_all at the end, but since close_mbox doesn't block waiting for the child's return status it might catch a straggler under a heavily loaded system. Closing a pool stops all its children and tidies up all used resources:

let close pool =
  List.iter (fun mbox ->
    sendval mbox Stop;
    close_mbox mbox) pool.mboxes;
  wait_all ();
  Unix.close pool.mmapfd

Of course, busy children will block the pool but the next and last function will ensure that there is no possibility of that occurring: iter is the meat of the framework. It maintains as an invariant that all workers in a pool are always ready:

let iter pool args =
  (* Precondition: All mailboxes are ready *)
  let ready   = ref pool.mboxes
  and waiting = ref [] in
  (* … *)

This is ensured by waiting on all mboxes that still have a result pending:

  (* … *)
  let join_waiting () =
    (* Precondition: There are mailboxes waiting to be read from *)
    assert (!waiting != []);
    (* Find all mailboxes we can read from without blocking *)
    let idle, busy = select_mbox !waiting in
    (* Receive results from soon-to-become ready mailboxes *)
    List.iter (fun mbox -> match recvval mbox with
    | Done () -> ()
    | Fail e  -> Printf.fprintf stderr "Process %d: %s\n%!" mbox.pid (Printexc.to_string e)
    ) idle;
    (* Adjust ready and waiting pools *)
    ready   := idle @ !ready;
    waiting := busy
  in (* … *)

Note that the code does not assure the postcondition that there is at least one worker ready to accept more work, as select can fail with EAGAIN and return an empty set of file descriptors (or something). The next code copes with that by retrying:

  (* … *)
  (* Prepare shared memory for work *)
  Bigarray.Array1.blit pool.input pool.output;
  (* Distribute n work units amongst k workers *)
  let cur = ref 0 in
  while !cur != Array.length args do
    let arg = args.(!cur) in
    match !ready with
    | mbox :: ms ->
      ready   := ms;
      waiting := mbox :: !waiting;
      sendval mbox (Work arg);
      incr cur
    | [] -> join_waiting ()
  done;
  (* Final read barrier *)
  while !waiting != [] do join_waiting () done;
  (* Postcondition: All mailboxes are ready *)
  assert (List.length !ready == List.length pool.mboxes);
  (* Copy results to client *)
  Bigarray.Array1.blit pool.output pool.input

The main loop is over the array of arguments to be distributed. Ready workers are immediately assigned their tasks and marked as waiting for their results to be delivered. If there are no ready workers the next argument must wait for one to become ready. The loop ends with a read barrier that waits for all the stragglers, ensuring the invariant.

Using the framework is simple. A worker must minimally accept a subarray as a work unit. For instance, pixel planes have a given height and a width less than or equal to a given stride (aligned scan lines):

type workunit = {
  base   : int;
  stride : int;
  height : int;
  width  : int;
}
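To make the indexing concrete, here is a sketch of a trivial worker over such a work unit. The function fill_strip is hypothetical, as is the choice of an 8-bit unsigned plane; all it does is stamp every pixel in its strip with the low byte of the worker's id. The record type is repeated so that the fragment stands alone.

```ocaml
type workunit = { base : int; stride : int; height : int; width : int }

(* [fill_strip] is a hypothetical worker with the shape
   int -> Bigarray.Array1.t -> workunit -> unit. *)
let fill_strip id
    (pixels : (int, Bigarray.int8_unsigned_elt, Bigarray.c_layout)
                Bigarray.Array1.t)
    wu =
  for row = 0 to wu.height - 1 do
    for col = 0 to wu.width - 1 do
      (* Rows are stride apart; only the first width pixels are used *)
      pixels.{wu.base + row * wu.stride + col} <- id land 0xff
    done
  done
```

Since every strip covers a disjoint range of indices, workers never write to each other's pixels and need no locking on the shared array.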

Given such a pixel plane, distributing rendering work is easy:

let render num_workers block_size pixels width height stride =
  let pool = create num_workers worker pixels in
  let num_strips = (height + block_size - 1) / block_size in
  let work_units = Array.init num_strips (fun i ->
    let base = i * block_size in {
    base   = base * stride;
    stride = stride;
    height = min block_size (height - base);
    width  = width;
  }) in
  iter pool work_units;
  close pool
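The strip arithmetic in render can be checked on its own. In this sketch the hypothetical function strips returns the first row and the height of every strip, using the same rounding-up division; note how the last strip is shorter when block_size does not divide height.

```ocaml
(* [strips] is a hypothetical helper mirroring the arithmetic in render. *)
let strips ~height ~block_size =
  (* Round up so that a partial last strip is not lost *)
  let n = (height + block_size - 1) / block_size in
  List.init n (fun i ->
    let base = i * block_size in
    (base, min block_size (height - base)))
```

For a plane of 10 rows cut in blocks of 4, this gives the strips (0, 4), (4, 4) and (8, 2).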

That's it. On my system (an old MacBook) I get a 20% to 50% speed-up rendering Mandelbrot zooms by using both cores, with negligible overhead from using the framework, compared to sequential code. If you need to adjust the code for your system I'd love to hear about your portability improvements or, God forbid, bug fixes.

1 comment:

Anonymous said...

Heh... Beautiful dry code. I'd like to learn to think and program like you. Envy :)