The (square) root of refinement

There's this great little book, A Method of Programming by Edsger W. Dijkstra and Wim Feijen, which is as dry and precise as the smack of a ruler against gathered fingertips. Really, I cannot recommend it highly enough. It includes a little essay, "Coordinate transformation", that (for me) illustrates beautifully the methodical, stepwise refinement of a program by meaning-preserving transformations.

They start with a naïve version of code for extracting integer square roots:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let a = ref 0
  and b = ref 1 in
  while !b * !b <= n do b := !b lsl 1 done;
  while !b != !a + 1 do
    let c = (!a + !b) lsr 1 in
    if c * c <= n then a := c else b := c
  done;
  !a

The first loop finds the least k such that b² = 4ᵏ > n; this takes 1 + ⌊(lg n)/2⌋ iterations. The second loop performs a binary search for the least a such that a² ≤ n < (a + 1)² in the range a to b, taking ⌊lg (b - a)⌋ = ⌊lg 2ᵏ⌋ = k further iterations. But they note that "each iteration requires a multiplication, however, and since on the whole the general multiplicative operations require (an order of magnitude) more time than additive operations, halvings and doublings, the question arises whether these general multiplications can be eliminated." They then set out to answer this in the affirmative.

The basic idea is to replace multiplications (more specifically, squarings) by finite differences. It is well known that the sum of integers in arithmetic progression gives (more or less) a square. Note that b - a is always a power of 2 (by induction), and so halving it by shifting is exact. This enables them to introduce a variable d = b - a, updated at every step:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let a = ref 0
  and b = ref 1
  and d = ref 1 (* invariant: d = b - a *) in
  while !b * !b <= n do b := !b lsl 1; d := !d lsl 1 done;
  while !b != !a + 1 do
    let c = (!a + !b) lsr 1 in
    if c * c <= n then begin
      a := c;
      d := !d lsr 1
    end else begin
      b := c;
      d := !d lsr 1
    end
  done;
  !a

which readily transforms to:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let a = ref 0
  and b = ref 1
  and d = ref 1 (* invariant: d = b - a *) in
  while !b * !b <= n do b := !b lsl 1; d := !d lsl 1 done;
  while !b != !a + 1 do
    let c = (!a + !b) lsr 1 in
    d := !d lsr 1;
    (* a + d = c; c + d = b *)
    if c * c <= n then a := c else b := c
  done;
  !a

Moving the update to d upwards out of the conditional instead of downwards might seem like a rabbit pulled out of a hat, but by doing so all four occurrences of c in the last conditional statement can be replaced by a + d, as the invariant in the comment explains. This makes c unused, and so it can be eliminated:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let a = ref 0
  and b = ref 1
  and d = ref 1 (* invariant: d = b - a *) in
  while !b * !b <= n do b := !b lsl 1; d := !d lsl 1 done;
  while !b != !a + 1 do
    d := !d lsr 1;
    if (!a + !d) * (!a + !d) <= n then a := !a + !d else b := !a + !d
  done;
  !a

Now in the first loop a = 0, and so b = d; hence the loop condition can mention d instead of b. Also, note that because d = b - a is kept invariant in the second loop, the condition b ≠ a + 1 can be rewritten as d ≠ 1, and the second branch of the conditional becomes trivial. This makes b unused, and it can be eliminated:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let a = ref 0
  and d = ref 1 in
  while !d * !d <= n do d := !d lsl 1 done;
  while !d != 1 do
    d := !d lsr 1;
    if (!a + !d) * (!a + !d) <= n then a := !a + !d
  done;
  !a

This hasn't really bought us anything yet, except replacing two variables by one. However, the square of the binomial in the condition inside the loop can be rewritten as (a + d)² ≤ n ≡ 2⋅a⋅d + d² ≤ n - a². Now, we could keep three variables, p = a⋅d, q = d² and r = n - a², invariant (hm, invariant variables…) and profit from the simpler relations holding between them:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let a = ref 0
  and d = ref 1
  and p = ref 0
  and q = ref 1
  and r = ref n in
  while !d * !d <= n do q := !q lsl 2; d := !d lsl 1 done;
  while !d != 1 do
    q := !q lsr 2;
    p := !p lsr 1;
    d := !d lsr 1;
    if 2 * !a * !d + !d * !d <= n - !a * !a then begin
      r := !r - (2 * !a * !d + !d * !d);
      p := !p + !q;
      a := !a + !d
    end
  done;
  !a

The introduction was straightforward and mechanical, except for the fact that, since a = 0 in the first loop, doubling p was not necessary. Now observe that in the first loop the condition d² ≤ n is equivalent to q ≤ r, since a = 0, and so we may effect the replacement. On the other hand, in the second loop the condition d ≠ 1 is equivalent to q ≠ 1, and we may replace it. Finally, in the inner conditional, 2⋅a⋅d + d² = 2⋅p + q and n - a² = r by definition, and we can simplify. These substitutions make both d and a unused, and they can be left out, since when d = 1 we have p = a:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let p = ref 0
  and q = ref 1
  and r = ref n in
  while !q <= !r do q := !q lsl 2 done;
  while !q != 1 do
    q := !q lsr 2;
    p := !p lsr 1;
    if 2 * !p + !q <= !r then begin
      r := !r - (2 * !p + !q);
      p := !p + !q
    end
  done;
  !p

For convenience we could introduce a local binding h = 2⋅p + q:

let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let p = ref 0
  and q = ref 1
  and r = ref n in
  while !q <= !r do q := !q lsl 2 done;
  while !q != 1 do
    q := !q lsr 2;
    p := !p lsr 1;
    let h = 2 * !p + !q in
    if h <= !r then begin
      r := !r - h;
      p := !p + !q
    end
  done;
  !p

Dijkstra's and Feijen's essay stops here, but not before they remark that:

Here we have shown extensively the transformation process — introduction of new variables and invariants, rewriting guards, and then elimination of original variables. Carried out this way it requires a lot of writing. Someone who is used to this method of program development would make the transition from old to new variables in one step, without writing down the version with both variables. Rewriting, which this method of program development entails, remains a drawback, however.

To which I would add, not so much of a drawback with a good text editor with multiple undos and an interactive top-level interpreter with a read-eval-print loop.
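As a sanity check (my addition, not part of the essay), the final version can be bundled up and compared against the obvious naive specification:

```ocaml
(* Final refined version, as derived above. *)
let isqrt n =
  if n <= 0 then failwith "isqrt" else
  let p = ref 0
  and q = ref 1
  and r = ref n in
  while !q <= !r do q := !q lsl 2 done;
  while !q != 1 do
    q := !q lsr 2;
    p := !p lsr 1;
    let h = 2 * !p + !q in
    if h <= !r then begin
      r := !r - h;
      p := !p + !q
    end
  done;
  !p

(* Naive specification: the greatest k with k * k <= n. *)
let isqrt_spec n =
  let k = ref 0 in
  while (!k + 1) * (!k + 1) <= n do incr k done;
  !k

let () =
  for n = 1 to 10_000 do
    assert (isqrt n = isqrt_spec n)
  done;
  print_endline "ok"
```

Multiplication-free and checked against its specification, just as the essay promises.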


Pass the Bucket

Although not without their problems, hash tables are a popular data structure for the storage and retrieval of key/value pairs, otherwise known as finite maps or finite functions. They are the main data structure in dynamic languages like JavaScript, AWK, Arc, Io, Python, Perl and others. To recapitulate, the idea behind a hash table is to store data in an array (which is a finite map whose domain is a contiguous range of nonnegative integers), encoding the position of the datum associated with key K by means of a function hash K, returning integral values between 0 and N - 1. What is the advantage of this? If the time to compute this function is essentially constant, and since array access is in turn constant-time, a hash table is a constant-time finite map over an arbitrary domain of keys.

All fine and dandy, except for the fact that, in general, hash is not injective: whenever two different keys K ≠ K′ have hash K = hash K′, a collision has occurred, and we must devise some way to arbitrate between the data wanting to go into the same position of the array. A popular way to do this is to crowd more than one item into the same position: by chaining overflowing items into a linked list, the hash table becomes oblivious to hash collisions. An obvious problem with this is that if the hash function is very poor (think, for instance, of the hash that sends every item to 0), the expected constant-time algorithm degenerates into a linear search, which for thousands or millions of items is, in Knuth's words, "too horrible to contemplate".

The solution to this is two-pronged: first, choose a function hash that essentially behaves like a random function: the probability of hash K being equal to a given (arbitrary) number should be as near to 1/N as possible. This is in general pretty difficult, which is why good hash functions are hard to find. Secondly, it is important not to let the hash table become too crowded. By the pigeonhole principle, it is obvious that a table of size M containing N > M data must have at least one overflowing entry. To avoid this, increase the table size once it reaches a certain occupancy factor α = N/M. It is important that the size increase exponentially in order to maintain average-case constant-time operations (the literature explains why.) This, however, leaves wasted space behind, especially if a number of insertions is followed by a number of deletions, as the extra space is never reclaimed. To address this last problem, shrink the table once α falls below some value. It is important, however, to add some hysteresis to the process, to avoid successive resizings when insertions and deletions alternate on a table of critical size. Typically, a hash table might trigger an expansion when α ≥ 0.75 and a shrinkage when α < 0.5, for instance.

The bottom line is that it is easy to cobble together a substandard, underperforming implementation of hash tables. It is important to understand that if the hash function behaves essentially as a random variable there can be no performance guarantees, only probabilistic expectations. However, the comment usually heard is that hash tables "perform well in practice"; I should say that good implementations of hash tables perform well in practice. OCaml's implementation shows all the subtleties a good hash table data structure must take into account. The point I wanted to make, and that needed this long introduction (maybe it didn't really need it, but I hope you found it interesting nonetheless) is: what happens when you remove the buckets from a hash table? Why, you get vectors!

The lack of an extensible, dynamically resizing array is a glaring omission in OCaml's standard library. This is such a useful data structure that it should come included; it is, however, not very difficult to implement. The idea is to use an underlying array big enough for the elements currently stored, together with a count of said elements. Upon getting full, the code resizes the array. For this, we need not only a mutable count but also a mutable array:

type 'a t = { mutable vec : 'a array; mutable cnt : int; }

On creation, it is convenient for the array to have a size that is not too small, to avoid frequent early doublings. Clearly an initial empty vector (of size 0) is less than convenient, as it would need special-casing in the size calculation. I settled on 16 initial elements, but this can be tuned:

let initial_size = 16

Now the problem with a non-empty array is that it needs initializing with an element to avoid type safety violations:

let make e = { vec = Array.make initial_size e; cnt = 0; }

The code must ensure, at all times, the precondition that no position in the array beyond cnt will ever be accessed. This would allow using an unsafe initialization:

let create () = make (Obj.magic 0)

This, however, demands that the type of vectors be made abstract; this is left as an exercise to you, the reader. With this, simple manipulations of the element count give us some handy functions:

let length v = v.cnt

let clear v = v.cnt <- 0

Getting and setting elements in the array must respect the vector invariant, by checking bounds:

let get v i =
  if 0 > i || i >= v.cnt then invalid_arg "index out of bounds" else
  Array.unsafe_get v.vec i

let set v i x =
  if 0 > i || i >= v.cnt then invalid_arg "index out of bounds" else
  Array.unsafe_set v.vec i x

It would be futile to check bounds twice, once for our vectors and again for OCaml's native arrays, whose size is at least that of our vectors; so I can use an unsafe array operation with confidence. This is now enough to give us simple iterators over vectors:

let iteri (f : int -> 'a -> unit) v =
  for i = 0 to v.cnt - 1 do
    f i (Array.unsafe_get v.vec i)
  done

let fold f e v =
  let acc = ref e in
  iteri (fun _ x -> acc := f !acc x) v;
  !acc

Again, the validity of the invariant allows me to use unsafe accesses in iteri with impunity. I've used an imperative fold (left) based on iteri, but doing it the other way around is as easy. So far, the code leaves out the small matter of extension: all our vectors have constant size zero! Checking for the vector invariant is best left to an auxiliary function:

let check v =
  let siz = Array.length v.vec in
  if v.cnt == siz then begin
    let cnt = min (siz lsl 1) Sys.max_array_length in
    if cnt == siz then failwith "cannot resize" else
    let vec = Array.make cnt v.vec.(0) in
    Array.blit v.vec 0 vec 0 siz;
    v.vec <- vec
  end

If the array has leftover capacity to grow, there is obviously nothing to be done. Otherwise, there is one implementation detail that must be taken into account: arrays in OCaml have a maximum size (which is rather small), so the new size is double the old size, but only up to Sys.max_array_length. Beyond that, there's not much we can do except fail. If resizing is possible, a new array is created and initialized with an arbitrary but existing (check that this is the case!) element of the old array; the elements are copied over and the array replaced. You can see the same code in the implementation of Hashtbl.resize, except that if the array reaches the maximum allowable length nothing is done and the hash table is allowed to overflow.

With this, we can extend a vector from its end:

let add v e =
  check v;
  Array.unsafe_set v.vec v.cnt e;
  v.cnt <- v.cnt + 1

The check ensures that the precondition is satisfied, and the underlying array can be accessed in an unsafe manner. Similarly, to remove the element at the end:

let remove v =
  if v.cnt == 0 then failwith "pop" else
  v.cnt <- v.cnt - 1;
  Array.unsafe_get v.vec v.cnt

These two operations endow the vector with a LIFO discipline, making it suitable as a stack with random access. The signature for the abstract data type implementation is:

module Vec :
  sig
    type 'a t
    val make   : 'a -> 'a t
    val create : unit -> 'a t
    val length : 'a t -> int
    val clear  : 'a t -> unit
    val get    : 'a t -> int -> 'a
    val set    : 'a t -> int -> 'a -> unit
    val iteri  : (int -> 'a -> unit) -> 'a t -> unit
    val fold   : ('a -> 'b -> 'a) -> 'a -> 'b t -> 'a
    val add    : 'a t -> 'a -> unit
    val remove : 'a t -> 'a
  end

It is interesting to compare the issues of extensibility in the face of type safety with Java's ArrayList<T> implementation. You can see in the constructor and in ensureCapacity unsafe code analogous to create. However, since the ArrayList stores object references, it avoids the need to initialize the underlying array with a dummy object: null suffices! The same idea could be used in OCaml: instead of using an underlying 'a array as the store, we could use an 'a option array initialized with None. Then, set and add would box data items with Some x, and get and remove would unbox them with a pattern match. The resulting interface would be unchanged, since no valid entry in the vector could be None, and the implementation would actually be type-safe. The fact that both versions would be observationally equivalent means that this implementation, although unsafe, is sound.
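The 'a option alternative is easy to flesh out; here is a minimal, hedged sketch of it (the names and the initial size are mine, and only a few operations are shown):

```ocaml
(* Type-safe variant: the store is an 'a option array, with None marking
   unused slots, so no Obj.magic is needed on creation. *)
type 'a t = { mutable vec : 'a option array; mutable cnt : int }

let create () = { vec = Array.make 16 None; cnt = 0 }

(* Double the store when full; None is always a valid filler. *)
let check v =
  let siz = Array.length v.vec in
  if v.cnt == siz then begin
    let vec = Array.make (2 * siz) None in
    Array.blit v.vec 0 vec 0 siz;
    v.vec <- vec
  end

let add v e =
  check v;
  v.vec.(v.cnt) <- Some e;
  v.cnt <- v.cnt + 1

let get v i =
  if 0 > i || i >= v.cnt then invalid_arg "index out of bounds" else
  (* the invariant guarantees entries below cnt are Some _ *)
  match v.vec.(i) with Some x -> x | None -> assert false

let () =
  let v = create () in
  for i = 1 to 100 do add v i done;
  assert (get v 41 = 42)
```

The price paid is a boxing allocation per set and add, which is exactly what the Obj.magic version avoids.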

Euler's First Problem

Actually, Project Euler's first problem:

If we list all the natural numbers below 10 that are multiples of 3 or 5, we get 3, 5, 6 and 9. The sum of these multiples is 23.

Find the sum of all the multiples of 3 or 5 below 1000.

This is simple enough (it is ranked first by the number of solutions to it) that you can cook up a brute-force solution in minutes, and the limit parameter N = 1,000 is so low that brute-forcing it is quite reasonable. First, a natural-interval-producing function:

let rec between m n =
  if m >= n then [] else
  m :: between (m + 1) n

With this, the problem gets solved by:

open List

let prob1 max = fold_left (+) 0
  (filter (fun p -> p mod 3 = 0 || p mod 5 = 0)
    (between 1 max))

And so, it instantaneously gives that:

# prob1 1000 ;;
- : int = 233168

(Sorry about the spoiler.) There is, however, the nagging sensation that there's not much point in going through every number, discarding those that we don't want and summing those which are left. Can't we just generate those multiples of 3 or 5 directly, without chaff so to speak? Yes, by a trick analogous to Dijkstra's solution to the so-called Hamming problem:

let prob1 max =
  let rec iter sm p3 p5 =
    let m, q3, q5 =
      if p5 < p3 then p5, p3,   p5+5 else
      if p3 < p5 then p3, p3+3, p5   else
                      p3, p3+3, p5+5 in
    if m >= max then sm else
    iter (m + sm) q3 q5
  in iter 0 0 0

The idea of the inner loop is to keep the running sum and the next multiples of 3 and 5 to consider. Of the two, the least is added to the sum and updated in turn, up to the requested limit. This not only limits the numbers inspected to precisely the multiples of 3 or 5, in order, but it also completely deforests the intermediate lists of the original program.

For N = 1,000, there's no perceptible difference between both versions, but for N = 100,000 there is some, and for N = 1,000,000 the non-tail-recursive between blows the stack (and the second version overflows an integer, so I guess both fail miserably.)
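For what it's worth, the stack overflow is easy to cure with an accumulator-passing version of between (a standard transformation; this sketch is mine, not part of the original):

```ocaml
(* Tail-recursive interval [m, n): build the list backwards,
   from n - 1 down to m, consing onto an accumulator. *)
let between m n =
  let rec go acc i =
    if i < m then acc else go (i :: acc) (pred i)
  in go [] (pred n)

let () = assert (between 1 5 = [1; 2; 3; 4])
```

The integer overflow, of course, this does nothing about.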

This problem is indeed so simple that I'd leave it at that, were it not for the fact that it rather easily gives in to some analysis. Let:

s3 = [sum i : 0 ≤  3*i < 1000 :  3*i]
s5 = [sum i : 0 ≤  5*i < 1000 :  5*i]
sf = [sum i : 0 ≤ 15*i < 1000 : 15*i]

By the inclusion-exclusion principle, prob1 1000 = s3 + s5 - sf. Also:

 3*i < 1000  ≡  i < ⌈1000/ 3⌉ = 334;
 5*i < 1000  ≡  i < ⌈1000/ 5⌉ = 200;
15*i < 1000  ≡  i < ⌈1000/15⌉ =  67.

It is well known that:

s.n = [sum i : 0 ≤ i < n : i] = n*(n-1)/2.

Hence:

s3 =  3*[sum i : 0 ≤ i < 334 : i] =  3 * s.334 = 166833
s5 =  5*[sum i : 0 ≤ i < 200 : i] =  5 * s.200 =  99500
sf = 15*[sum i : 0 ≤ i <  67 : i] = 15 * s. 67 =  33165

and so, the analytic solution is, simply 166833 + 99500 - 33165 = 233168. In the general case for arbitrary limit max:

let prob1 max =
  let (//) n k = int_of_float (ceil (float_of_int n /. float_of_int k))
  and sum  n   =
    if n land 1 == 0
    then (n / 2) * (n - 1)
    else n * ((n - 1) / 2)
  in
  3 * sum (max // 3)  +  5 * sum (max // 5)  -  15 * sum (max // 15)

The only complication is avoiding overflow in the arithmetic sum. Of course, this is easy to generalize to an arbitrary-precision version, except maybe for the ceilinged division, which needs adding one when the divisor doesn't evenly divide the dividend.
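Along those lines, the round trip through floats can be avoided altogether; this is a sketch with an integer-only ceiling division (the (//) below is my replacement, not the original's):

```ocaml
(* Closed-form solution with an integer ceiling division:
   for positive n and k, (n + k - 1) / k = ceil(n / k). *)
let prob1 max =
  let (//) n k = (n + k - 1) / k
  and sum n =
    (* n*(n-1)/2 without overflowing the intermediate product *)
    if n land 1 == 0
    then (n / 2) * (n - 1)
    else n * ((n - 1) / 2)
  in
  3 * sum (max // 3) + 5 * sum (max // 5) - 15 * sum (max // 15)

let () = assert (prob1 1000 = 233168)
```

This version translates to an arbitrary-precision library with no further surprises.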


Naturally Paired Up

It is one of Cantor's best-known results that there are exactly as many integers as there are fractions, although there would seem to be more of the latter than of the former. There is a remarkably beautiful constructive proof of this, and I find no need to repeat the argument. It is somewhat complicated by having to take into account the fact that there is more than one way to write the same rational number q = m/n = (k·m)/(k·n) for any nonzero k. Stripped of this technicality, Cantor's argument is simple: make a tableau of pairs indexed by rows and columns, and read it by diagonals. This construction is straightforward to implement in a functional language:

let rec diag n =
  if n == 0 then (0, 0) else
  match diag (pred n) with
  | (x, 0) -> (0, succ x)
  | (x, y) -> (succ x, pred y)

This amounts to walking said diagonals and wrapping around upon hitting the edge. While this is not the most efficient diagonalization procedure one could devise, it has the virtue of being amenable to program inversion by simple inspection of the case analysis:

let rec undiag = function
| (0, 0) -> 0
| (0, y) -> succ (undiag (pred y, 0))
| (x, y) -> succ (undiag (pred x, succ y))

It is obvious both that diag is total on N and that undiag is total on N×N. To prove that the program inversion was correctly carried out, I will show that undiag ∘ diag = id, and diag ∘ undiag = id. The base case for the first theorem is trivial:

   undiag (diag 0)
= { Definition }
   undiag (0, 0)
= { Definition }
   0

The corresponding inductive step:

   undiag (diag n)
= { Definition with n ≠ 0 }
   undiag (match diag (pred n) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y))
= { Application over a conditional }
   match diag (pred n) with
   | (x, 0) -> undiag (0, succ x)
   | (x, y) -> undiag (succ x, pred y)
= { Definition of undiag, twice }
   match diag (pred n) with
   | (x, 0) -> succ (undiag (pred (succ x), 0))
   | (x, y) -> succ (undiag (pred (succ x), succ (pred y)))
= { Arithmetic, guard on y ≠ 0 }
   match diag (pred n) with
   | (x, 0) -> succ (undiag (x, 0))
   | (x, y) -> succ (undiag (x, y))
= { Identity guards }
   succ (undiag (diag (pred n)))
= { Inductive step }
   succ (pred n)
= { n ≠ 0 }
   n

The base case of the second identity is:

   diag (undiag (0, 0))
= { Definition }
   diag 0
= { Definition }
   (0, 0)

I will show the inductive step by case analysis. If on the first row:

   diag (undiag (0, y))
= { Definition with y ≠ 0 }
   diag (succ (undiag (pred y, 0)))
= { Definition of diag with n ≠ 0 }
   match (diag (pred (succ (undiag (pred y, 0))))) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y)
= { Arithmetic }
   match (diag (undiag (pred y, 0))) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y)
= { Inductive step }
   match (pred y, 0) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y)
= { Guard }
   (0, succ (pred y))
= { y ≠ 0 }
   (0, y)

The second case is for a general pair:

   diag (undiag (x, y))
= { Definition with x ≠ 0 }
   diag (succ (undiag (pred x, succ y)))
= { Definition }
   match (diag (pred (succ (undiag (pred x, succ y))))) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y)
= { Arithmetic }
   match (diag (undiag (pred x, succ y))) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y)
= { Inductive step }
   match (pred x, succ y) with
   | (x, 0) -> (0, succ x)
   | (x, y) -> (succ x, pred y)
= { Guard }
   (succ (pred x), pred (succ y))
= { x ≠ 0 }
   (x, y)

The corollary is that, indeed, N is isomorphic to N×N, as witnessed by diag and undiag.

Now the latter has a well-known arithmetic expression that doesn't require primitive recursion over the naturals:

let undiag (x, y) =
  let k = x + y in
  let t = k * (k + 1) / 2 in
  t + x

The k-th diagonal contains the k + 1 pairs whose components sum to k. This means that the first pair (0, k) on the diagonal has T.k = k·(k+1)/2 pairs preceding it. It is rather tedious, but straightforward, to prove that this is in fact equal to the recursive version:

   undiag (0, 0)
= { Definition }
   0
= { Arithmetic }
   (0 + 0) * (0 + 0 + 1) / 2 + 0


   undiag (0, y)
= { Definition }
   1 + undiag (y - 1, 0)
= { Induction }
   1 + (y - 1) * y / 2 + y - 1
= { Arithmetic }
   y * (y + 1) / 2
= { Arithmetic }
   (0 + y) * (0 + y + 1) / 2 + 0

and finally

   undiag (x, y)
= { Definition }
   1 + undiag (x - 1, y + 1)
= { Induction }
   1 + (x - 1 + y + 1) * (x - 1 + y + 1 + 1) / 2 + x - 1
= { Arithmetic }
   (x + y) * (x + y + 1) / 2 + x

In every case, I have aimed at arriving at the expansion of undiag (x, y) = (x + y)·(x + y + 1)/2 + x. Now this is again amenable to inversion, albeit of a different nature:

let diag n =
  let k = truncate (floor (sqrt (0.25 +. 2. *. float n) -. 0.5)) in
  let t = n - k * (k + 1) / 2 in
  (t, k - t)

Regarded as a quadratic equation in k, k·(k+1)/2 - n = 0 has the positive root k = √(¼ + 2·n) - ½; pair number n lies on the diagonal indexed by ⌊k⌋. Then, the offset into it is the corresponding remainder. Beyond the evidently correct arithmetic inversion, it is equationally immediate that this version of diag is equivalent to the primitive recursive one:

   diagR = diagA
≡ { undiagA is injective }
   undiagA ∘ diagR = undiagA ∘ diagA
≡ { undiagA = undiagR; undiagA ∘ diagA = id, by construction }
   undiagR ∘ diagR = id

which is precisely the first identity proved above.
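As a mechanical check (my addition), both closed forms can be bundled together and round-tripped over a range of naturals:

```ocaml
(* Arithmetic pairing function and its inverse, as derived above. *)
let undiag (x, y) =
  let k = x + y in
  let t = k * (k + 1) / 2 in
  t + x

let diag n =
  let k = truncate (floor (sqrt (0.25 +. 2. *. float n) -. 0.5)) in
  let t = n - k * (k + 1) / 2 in
  (t, k - t)

let () =
  for n = 0 to 10_000 do
    assert (undiag (diag n) = n)
  done;
  print_endline "ok"
```

(The float excursion in diag is exact for small n; for very large n a purely integral square root would be safer.)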


The slowest regexp engine in the world

Much has been (and still is being) written about the speed and efficiency of regular expression matching. I find this sort of competition a somewhat trivial exercise in optimization. Writing the slowest regexp engine, that's a challenge!

A regular expression is an expression that compactly denotes a regular language. A regular language, in turn, is a set L_S of strings over an alphabet S such that the test for membership of a string w in L_S can be carried out by a finite automaton (from now on I'll leave the alphabet implicit and drop the subscript.)

In my quest for inefficiency, I will denote regular languages by extension. In other words, a regular language L will be exactly the set of the strings belonging to it. This set may be very large, and in fact it is not unexpected for it to be infinite; so I will need lazy lists or sequences:

type 'a node = Nil | Cons of 'a * 'a seq
and  'a seq  = 'a node Lazy.t

The simplest of all regular languages is the empty language:

let nil = lazy Nil

It has no strings. The next simplest language is the one containing just the simplest of strings, the empty one:

let eps = lazy (Cons ("", nil))

Any member c of the alphabet S can be regarded as a string. I can form regular languages from characters, too:

let sym a = lazy (Cons (String.make 1 a, nil))

Now, even though it is a start, I can't do much with this. Given regular languages L and L′, I can form the union M = L ∪ L′, where a string belongs to it if it belongs either to L or to L′.

The standard way to represent sets as lists is to keep them sorted somehow, so that no element appears more than once. Then set union is a so-called natural merge on these lists. I will sort strings not simply lexicographically, since that would put the string "aaa…" formed by an arbitrary number of a's before the string "b", for instance. I'll instead sort shorter strings first, and strings of equal length in lexicographical order:

let shortlex s t =
  Pervasives.compare (String.length s, s) (String.length t, t)

let rec alt r r' =
  let step = function
  | Nil        , r'            -> r'
  | r          , Nil           -> r
  | Cons (h, t), Cons (h', t') ->
    let c = shortlex h h' in
    if c < 0 then Cons (h , alt t r') else
    if c > 0 then Cons (h', alt r t') else
    Cons (h, alt t t')
  in lazy (step (Lazy.force r, Lazy.force r'))
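As a small aside (mine), the ordering itself can be spot-checked in isolation, using the plain compare, which amounts to the same thing as Pervasives.compare:

```ocaml
(* Shortlex: compare by length first, then lexicographically. *)
let shortlex s t =
  compare (String.length s, s) (String.length t, t)

let () =
  assert (shortlex "b" "aaa" < 0);    (* shorter strings come first *)
  assert (shortlex "ab" "aa" > 0);    (* equal length: lexicographic *)
  assert (shortlex "abc" "abc" = 0)
```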

I must be careful in executing the reductions, forcing just as much as needed. Now I can represent a simple language, for instance ε|a, where ε denotes the empty string:

# let r = alt eps (sym 'a') ;;
val r : string seq = <lazy>

Well, regular sets are opaque. I need a way to extract some values from them, for instance out to a list:

let rec take n s =
  if n = 0 then [] else match Lazy.force s with
  | Nil -> []
  | Cons (h, t) -> h :: take (pred n) t

Now, let's see what r contains:

# take 10 r ;;
- : string list = [""; "a"]

As is to be expected. Operating on sequences is straightforward, apart from having to administer laziness carefully. For instance, mapping a function over a sequence is analogous to its list counterpart:

let rec map f s =
  let step = function
  | Nil         -> Nil
  | Cons (h, t) -> Cons (f h, map f t)
  in lazy (step (Lazy.force s))

Given regular languages L and L', I can form the concatenation M = L · L', where a string belongs to it if it can be decomposed in a prefix belonging to L and a suffix belonging to L'. In other words, the strings of M are the concatenation of all the strings of L with those of L'. This Cartesian shuffle of sequences is somewhat tricky. Fortunately, union distributes over concatenation; so that (v :: s)·(w :: t) = vw :: ([vts·(w :: t)):

let rec cat r r' =
  let step = function
  | Nil, _     | _, Nil        -> Nil
  | Cons (h, t), Cons (h', t') ->
    Cons (h ^ h', alt (map ((^) h) t') (cat t r'))
  in lazy (step (Lazy.force r, Lazy.force r'))

This inlines the concatenation of the singleton set [v] directly as a map. Let's test how it works:

# take 10 ( cat (alt (sym 'a') (sym 'b')) (alt eps (sym 'c'))) ;;
- : string list = ["a"; "b"; "ac"; "bc"]

Up to now, there is no way to represent anything other than (very) small languages. Given a regular language L, I can form the reflexive-transitive closure M = L*, where a string belongs to it if it is formed by zero or more concatenations of strings in L. In other words, M is the smallest set containing the empty string ε and the concatenation of L with M itself:

let rec clo r =
  let rec step = function
  | Nil          -> Cons ("", nil)
  | Cons ("", t) -> step (Lazy.force t)
  | _            -> Cons ("", cat r (clo r))
  in lazy (step (Lazy.force r))

The closure of the null regular language contains just the empty string. If the regular language being closed already contains the empty string, the closure must keep it and close over the rest. Otherwise, it contains the empty string together with the concatenation of every one of its strings with the closure itself. Let's try it:

# take 10 (cat (clo (sym 'a')) (sym 'b')) ;;
- : string list =
["b"; "ab"; "aab"; "aaab"; "aaaab"; "aaaaab"; "aaaaaab"; "aaaaaaab";
 "aaaaaaaab"; "aaaaaaaaab"]

It works, and it is clear that the shortlex ordering is instrumental in generating the languages. Now, what I've accomplished up to here is to devise an effective enumeration of the regular language denoted by a regular expression. This incidentally proves constructively that regular languages are recursively enumerable. To match a string against a regular expression amounts to testing it for membership in the regular set denoted by the expression:

let rec matches s r = match Lazy.force r with
| Nil         -> false
| Cons (h, t) ->
  let c = shortlex s h in
  c = 0 || c > 0 && matches s t

I must be careful to use the well-ordering on the regular language; otherwise membership becomes semi-decidable, taking literally forever to look for a string that is simply not there. Now let me be realistic: I'll try to match my e-mail address (not really). I need a way to compactly generate a character set:

let rec set x y =
  if x > y then nil else
  alt (sym x) (set (Char.chr (succ (Char.code x))) y)

Now the set of names comprises the concatenations of one or more alphabetic characters (it could be more complete, but that would be unwieldy):

let rep r = cat r (clo r)

let name = rep (set 'a' 'z')

The ten-thousandth name is still only three letters long! Now the set of e-mails is simply:

let email = cat name (cat (sym '@') (cat name (rep (cat (sym '.') name))))

It (sort of) works:

# take 10 email ;;
- : string list =
["a@a.a"; "a@a.b"; "a@a.c"; "a@a.d"; "a@a.e"; "a@a.f"; "a@a.g"; "a@a.h";
 "a@a.i"; "a@a.j"]

I'll leave the test to you.


Refining Pipelines

Trying to emulate a text-processing pipeline in a functional language with the intent of replacing shell-level programming is both natural and appealing. In Haskell, the lazy nature of the language makes this efficient as a bonus, as the streaming nature of Unix pipes is preserved.

Trying to do this naïvely in a strict language like OCaml results in programs that transform the entire contents of the file at each step, consuming unbounded amounts of memory and generating large quantities of garbage. The obvious enhancement is to process text files one line at a time.

I'll start with the line reader I blogged about in December. The idea of that code was to iterate over a file's lines, applying a user-supplied function to each one in turn. To give pipelines a more shell-like flavor, I'd like to abstract away the mechanics of opening and closing files, working instead with file names.

The higher-order RAII pattern is straightforward in a functional language:

let unwind ~(protect: 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e

With this, I can define channel handlers that protect the application of a user function to a channel:

let with_input_channel  f = unwind ~protect:close_in  f
and with_output_channel f = unwind ~protect:close_out f

To get it out of the way now, this is the line-writing function I'll need later:

let output_endline out s =
  output_string out s;
  output_char out '\n'

(In contrast to print_endline, this one writes to an output channel.) With this, I can define a version of the line iterator that takes a filename instead of an already-opened channel:

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)

The type of read is string -> (string -> unit) -> unit. Partially applied to a file name, it can be viewed not as an iterator but as a computation in the continuation monad.
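
To see read in that light, hand it a continuation that accumulates lines. The helpers are repeated to keep the sketch self-contained, with iter_lines my stand-in for the December reader:

```ocaml
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e

let with_input_channel f = unwind ~protect:close_in f

(* stand-in for the December line reader *)
let iter_lines f inch =
  try while true do f (input_line inch) done
  with End_of_file -> ()

let read fname f = with_input_channel (iter_lines f) (open_in_bin fname)

(* read fname awaits a string -> unit continuation; here, one that
   accumulates the lines in order *)
let lines_of fname =
  let acc = ref [] in
  read fname (fun l -> acc := l :: !acc);
  List.rev !acc
```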

Just as I did with read, I would like to abstract away the opening and closing of the output file, and expose an interface that only deals with file names. If the pipeline is to consume its input line by line, it is not immediately clear how that would work short of opening the named file, appending the line to it and closing it again, once for each line. So as a first approximation, write fname would have the shape:

let write fname =
  with_output_channel (fun out -> … ) (open_out_bin fname)

Somehow, lines would be delivered to with_output_channel's argument, to be written by output_endline:

let write fname =
  with_output_channel (fun out -> … output_endline out …)
    (open_out_bin fname)

The only sensible thing to do is to let output_endline out be the continuation of… something that would invoke it for each processed line. Abstracting out that "something":

let write fname f =
  with_output_channel (fun out -> f (output_endline out)) (open_out_bin fname)

But then I can write the argument in point-free style:

let (%) f g x = f (g x)

let write fname f =
  with_output_channel (f % output_endline) (open_out_bin fname)

The type of write is string -> ((string -> unit) -> 'a) -> 'a. This is analogous to the so-called runner in the continuation monad. Now we have that read "foo" has type (string -> unit) -> unit, which unifies with the type of the second argument to write "bar" with 'a equal to unit. This implies that write "bar" (read "foo") really copies files, one line at a time (actually, it normalizes line endings). There is really no need to prove this by equational reasoning; the types are a "free theorem" when interpreted in the continuation monad.

Now, it's clear that the combinator ( >> ) that would allow us to write read "foo" >> write "bar" is simply reverse application in operator form:

let (>>) x f = f x
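
With ( >> ) in hand, the copy read "foo" >> write "bar" can be checked end to end. All the helpers are repeated so the sketch stands alone; iter_lines is my stand-in for the December reader:

```ocaml
let unwind ~(protect : 'a -> unit) f x =
  try
    let res = f x in
    protect x; res
  with e ->
    protect x; raise e

let with_input_channel  f = unwind ~protect:close_in  f
and with_output_channel f = unwind ~protect:close_out f

(* stand-in for the December line reader *)
let iter_lines f inch =
  try while true do f (input_line inch) done
  with End_of_file -> ()

let output_endline out s = output_string out s; output_char out '\n'

let (%) f g x = f (g x)
let (>>) x f = f x

let read  fname f = with_input_channel  (iter_lines f) (open_in_bin fname)
let write fname f = with_output_channel (f % output_endline) (open_out_bin fname)

(* copy "foo" to "bar", one line at a time:
   read "foo" >> write "bar" *)
```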

Now, I would like to interpose some line-processing functions in the pipeline between read and write. For instance, a regular-expression global-replace-and-substitute (a.k.a. sed):

let replace = Str.global_replace % Str.regexp

It really works:

# replace "a+" "amas " "anaconda" ;;
- : string = "amas namas condamas "

Hocus-pocus. Also, the following is useful:

let escapeXML s =
  let b = Buffer.create (String.length s) in
  String.iter (function
  | '<' -> Buffer.add_string b "&lt;"
  | '>' -> Buffer.add_string b "&gt;"
  | '&' -> Buffer.add_string b "&amp;"
  | '"' -> Buffer.add_string b "&quot;"
  | c ->
    let d = int_of_char c in
    if d < 128 then Buffer.add_char b c else begin
      Buffer.add_string b "&#";
      Buffer.add_string b (string_of_int d);
      Buffer.add_string b ";"
    end) s;
  Buffer.contents b
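
A quick sanity check of the escapes (the definition is repeated so the example stands alone):

```ocaml
let escapeXML s =
  let b = Buffer.create (String.length s) in
  String.iter (function
  | '<' -> Buffer.add_string b "&lt;"
  | '>' -> Buffer.add_string b "&gt;"
  | '&' -> Buffer.add_string b "&amp;"
  | '"' -> Buffer.add_string b "&quot;"
  | c ->
    let d = int_of_char c in
    if d < 128 then Buffer.add_char b c else begin
      Buffer.add_string b "&#";
      Buffer.add_string b (string_of_int d);
      Buffer.add_string b ";"
    end) s;
  Buffer.contents b

let escaped = escapeXML "a<b & \"c\""
(* "a&lt;b &amp; &quot;c&quot;" *)
```

Note that non-ASCII bytes are escaped individually as numeric character references, so this is only correct for Latin-1 text, not UTF-8.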

So, now, how to stack these string -> string line-editors in the input-output pipeline? A general pipeline would look like this:

read "foo" ## led1 ## led2 … ## ledn ## write "bar"

where the ## are the relevant operators. One of these must be our ( >> ) applicator; and if n = 0, it must be either the first or the last. Judging by the types, the read continuation looks simpler to compose; so let's build the transformations from the left and let it be applied en bloc to write:

read "foo" ## led1 ## led2 … ## ledn >> write "bar"

The chain of line-editors must stack on top of the line-reading continuation; in other words, read "foo" will have to be the bottommost, last continuation applied to write "bar". We need ## to be left-associative; hence, let's call it ( |> ):

read "foo" |> stred1 |> stred2 … |> stredn >> write "bar"

So ( |> ) takes a reader and a line-editor, and builds a filtered reader. This reader has type (string -> unit) -> unit, taking the writing continuation as argument. It is the string passed to that continuation that must be filtered first, and composition suffices:

let (|>) reader f k = reader (k % f)
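
A file-free check of ( |> ): a reader built over an in-memory list (the read_list device is mine, not the article's) stands in for read fname:

```ocaml
let (%) f g x = f (g x)
let (|>) reader f k = reader (k % f)

(* a reader over an in-memory list, shaped like read fname:
   it feeds each "line" to its continuation *)
let read_list lines k = List.iter k lines

let collected =
  let acc = ref [] in
  (read_list ["ab"; "cd"] |> String.uppercase_ascii)
    (fun l -> acc := l :: !acc);
  (* each line is upcased before the collecting continuation sees it *)
  List.rev !acc
```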

On the other hand, a right-associative operator ( @> ) that would let us write a right-to-left filtered output pipeline:

read "foo" >> stred1 @> stred2 … @> stredn @> write "bar"

is more complicated. First of all, it should take a filter and a writer and return a filtered writer:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = …

The types should guide the refining of the operator. Given what they are, for now, let's make it ignore the filter and return the writer unmodified:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = writer

This writer is passed a writing continuation of type (string -> unit) -> 'a. Abstracting it out:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = fun (k : (string -> unit) -> 'a) -> writer k

This continuation gets passed, in turn, the continuation that actually writes each line out. Abstracting it out:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = 
    fun (k : (string -> unit) -> 'a) -> writer (fun (w : string -> unit) -> k w)

Now we have a context where to apply the filter:

let (@>) (f : string -> string) (writer : ((string -> unit) -> 'a) -> 'a)
  : ((string -> unit) -> 'a) -> 'a = 
    fun (k : (string -> unit) -> 'a) -> writer (fun (w : string -> unit) -> k (w % f))

Eliminating the typing constraints:

let (@>) f writer k = writer (fun w -> k (w % f))
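
Symmetrically, ( @> ) can be checked without files by letting a Buffer-backed writer (my device, standing in for write fname) receive the filtered lines:

```ocaml
let (%) f g x = f (g x)
let (@>) f writer k = writer (fun w -> k (w % f))

(* a writer over a Buffer, shaped like write fname: it hands its
   continuation a string -> unit sink that appends a line *)
let buf = Buffer.create 16
let write_buf k = k (fun s -> Buffer.add_string buf s; Buffer.add_char buf '\n')

let () =
  (* each line is upcased on its way to the sink *)
  (String.uppercase_ascii @> write_buf) (fun w -> w "ab"; w "cd")
```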

We can now write read "file.xml" >> replace "[<>]" "#" @> write "foo.txt" or read "file.xml" |> replace "[<>]" "#" >> write "foo.txt", whichever is more natural.

Codeless Language Module for OCaml

Since I couldn't find a ready-made syntax highlighting definition to edit OCaml code in BBEdit, I pieced together one from information and examples I found around. So, for the record, here is an OCaml Codeless Language Module that does the job:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
  <key>BBLMLanguageDisplayName</key>
  <string>Objective Caml</string>
  <key>Language Features</key>
  <dict>
    <key>Comment Pattern</key>
    <string>(?> \(\* (?s: .*? ) (?> \*\) | \z ) )</string>
    <key>Function Pattern</key>
    <string>^[ \t]*
  \b(?: let | and )\b [ \t]+
  (?: \brec\b [ \t]+ )?
    [A-Za-z]['0-9A-Za-z_]* | \([^)]+\)
  (?:[ \t]+ [^ \t=]+)+ [ \t]*</string>
    <key>String Pattern</key>
    <string>  (?<!') " (?: [^"\\] |
      \\ (?: ['"ntbr\\] | \d{3} | x[0-9A-Fa-f]{2} )
  (?> " | \z ) ) |
(?> ' (?: [^'\\] | (?P>escape) ) ' )</string>
    <key>Identifier and Keyword Character Class</key>
    <string>0-9A-Za-z_'</string>
    <key>Skip Pattern</key>
    <string>(?> \(\* (?s: .*? ) (?> \*\) | \z ) ) |
(?> (?<!') " (?: [^"\\] | \\ (?: ['"ntbr\\] | \d{3} | x[0-9A-Fa-f]{2} ) )* " )</string>
  </dict>
</dict>
</plist>

The function pattern could use some work, and the comment pattern doesn't recognize nested comments, but it's a start. I hope you will find it useful.