And by "on a budget" I mean in under 150 lines of code. Commented, no less. Of course this comes with some limitations: I restrict myself to the embarrassingly parallel case where the client code wants to partition a computation equally among identical workers. I also restrict myself to pure OCaml without external dependencies. Furthermore, I have not been entirely diligent with either my error checking or my portability: this code is tested under Mac OS X, although I expect it should work on any Unix-like operating system. With those caveats in place, the framework responds to the following signature:
type ('a, 'b, 'c, 'd) worker =
  int -> ('a, 'b, 'c) Bigarray.Array1.t -> 'd -> unit
type ('a, 'b, 'c, 'd) pool
val create : int -> ('a, 'b, 'c, 'd) worker ->
  ('a, 'b, 'c) Bigarray.Array1.t -> ('a, 'b, 'c, 'd) pool
val close : ('a, 'b, 'c, 'd) pool -> unit
val iter : ('a, 'b, 'c, 'd) pool -> 'd array -> unit
A worker is the function that each child process runs: it receives the integer id that identifies it, the shared-memory array shmem on which it computes, and one work-unit argument. A pool is set up by a call to create k worker array and contains k such workers, whose results are delivered in the client's array. The work on N arguments is distributed by iter pool arguments in an N-to-k fashion: as soon as a worker finishes, it can be rescheduled with another work unit. The types are complicated by the parametricity over the Bigarrays used for input and output, but this affords type safety and computational flexibility.
Note: I present the following code in a standalone fashion, but it is imperative to hide it behind the given signature to ensure its type safety. Essentially the type variables are phantom types that ensure that all applications are sound; in particular it is the only way to ensure the type-safe use of marshaling.
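To make the worker type concrete, here is a minimal sketch of a function that fits it. The names range and add_range are hypothetical and not part of the framework, and the annotations are there only to show how the type variables get instantiated. The function is called directly, without a pool, which is a handy way to test a worker before parallelizing it.

```ocaml
(* Hypothetical work unit: add [delta] to the elements in [lo, hi). *)
type range = { lo : int; hi : int; delta : float }

(* Hypothetical worker; the annotations only spell out how it
   instantiates the worker type, with 'd = range. *)
let add_range (_id : int)
    (shmem : (float, Bigarray.float64_elt, Bigarray.c_layout) Bigarray.Array1.t)
    (r : range) : unit =
  for i = r.lo to r.hi - 1 do
    shmem.{i} <- shmem.{i} +. r.delta
  done

(* Called directly, without any pool, on a small array of zeros. *)
let demo =
  let a = Bigarray.Array1.create Bigarray.float64 Bigarray.c_layout 4 in
  Bigarray.Array1.fill a 0.;
  add_range 0 a { lo = 1; hi = 3; delta = 2. };
  a
```

Under the signature above, such a worker would be handed to create together with a float64 array, and iter would then take an array of range values.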
The computation is organized with the main program as the leader of k child processes. The communication is arranged over a mailbox abstraction:
type mbox = {
  pid  : int;
  infd : Unix.file_descr;
  inch : in_channel;
  otch : out_channel;
}
An mbox records the process ID of the worker process (or 0 for the controller) and a pair of input-output channels used for communication. The mbox also keeps the underlying descriptor for the input channel as an optimization. Creating an mbox is straightforward:
let make_mbox pid infd otfd =
  { pid  = pid;
    infd = infd;
    inch = Unix.in_channel_of_descr infd;
    otch = Unix.out_channel_of_descr otfd; }
Closing an mbox abstracts away waiting on the child process, so that all resources associated with the fork are properly cleaned up:
let close_mbox mbox =
  if mbox.pid <> 0 then
    ignore (Unix.waitpid [Unix.WNOHANG] mbox.pid);
  (* The file descriptor is closed through its channel *)
  close_in mbox.inch;
  close_out mbox.otch
Messages are sent and received over mboxes using marshaling:
let sendval mbox v =
  Marshal.to_channel mbox.otch v [Marshal.No_sharing];
  flush mbox.otch
and recvval mbox = Marshal.from_channel mbox.inch
(Note a potential pitfall here: it is imperative to flush the output channel if the other end is to receive anything.) The meat of the abstraction is the ability to wait on one or more mboxes for readiness to receive messages:
let select_mbox mboxes =
  let inset = List.map (fun mbox -> mbox.infd) mboxes in
  let ready, _, _ = Unix.select inset [] [] (-1.) in
  List.partition (fun mbox -> List.mem mbox.infd ready) mboxes
This is where the optimization of having the file descriptor at hand comes into play: select waits forever on the passed-in mboxes until some can be read from, and partitions them into a ready-busy pair of lists. This completes the communications framework. Now, a worker is concrete, as explained above:
type ('a, 'b, 'c, 'd) worker =
  int -> ('a, 'b, 'c) Bigarray.Array1.t -> 'd -> unit
These workers are structured as actors. They receive a work unit or a command to stop, and reply with a result or with an exception:
type 'a cmd = Work of 'a | Stop
and 'a res = Done of 'a | Fail of exn
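As a minimal sketch of this protocol, the following round-trips messages through an in-memory string instead of a pipe. The helper roundtrip is hypothetical and exists only for illustration. It also shows why the type discipline matters: Marshal.from_string returns a value of any type the context asks for, so nothing but the surrounding types guarantees that both ends agree.

```ocaml
type 'a cmd = Work of 'a | Stop
type 'a res = Done of 'a | Fail of exn

(* Hypothetical helper: marshal to an in-memory string and read it back,
   standing in for the pipe between controller and worker. The result
   type is tied to the argument type by the annotations. *)
let roundtrip (v : 'a) : 'a =
  Marshal.from_string (Marshal.to_string v [Marshal.No_sharing]) 0

let () =
  (* A command survives the trip and can be matched on as usual. *)
  match roundtrip (Work (3, 4)) with
  | Work (a, b) -> Printf.printf "work unit: %d %d\n" a b
  | Stop -> print_endline "stop"
```

One caveat applies to Fail: the Marshal documentation warns that unmarshaled exceptions should not be pattern-matched on, because unmarshaling does not preserve the information needed to match their constructors. Printing them, as the controller does further on, sidesteps that.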
A worker must react to Work requests by performing the intended computation until it is told to Stop:
let make_worker worker i shmem =
  let rec react mbox = match recvval mbox with
  | Work a ->
    let res = try Done (worker i shmem a) with e -> Fail e in
    sendval mbox res;
    react mbox
  | Stop -> close_mbox mbox; exit 0
  in (* … *)
Of course, before it can do anything useful a child and its parent are to part:
  (* … *)
  let in0, ot0 = Unix.pipe () (* parent read , child write *)
  and in1, ot1 = Unix.pipe () (* parent write, child read  *)
  in
  (* Unix.fork raises Unix_error on failure; it never returns -1 *)
  let fork () =
    try Unix.fork ()
    with e -> List.iter Unix.close [in0; in1; ot0; ot1]; raise e
  in
  match fork () with
  | 0 ->
    List.iter Unix.close [in0; ot1];
    react (make_mbox 0 in1 ot0)
  | pid ->
    List.iter Unix.close [ot0; in1];
    make_mbox pid in0 ot1
This is the classic popen Unix pattern. Another traditional Unix-ism is for the parent to catch up with all prodigal children to clean up after them:
let rec wait_all () =
  try
    let pid, _ = Unix.waitpid [] 0 in
    if pid <> 0 then wait_all ()
  with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
(I know, I know: the function is not tail-recursive). Yet another traditional idiom is unlinking an open file to make it exist as long as it is kept open:
let tempfd () =
  let name = Filename.temp_file Sys.executable_name "TMP" in
  try
    let fd = Unix.openfile name [Unix.O_RDWR; Unix.O_CREAT] 0o600 in
    Unix.unlink name;
    fd
  with e -> Unix.unlink name; raise e
Now I can fill in the pool:
type ('a, 'b, 'c, 'd) pool = {
  mboxes : mbox list;
  input  : ('a, 'b, 'c) Bigarray.Array1.t;
  output : ('a, 'b, 'c) Bigarray.Array1.t;
  mmapfd : Unix.file_descr;
}
It contains the list of mboxes to communicate with its children, the client input array, the shared output array, and the descriptor of the file used to map it in memory. Creating the pool involves setting up that shared memory to mirror the input array:
let create num_workers worker array =
  let kind   = Bigarray.Array1.kind array
  and layout = Bigarray.Array1.layout array
  and dim    = Bigarray.Array1.dim array in
  let mmapfd = tempfd () in
  let shmem  = Bigarray.Array1.map_file mmapfd kind layout true dim in
  let mboxes = ref [] in
  (* … *)
This is a robustness pitfall: I should really test that all system calls succeed and clean up after them if not. It is also a potential portability pitfall: Mac OS X cannot map character devices, so the Linux-ism of mmap-ing over /dev/zero must be worked around by explicitly creating a temporary file, which of course means that it must be disposed of afterwards. (Later OCaml releases dropped Bigarray.Array1.map_file in favor of Unix.map_file, so on a recent compiler the mapping line needs adapting.) Now the workers can be created and their mailboxes registered:
  (* … *)
  try
    for i = 0 to num_workers - 1 do
      mboxes := make_worker worker i shmem :: !mboxes
    done;
    { mboxes = !mboxes;
      input  = array;
      output = shmem;
      mmapfd = mmapfd; }
  with e ->
    List.iter (fun mbox ->
      Unix.kill mbox.pid Sys.sigkill;
      close_mbox mbox;
    ) !mboxes;
    wait_all ();
    Unix.close mmapfd;
    raise e
If anything goes wrong I try to clean up the partially spawned pool. Technically it is not necessary to do a wait_all at the end, but since close_mbox doesn't block waiting for the child's return status it might catch a straggler under a heavily loaded system. Closing a pool stops all its children and tidies up all used resources:
let close pool =
  List.iter (fun mbox ->
    sendval mbox Stop;
    close_mbox mbox) pool.mboxes;
  wait_all ();
  Unix.close pool.mmapfd
Of course, busy children will block the pool, but the next and last function ensures that this cannot happen. iter is the meat of the framework. It maintains as an invariant that all workers in a pool are ready both when it is entered and when it returns:
let iter pool args =
  (* Precondition: All mailboxes are ready *)
  let ready   = ref pool.mboxes
  and waiting = ref [] in
  (* … *)
This is ensured by waiting on all mboxes that still have a result pending:
  (* … *)
  let join_waiting () =
    (* Precondition: There are mailboxes waiting to be read from *)
    assert (!waiting <> []);
    (* Find all mailboxes we can read from without blocking *)
    let idle, busy = select_mbox !waiting in
    (* Receive results from soon-to-become ready mailboxes *)
    List.iter (fun mbox -> match recvval mbox with
      | Done () -> ()
      | Fail e  -> Printf.fprintf stderr "Process %d: %s\n%!" mbox.pid (Printexc.to_string e)
    ) idle;
    (* Adjust ready and waiting pools *)
    ready   := idle @ !ready;
    waiting := busy
  in (* … *)
Note that the code does not assure the postcondition that there is at least one worker ready to accept more work: if select were to return an empty set of ready file descriptors, idle would be empty as well. The next code copes with that by retrying:
  (* … *)
  (* Prepare shared memory for work *)
  Bigarray.Array1.blit pool.input pool.output;
  (* Distribute n work units amongst k workers *)
  let cur = ref 0 in
  while !cur <> Array.length args do
    let arg = args.(!cur) in
    match !ready with
    | mbox :: ms ->
      ready   := ms;
      waiting := mbox :: !waiting;
      sendval mbox (Work arg);
      incr cur
    | [] -> join_waiting ()
  done;
  (* Final read barrier *)
  while !waiting <> [] do join_waiting () done;
  (* Postcondition: All mailboxes are ready *)
  assert (List.length !ready = List.length pool.mboxes);
  (* Copy results to client *)
  Bigarray.Array1.blit pool.output pool.input
The main loop is over the array of arguments to be distributed. Ready workers are immediately assigned their tasks and marked as waiting for their results to be delivered. If there are no ready workers, the next argument must wait for one to become ready. The loop ends with a read barrier that waits for all the stragglers, ensuring the invariant.
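To see what this dispatch policy buys, here is a small single-process model of it. This is only a sketch under simplifying assumptions: the function makespan is hypothetical, task durations are known integers, and communication costs nothing. Each task, taken in order, goes to whichever worker becomes idle first, which is what the ready and waiting lists achieve with real processes.

```ocaml
(* Hypothetical model: each task, taken in order, goes to the worker that
   becomes idle first. Returns the time at which the last worker finishes.
   Assumes k >= 1. *)
let makespan k durations =
  let free_at = Array.make k 0 in
  Array.iter (fun d ->
    (* Find the worker that becomes idle earliest *)
    let best = ref 0 in
    for i = 1 to k - 1 do
      if free_at.(i) < free_at.(!best) then best := i
    done;
    free_at.(!best) <- free_at.(!best) + d) durations;
  Array.fold_left max 0 free_at

let () =
  Printf.printf "2 workers: %d, 1 worker: %d\n"
    (makespan 2 [| 3; 1; 1; 1 |]) (makespan 1 [| 3; 1; 1; 1 |])
```

With durations 3, 1, 1, 1 and two workers, the second worker picks up all three short tasks while the first is busy with the long one, so the model finishes at time 3. A static split of the same units into two halves would finish at time 4.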
Using the framework is simple. A worker must minimally accept a subarray as a work unit. For instance, pixel planes have a given height and a width less than or equal to a given stride (aligned scan lines):
type workunit = {
  base   : int;
  stride : int;
  height : int;
  width  : int;
}
Given such a pixel plane, distributing rendering work is easy:
let render num_workers block_size pixels width height stride =
  let pool = create num_workers worker pixels in
  let num_strips = (height + block_size - 1) / block_size in
  let work_units = Array.init num_strips (fun i ->
    let base = i * block_size in {
      base   = base * stride;
      stride = stride;
      height = min block_size (height - base);
      width  = width;
    }) in
  iter pool work_units;
  close pool
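The strip arithmetic is easy to check in isolation. The sketch below repeats the computation from render without the pool; the function strips and its labeled arguments are hypothetical names introduced here, and the sizes are made up for illustration.

```ocaml
type workunit = { base : int; stride : int; height : int; width : int }

(* Same arithmetic as render, minus the pool: one work unit per strip. *)
let strips ~block_size ~width ~height ~stride =
  (* Round up so that a final, shorter strip is not lost *)
  let num_strips = (height + block_size - 1) / block_size in
  Array.init num_strips (fun i ->
    let base = i * block_size in
    { base = base * stride; stride;
      height = min block_size (height - base); width })

let () =
  Array.iter (fun u -> Printf.printf "base=%d height=%d\n" u.base u.height)
    (strips ~block_size:4 ~width:6 ~height:10 ~stride:8)
```

A plane 10 rows high cut into blocks of 4 yields three strips of heights 4, 4 and 2, starting at offsets 0, 32 and 64 for a stride of 8. The last strip is shorter because min clips it to the rows that remain.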
That's it. On my system (an old MacBook) I get a 20% to 50% speed-up rendering Mandelbrot zooms by using both cores, compared to sequential code, with negligible overhead from the framework. If you need to adjust the code for your system I'd love to hear about your portability improvements or, God forbid, bug fixes.