Trying to emulate a text-processing pipeline in a functional language with the intent of replacing shell-level programming is both natural and appealing. In Haskell, the lazy nature of the language makes this efficient as a bonus, as the streaming nature of Unix pipes is preserved.
Trying to do this naïvely in a strict language like OCaml results in programs that transform the entire contents of the file at each step, consuming unbounded amounts of memory and generating large quantities of garbage. The obvious enhancement is to process text files one line at a time.
I'll start with the line reader I blogged about in December. The idea of that code was to iterate over a file's lines, applying a user-supplied function to each one in turn. To give pipelines a more shell-like flavor, I'd like to abstract out the mechanics of opening and closing files, and work instead with filenames.
The higher-order RAII pattern is straightforward in a functional language:
let unwind ~(protect: 'a -> unit) f x =
  try
    let res = f x in
    protect x;
    res
  with e ->
    protect x;
    raise e
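To see what unwind guarantees, here is a small sketch that counts how often the cleanup function runs. The names cleanups, note_cleanup, ok and failed are illustrative only; the point is that the cleanup runs once on the normal path and once on the exceptional path, and that the exception still propagates.

```ocaml
(* Repeated from the text so that the block stands alone. *)
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x;
    res
  with e ->
    protect x;
    raise e

(* Illustrative cleanup: it only counts how many times it was called. *)
let cleanups = ref 0
let note_cleanup (_ : string) = incr cleanups

(* Normal exit: the result of f is returned after the cleanup has run. *)
let ok = unwind ~protect:note_cleanup String.length "abc"

(* Exceptional exit: the cleanup runs, then the exception is re-raised. *)
let failed =
  try unwind ~protect:note_cleanup (fun _ -> failwith "boom") "abc"
  with Failure _ -> (-1)
```

After both calls, ok is 3, failed is -1 and the counter stands at 2. On OCaml 4.08 and later, Fun.protect ~finally from the standard library covers similar ground, although it treats exceptions raised by the cleanup itself differently.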
With this, I can define channel handlers that protect the application of a user function to a channel:
let with_input_channel f = unwind ~protect:close_in f
and with_output_channel f = unwind ~protect:close_out f
To get it out of the way now, this is the line-writing function I'll need later:
let output_endline out s = output_string out s; output_char out '\n'
(In contrast to
print_endline, this one writes to an output channel.) With this, I can define a version of the line iterator that takes a filename instead of an already-opened channel:
let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)
The type of read is string -> (string -> unit) -> unit. It can be viewed not as an iterator but as a function that returns a computation in the continuation monad.
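The iter_lines function comes from the earlier post and is not repeated here. As a rough sketch, assuming it does nothing more than call input_line until End_of_file, the reader can be exercised as follows. The body of iter_lines, the temporary file and demo_lines are illustrative stand-ins, not the original code.

```ocaml
(* Repeated from the text so that the block stands alone. *)
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x;
    res
  with e ->
    protect x;
    raise e

let with_input_channel f = unwind ~protect:close_in f

(* ASSUMPTION: a stand-in for the iter_lines of the earlier post.
   It applies f to every line until input_line signals end of file. *)
let iter_lines f inch =
  let rec loop () =
    match input_line inch with
    | line -> f line; loop ()
    | exception End_of_file -> ()
  in
  loop ()

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)

(* Write three lines to a scratch file, then collect them through read. *)
let demo_lines =
  let tmp = Filename.temp_file "read_demo" ".txt" in
  let out = open_out_bin tmp in
  output_string out "one\ntwo\nthree\n";
  close_out out;
  let acc = ref [] in
  read tmp (fun line -> acc := line :: !acc);
  Sys.remove tmp;
  List.rev !acc
```

Here demo_lines ends up as the three lines in file order. The continuation passed to read is an ordinary string -> unit function, so anything that consumes one line at a time fits.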
Just as I did with
read, I would like to abstract away the opening and closing of the output file, and expose an interface that only deals with file names. If the pipeline is to consume its input line by line, it is not immediately clear how that would work without opening the named file, appending the line to it and closing it afterwards, once for each line. So as a first approximation,
write fname would have the shape:
let write fname = with_output_channel (fun out -> … ) (open_out_bin fname)
Somehow, lines would be delivered to
with_output_channel's argument, to be written by output_endline:
let write fname = with_output_channel (fun out -> … output_endline out …) (open_out_bin fname)
The only sensible thing to do is to let
output_endline out be the continuation of… something that would invoke it for each processed line. Abstracting out that "something":
let write fname f = with_output_channel (fun out -> f (output_endline out)) (open_out_bin fname)
But then I can write the argument in point-free style:
let (%) f g x = f (g x)
let write fname f = with_output_channel (f % output_endline) (open_out_bin fname)
The type of write is string -> ((string -> unit) -> 'a) -> 'a. This is analogous to the so-called runner in the continuation monad. Now we have that
read "foo" has type
(string -> unit) -> unit, which unifies with the type of the second argument to
write "bar" with
'a equal to
unit. This implies that
write "bar" (read "foo") really copies files, one line at a time (actually, it normalizes line endings). There is really no need to prove this by equational reasoning; the types are a "free theorem" when interpreted in the continuation monad.
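As a quick check of the copying claim, here is a self-contained sketch that copies one scratch file to another. It assumes the same simple stand-in for iter_lines (input_line until End_of_file); slurp, the temporary files and copied are hypothetical helpers introduced only for the demonstration.

```ocaml
(* Definitions repeated from the text so that the block stands alone. *)
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x;
    res
  with e ->
    protect x;
    raise e

let with_input_channel f = unwind ~protect:close_in f
and with_output_channel f = unwind ~protect:close_out f

let output_endline out s = output_string out s; output_char out '\n'
let ( % ) f g x = f (g x)

(* ASSUMPTION: stand-in for the iter_lines of the earlier post. *)
let iter_lines f inch =
  let rec loop () =
    match input_line inch with
    | line -> f line; loop ()
    | exception End_of_file -> ()
  in
  loop ()

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)
let write fname f = with_output_channel (f % output_endline) (open_out_bin fname)

(* Hypothetical helper: the whole contents of a file as one string. *)
let slurp fname =
  let inch = open_in_bin fname in
  let s = really_input_string inch (in_channel_length inch) in
  close_in inch;
  s

let copied =
  let src = Filename.temp_file "copy_src" ".txt" in
  let dst = Filename.temp_file "copy_dst" ".txt" in
  let out = open_out_bin src in
  output_string out "alpha\nbeta\ngamma";  (* note: no final newline *)
  close_out out;
  write dst (read src);                    (* the copy itself *)
  let s = slurp dst in
  Sys.remove src;
  Sys.remove dst;
  s
```

With this stand-in the copy differs from the source in one respect: every line, including the last, is terminated by a newline, so copied ends in "\n" although the source did not.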
Now, it's clear that the combinator
( >> ) that would allow us to write
read "foo" >> write "bar" is simply reverse application in operator form:
let (>>) x f = f x
Now, I would like to interpose some line-processing functions in the pipeline between read and write. For instance, a regular-expression global-replace-and-substitute:
let replace = Str.global_replace % Str.regexp
It really works:
# replace "a+" "amas " "anaconda" ;;
- : string = "amas namas condamas "
Hocus-pocus. Also, the following is useful:
let escapeXML s =
  let b = Buffer.create (String.length s) in
  String.iter (function
    | '<' -> Buffer.add_string b "&lt;"
    | '>' -> Buffer.add_string b "&gt;"
    | '&' -> Buffer.add_string b "&amp;"
    | '"' -> Buffer.add_string b "&quot;"
    | c ->
      let d = int_of_char c in
      if d < 128 then Buffer.add_char b c
      else begin
        Buffer.add_string b "&#";
        Buffer.add_string b (string_of_int d);
        Buffer.add_string b ";"
      end) s;
  Buffer.contents b
So, now, how to stack these
string -> string line-editors in the input-output pipeline? A general pipeline would look like this:
read "foo" ## led1 ## led2 … ## ledn ## write "bar"
where ## are the relevant operators. One of these must be our
( >> ) applicator; and if n = 0, it must be either the first or the last. By the types of them, the
read continuation looks simpler to compose; so let's build the transformations from the left and let the result be applied en bloc to write:
read "foo" ## led1 ## led2 … ## ledn >> write "bar"
The chain of line-editors must stack on top of the line-reading continuation; in other words,
read "foo" will have to be the bottommost, last continuation applied to
write "bar". We need
## to be left-associative; hence, let's call it
( |> ):
read "foo" |> stred1 |> stred2 … |> stredn >> write "bar"
( |> ) takes a reader and a line-editor, and builds a filtered reader. This reader, of type (string -> unit) -> unit, takes a continuation of type string -> unit. It is the string passed to that continuation that must be filtered, and composition suffices:
let (|>) reader f k = reader (k % f)
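To check the order in which stacked filters run, here is a small sketch with an in-memory reader standing in for read "foo". read_list, add_suffix, seen and filtered are hypothetical names used only here. Note that on OCaml 4.01 and later this definition shadows the standard library's ( |> ) within its scope.

```ocaml
let ( % ) f g x = f (g x)

(* Shadows Stdlib.( |> ) from here on. *)
let ( |> ) reader f k = reader (k % f)

(* Hypothetical in-memory reader: feeds each element to the continuation. *)
let read_list lines k = List.iter k lines

(* Two filters that do not commute, so the order is observable. *)
let add_suffix s = s ^ "-end"

let seen : string list ref = ref []

let () =
  (read_list ["one"; "two"] |> add_suffix |> String.uppercase_ascii)
    (fun line -> seen := line :: !seen)

let filtered = List.rev !seen
```

The suffix is added first and upper-cased afterwards, so filtered is ["ONE-END"; "TWO-END"]: filters apply left to right, as they would in a shell pipeline.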
On the other hand, a right-associative operator
( @> ) that would let us write a right-to-left filtered output pipeline:
read "foo" >> stred1 @> stred2 … @> stredn @> write "bar"
is more complicated. First of all, it should take a filter and a writer and return a filtered writer:
let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a) : ((string -> unit) -> 'a) -> 'a = …
The types should guide the refining of the operator. Given what they are, for now, let's make it ignore the filter and return the writer unmodified:
let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a) : ((string -> unit) -> 'a) -> 'a = writer
This writer is applied to a writing continuation of type
(string -> unit) -> 'a. Abstracting it out:
let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a) : ((string -> unit) -> 'a) -> 'a = fun (k : (string -> unit) -> 'a) -> writer k
This continuation gets passed, in turn, the continuation that actually writes each line out. Abstracting it out:
let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a) : ((string -> unit) -> 'a) -> 'a = fun (k : (string -> unit) -> 'a) -> writer (fun (w : string -> unit) -> k w)
Now we have a context in which to apply the filter:
let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a) : ((string -> unit) -> 'a) -> 'a = fun (k : (string -> unit) -> 'a) -> writer (fun (w : string -> unit) -> k (w % f))
Eliminating the typing constraints:
let (@>) f writer k = writer (fun w -> k (w % f))
We can now write
read "file.xml" >> replace "[<>]" "#" @> write "foo.txt" or
read "file.xml" |> replace "[<>]" "#" >> write "foo.txt", whichever is more natural.
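The two styles can be compared without touching the file system. The sketch below swaps read and write for in-memory stand-ins (read_list and write_buffer, both hypothetical, as are run, add_suffix and input) and avoids Str so that it needs no extra library. It relies on OCaml's precedence rules: operators starting with @ are right-associative and bind tighter than those starting with | or >, which are left-associative.

```ocaml
let ( % ) f g x = f (g x)
let ( >> ) x f = f x
let ( |> ) reader f k = reader (k % f)   (* shadows Stdlib.( |> ) *)
let ( @> ) f writer k = writer (fun w -> k (w % f))

(* Hypothetical stand-in for read: feeds each list element to k. *)
let read_list lines k = List.iter k lines

(* Hypothetical stand-in for write: hands k a function that appends
   one newline-terminated line to the buffer. *)
let write_buffer buf k =
  k (fun line -> Buffer.add_string buf line; Buffer.add_char buf '\n')

(* Run a pipeline against a fresh buffer and return what was written. *)
let run pipeline =
  let buf = Buffer.create 64 in
  pipeline buf;
  Buffer.contents buf

let add_suffix s = s ^ "-end"
let input = ["one"; "two"]

(* Filters stacked on the reader. *)
let via_reader =
  run (fun buf ->
    read_list input |> add_suffix |> String.uppercase_ascii >> write_buffer buf)

(* The same filters stacked on the writer. *)
let via_writer =
  run (fun buf ->
    read_list input >> add_suffix @> String.uppercase_ascii @> write_buffer buf)
```

Both pipelines yield "ONE-END\nTWO-END\n". In either form the filters run in the order they are written, left to right.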