Refining Pipelines

Trying to emulate a text-processing pipeline in a functional language with the intent of replacing shell-level programming is both natural and appealing. In Haskell, the lazy nature of the language makes this efficient as a bonus, as the streaming nature of Unix pipes is preserved.

Trying to do this naïvely in a strict language like OCaml results in programs that transform the entire contents of the file at each step, consuming unbounded amounts of memory and generating large quantities of garbage. The obvious enhancement is to process text files one line at a time.

I'll start with the line reader I blogged about in December. The idea of that code was to iterate over a file's lines, applying a user-supplied function to each one in turn. To give pipelines a more shell-like flavor, I'd like to abstract out the mechanics of opening and closing files and work instead with filenames.

The higher-order RAII pattern is straightforward in a functional language:

let unwind ~(protect: 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e
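
As a quick sanity check, here is a minimal sketch of unwind at work on something other than a channel. The counter cleanups and the function release are hypothetical stand-ins for a real resource; they only record how many times the cleanup ran.

```ocaml
(* unwind, written out in full so that this block stands alone. *)
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e

(* Hypothetical "resource": we only count how many times cleanup ran. *)
let cleanups = ref 0
let release (_ : string) = incr cleanups

(* Normal return: the result comes back and cleanup runs once. *)
let len = unwind ~protect:release String.length "hello"

(* Exceptional return: cleanup still runs, and the exception propagates
   to the caller, where it is caught here only for the demonstration. *)
let failed =
  try unwind ~protect:release (fun _ -> failwith "boom") "hello"
  with Failure _ -> -1
```

After both calls the counter stands at 2, one cleanup per call. Recent versions of the standard library offer Fun.protect for the same purpose, with a slightly different interface.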

With this, I can define channel handlers that protect the application of a user function to a channel:

let with_input_channel  f = unwind ~protect:close_in  f
and with_output_channel f = unwind ~protect:close_out f

To get it out of the way now, this is the line-writing function I'll need later:

let output_endline out s =
  output_string out s;
  output_char out '\n'

(In contrast to print_endline, this one writes to an output channel.) With this, I can define a version of the line iterator that takes a filename instead of an already-opened channel:

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)

The type of read is string -> (string -> unit) -> unit. It can be viewed not as an iterator but as a function returning a computation in the continuation monad.
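
Since iter_lines comes from the earlier post and is not reproduced here, the following is only a stand-in, under the assumption that it has type (string -> unit) -> in_channel -> unit. This version relies on input_line, so it splits on '\n' only; the original may well treat line endings differently.

```ocaml
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e

let with_input_channel f = unwind ~protect:close_in f

(* Assumed stand-in for iter_lines: apply f to each line of the channel. *)
let iter_lines (f : string -> unit) (inch : in_channel) : unit =
  let rec loop () =
    match input_line inch with
    | line -> f line; loop ()
    | exception End_of_file -> ()
  in
  loop ()

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)

(* Usage: write a small temporary file, then collect its lines with read. *)
let collected =
  let fname = Filename.temp_file "pipeline" ".txt" in
  let out = open_out_bin fname in
  output_string out "one\ntwo\nthree\n";
  close_out out;
  let acc = ref [] in
  read fname (fun line -> acc := line :: !acc);
  Sys.remove fname;
  List.rev !acc
```

Here collected holds the three lines in order, and the input channel is closed by the time read returns.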

Just as I did with read, I would like to abstract away the opening and closing of the output file, and expose an interface that only deals with file names. If the pipeline is to consume its input line by line, it is not immediately clear how that would work without opening the named file, appending the line to it and closing it afterwards, once for each line. So as a first approximation, write fname would have the shape:

let write fname =
  with_output_channel (fun out -> … ) (open_out_bin fname)

Somehow, lines would be delivered to with_output_channel's argument, to be written by output_endline:

let write fname =
  with_output_channel (fun out -> … output_endline out …)
    (open_out_bin fname)

The only sensible thing to do is to let output_endline out be the continuation of… something that would invoke it for each processed line. Abstracting out that "something":

let write fname f =
  with_output_channel (fun out -> f (output_endline out)) (open_out_bin fname)

But then I can write the argument in point-free style:

let (%) f g x = f (g x)

let write fname f =
  with_output_channel (f % output_endline) (open_out_bin fname)

The type of write is string -> ((string -> unit) -> 'a) -> 'a. This is analogous to the so-called runner in the continuation monad. Now we have that read "foo" has type (string -> unit) -> unit, which unifies with the type of the second argument to write "bar" with 'a equal to unit. This implies that write "bar" (read "foo") really copies files, one line at a time (actually, it normalizes line endings). There is really no need to prove this by equational reasoning; the types are a "free theorem" when interpreted in the continuation monad.

Now, it's clear that the combinator ( >> ) that would allow us to write read "foo" >> write "bar" is simply reverse application in operator form:

let (>>) x f = f x
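
To see the copy in action, here is a self-contained sketch that runs read src >> write dst on temporary files. The iter_lines below is an assumed stand-in for the one from the earlier post, and slurp is a hypothetical helper used only to inspect the result.

```ocaml
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e

let with_input_channel  f = unwind ~protect:close_in  f
let with_output_channel f = unwind ~protect:close_out f

let output_endline out s =
  output_string out s;
  output_char out '\n'

(* Assumed stand-in for iter_lines from the earlier post. *)
let iter_lines f inch =
  let rec loop () =
    match input_line inch with
    | line -> f line; loop ()
    | exception End_of_file -> ()
  in
  loop ()

let (%) f g x = f (g x)
let (>>) x f = f x

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)
let write fname f =
  with_output_channel (f % output_endline) (open_out_bin fname)

(* Hypothetical helper: the whole contents of a file as one string. *)
let slurp fname =
  let inch = open_in_bin fname in
  let s = really_input_string inch (in_channel_length inch) in
  close_in inch;
  s

(* The source deliberately lacks a final newline. *)
let copied =
  let src = Filename.temp_file "src" ".txt"
  and dst = Filename.temp_file "dst" ".txt" in
  let out = open_out_bin src in
  output_string out "alpha\nbeta";
  close_out out;
  read src >> write dst;
  let s = slurp dst in
  Sys.remove src;
  Sys.remove dst;
  s
```

The copy ends with a newline even though the source does not, which is one visible effect of going through output_endline for every line.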

Now, I would like to interpose some line-processing functions in the pipeline between read and write. For instance, a regular-expression global-replace-and-substitute (a.k.a. sed):

let replace = Str.global_replace % Str.regexp

It really works:

# replace "a+" "amas " "anaconda" ;;
- : string = "amas namas condamas "

Hocus-pocus. Also, the following is useful:

let escapeXML s =
  let b = Buffer.create (String.length s) in
  String.iter (function
  | '<' -> Buffer.add_string b "&lt;"
  | '>' -> Buffer.add_string b "&gt;"
  | '&' -> Buffer.add_string b "&amp;"
  | '"' -> Buffer.add_string b "&quot;"
  | c ->
    let d = int_of_char c in
    if d < 128 then Buffer.add_char b c else begin
      Buffer.add_string b "&#";
      Buffer.add_string b (string_of_int d);
      Buffer.add_string b ";"
    end) s;
  Buffer.contents b

So, now, how to stack these string -> string line-editors in the input-output pipeline? A general pipeline would look like this:

read "foo" ## led1 ## led2 … ## ledn ## write "bar"

where the ## are the relevant operators. One of these must be our ( >> ) applicator; and if n = 0, it must be either the first or the last. Judging by their types, the read continuation looks simpler to compose; so let's build the transformations from the left and let the result be applied en bloc to write:

read "foo" ## led1 ## led2 … ## ledn >> write "bar"

The chain of line-editors must stack on top of the line-reading continuation; in other words, read "foo" will have to be the bottommost, last continuation applied to write "bar". We need ## to be left-associative; hence, let's call it ( |> ):

read "foo" |> led1 |> led2 … |> ledn >> write "bar"

So ( |> ) takes a reader and a line-editor, and builds a filtered reader. A reader has type (string -> unit) -> unit; that is, it takes a continuation of type string -> unit. It is the string passed to that continuation that must be filtered, and composition suffices:

let (|>) reader f k = reader (k % f)
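
To check the order in which the editors are applied, here is a minimal sketch with an in-memory reader. The function of_list is hypothetical, introduced only so that the example needs no files. Note that this definition of ( |> ) shadows the reverse-application operator of the same name in current standard libraries.

```ocaml
let (%) f g x = f (g x)

(* Shadows the standard library's ( |> ) from here on. *)
let (|>) reader f k = reader (k % f)

(* Hypothetical in-memory reader: feeds each string of a list to k. *)
let of_list (lines : string list) : (string -> unit) -> unit =
  fun k -> List.iter k lines

let result =
  let acc = ref [] in
  let pipeline =
    of_list ["a"; "b"]
    |> String.uppercase_ascii
    |> (fun s -> s ^ "!")
  in
  (* The final continuation just collects what reaches it. *)
  pipeline (fun line -> acc := line :: !acc);
  List.rev !acc
```

Unfolding the definition twice gives reader (k % f2 % f1), so each line goes through the leftmost editor first, as in a shell pipeline: result is ["A!"; "B!"].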

On the other hand, a right-associative operator ( @> ) that would let us write a right-to-left filtered output pipeline:

read "foo" >> led1 @> led2 … @> ledn @> write "bar"

is more complicated. First of all, it should take a filter and a writer and return a filtered writer:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = …

The types should guide the refining of the operator. Given what they are, for now, let's make it ignore the filter and return the writer unmodified:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = writer

This writer is applied to a writing continuation of type (string -> unit) -> 'a. Abstracting it out:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = fun (k : (string -> unit) -> 'a) -> writer k

This continuation gets passed, in turn, the continuation that actually writes each line out. Abstracting it out:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = 
    fun (k : (string -> unit) -> 'a) -> writer (fun (w : string -> unit) -> k w)

Now we have a context in which to apply the filter:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = 
    fun (k : (string -> unit) -> 'a) -> writer (fun (w : string -> unit) -> k (w % f))

Eliminating the typing constraints:

let (@>) f writer k = writer (fun w -> k (w % f))

We can now write read "file.xml" >> replace "[<>]" "#" @> write "foo.txt" or read "file.xml" |> replace "[<>]" "#" >> write "foo.txt", whichever is more natural.
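
As a check that the two spellings agree, here is a small sketch that avoids both files and Str. The reader of_list, the writer to_buffer and the editors upper and bang are hypothetical, chosen only so that the order of application shows up in the output.

```ocaml
let (%) f g x = f (g x)
let (>>) x f = f x
let (|>) reader f k = reader (k % f)
let (@>) f writer k = writer (fun w -> k (w % f))

(* Hypothetical in-memory reader, standing in for read. *)
let of_list lines : (string -> unit) -> unit = fun k -> List.iter k lines

(* Hypothetical in-memory writer, standing in for write: it hands f a
   continuation that appends each line to the buffer b. *)
let to_buffer (b : Buffer.t) (f : (string -> unit) -> 'a) : 'a =
  f (fun s -> Buffer.add_string b s; Buffer.add_char b '\n')

let upper = String.uppercase_ascii
let bang s = s ^ "!"

(* Editors stacked on the reader side. *)
let left =
  let b = Buffer.create 16 in
  of_list ["a"; "b"] |> upper |> bang >> to_buffer b;
  Buffer.contents b

(* The same editors stacked on the writer side. *)
let right =
  let b = Buffer.create 16 in
  of_list ["a"; "b"] >> upper @> bang @> to_buffer b;
  Buffer.contents b
```

Both left and right come out as "A!\nB!\n": in either form a line meets upper before bang, so the pipeline reads left to right regardless of which operator does the stacking.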


Yoric said...

Out of curiosity: why not do that with streams ?

Matías Giovannini said...

It could be done reasonably well with streams. The pipelining operators would then be stream combinators, of which there are examples for parsing, for instance. In a way, if you view streams as a head and a frozen closure representing the tail or, as the case may be, the rest of the computation, both approaches are more or less equivalent.

But sometimes generators are useful, and they are a technique that is better known in imperative languages like Python but rather obscure in functional languages like OCaml.