2007-12-13

Naturally Sorted

Well, Jeff Atwood has weighed in on the discussion, with a clear explanation for "the rest of them". Seeing Batchelder's Python code, I realized I didn't do as well as I could have with my OCaml version. Specifically, I didn't look hard enough for a simple way to split the string between decimal and non-decimal parts. The idea is not to delimit parts by looking for transitions between one kind and the other, but to use one of the kinds as the delimiter itself:

let split =
  let delim = Str.regexp "[0-9]+" in
    List.map (function Str.Delim s -> s | Str.Text s -> s)
  % Str.full_split delim

As is to be expected with OCaml, the result of full_split is rather odd, in that it uses an ADT to distinguish between text and delimiters. I use a forgetful mapping to coalesce all the parts into a plain string list.
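A quick look in the toplevel (with str.cma loaded) shows the coalescing at work:

```ocaml
# Str.full_split (Str.regexp "[0-9]+") "z100.html" ;;
- : Str.split_result list = [Str.Text "z"; Str.Delim "100"; Str.Text ".html"]
# split "z100.html" ;;
- : string list = ["z"; "100"; ".html"]
```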

Another point where the comparison could be more discriminating is when comparing numbers with leading zeros. While it is obvious that "010" > "08", it is also to be expected that "010" > "10". The fix for this is to compare first by value, then by length:

let cmp_alnum x y =
  try
    let m, n = int_of_string x, int_of_string y in
    let c = Pervasives.compare m n in
    if c != 0 then c else
    Pervasives.compare (String.length x) (String.length y)
  with Failure _ -> Pervasives.compare x y
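In the toplevel, the refined comparison now orders numbers with leading zeros as intended:

```ocaml
# cmp_alnum "010" "08" ;;   (* 10 > 8: decided by value *)
- : int = 1
# cmp_alnum "010" "10" ;;   (* equal values: the longer, padded string sorts later *)
- : int = 1
```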

There's always opportunity for polish…

2007-12-11

The Alphanumeric Sort

A discussion over at reddit took up Dave Koelle's "Alphanum Algorithm" for sorting strings that include numerals "naturally". The problem to solve is, in Koelle's words:

For example, in a sorted list of files, "z100.html" is sorted before "z2.html". But obviously, 2 comes before 100!

First, it's clear that it's a comparison function between (rather, a total order on) strings that's at issue here; second, as pointed out in reddit, it's what OS X does in the Finder (I wouldn't swear to it, but I think iTunes sorts like this, too). Martin Pool's Natural String Order comparison is another implementation of this idea, one that is special in that (in his words) "[p]erformance is linear: each character of the string is scanned at most once, and only as many characters as necessary to decide are considered". His page also lists a number of alternative implementations, giving pride of place to Stuart Cheshire's.

I'll not aim that high; my implementation is also linear but I won't bother trying to visit each character at most once. The idea is to separate numbers from non-numbers, and compare corresponding numbers according to numeric value and not as strings. In other words, 100 > 20 even when "100" < "20" (as the string comparison stops as soon as the first character '1' on the left compares less than the first character '2' on the right). The first thing needed would then be a way to split a string between numeric and non-numeric components. OCaml regexps are somewhat crude, but a multipass (cue Milla Jovovich's Leeloo Dallas multipass!) approach works: I'll insert a delimiter between a sequence of digits followed by a sequence of non-digits; then I'll insert the same delimiter between a sequence of non-digits followed by a sequence of digits. Finally, I'll split the string on the delimiter. The only problem is that any character is potentially valid in a string; I cop out and use a null byte:

let (%) f g x = f (g x)

let split =
  let re_d_D = Str.regexp "\\([0-9]+\\)\\([^0-9]+\\)"
  and re_D_d = Str.regexp "\\([^0-9]+\\)\\([0-9]+\\)"
  and re_NUL = Str.regexp_string "\000"
  and tp_spl = "\\1\000\\2" in
    Str.split re_NUL
  % Str.global_replace re_D_d tp_spl
  % Str.global_replace re_d_D tp_spl
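Tracing the two passes over a sample string shows the delimiters being inserted (toplevel session, str.cma loaded):

```ocaml
# Str.global_replace (Str.regexp "\\([0-9]+\\)\\([^0-9]+\\)") "\\1\000\\2" "z100.html" ;;
- : string = "z100\000.html"
# Str.global_replace (Str.regexp "\\([^0-9]+\\)\\([0-9]+\\)") "\\1\000\\2" "z100\000.html" ;;
- : string = "z\000100\000.html"
# split "z100.html" ;;
- : string list = ["z"; "100"; ".html"]
```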

Next, I need to lift a comparison on elements to a comparison on a list of those elements. There is no standard lexicographical comparison on lists, but the implementation is trivial. As long as elements compare as equal, recur on the rest of the lists until a difference is found or the end of the shortest list is reached, whichever comes first:

let rec cmp_list cmp l m = match l, m with
| [], [] ->  0
| [], _  -> -1
| _, []  ->  1
| a :: l, b :: m ->
  let c = cmp a b in
  if c == 0 then cmp_list cmp l m else c
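Lifted over the standard comparison, it behaves as expected, with a proper prefix sorting first:

```ocaml
# cmp_list compare [1; 2] [1; 3] ;;
- : int = -1
# cmp_list compare [1; 2] [1; 2; 0] ;;
- : int = -1
```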

Finally comes the alphanumeric comparison. The idea is to try first to compare both strings as numbers, and if that isn't possible to fall back on plain string comparison. For that, I take advantage of the fact that int_of_string fails with exception on non-numeric input:

let cmp_alnum x y =
  try Pervasives.compare (int_of_string x) (int_of_string y)
  with Failure _ -> Pervasives.compare x y
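A check in the toplevel contrasts it with plain string comparison:

```ocaml
# cmp_alnum "100" "20" ;;             (* by numeric value: 100 > 20 *)
- : int = 1
# Pervasives.compare "100" "20" ;;    (* as strings: '1' < '2' *)
- : int = -1
```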

Putting it all together, I perform a Schwartzian Transform on the list of strings to be sorted, splitting on numbers to give a list of lists. Then I sort, using the standard sort on lists with a comparison function built by lifting the alphanumeric comparison to a lexicographic ordering. Lastly, I concatenate the pieces back into strings:

let sort_alnum =
    List.map (String.concat "")
  % List.sort (cmp_list cmp_alnum)
  % List.map split
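Putting the pipeline to the test on Koelle's example (plus one more file name):

```ocaml
# sort_alnum ["z100.html"; "z2.html"; "z10.html"] ;;
- : string list = ["z2.html"; "z10.html"; "z100.html"]
```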

To support my claim of linearity, note that split is (likely; I haven't studied Str's code) linear, since the regexps are deterministic; that cmp_list is obviously linear in the length of the lists, hence linear in the size of the original string; and that cmp_alnum is linear, since converting a string to a number is linear, just as lexicographical comparison is. Simple, huh?

2007-12-05

Functional Refactoring

A common and simple enough task: count the words present in a text file. For our purposes (and as it is generally agreed), a word is a contiguous sequence of alphabetic characters, delimited by non-alphabetic characters. We should be mindful of edge cases: both the beginning and the end of the file are treated as non-alphabetic characters; in other words, the first and last words of the file, if any, must not be lost.

I found lying around in my code folder (you'd be surprised) the following OCaml code:

let iter_words f inch =
  let ibf = String.create buflen
  and obf = Buffer.create 80
  and any = ref true
  and inw = ref false in
  while !any do
    let nread = unsafe_input inch ibf 0 buflen in
    for i = 0 to pred nread do
      let c = String.unsafe_get ibf i in
      if !inw then begin
        if isalpha c then
          Buffer.add_char obf c
        else begin
          f (Buffer.contents obf);
          Buffer.clear obf;
          inw := false
        end
      end else if isalpha c then begin
        Buffer.add_char obf c;
        inw := true
      end
    done;
    any := nread != 0
  done;
  if !inw then f (Buffer.contents obf)

The first thing to notice is that this is an iterator over the file's words: for every one found, the parameter f is called with its text. What f does with the word is its problem; the advantage of this approach is that you can process files without having to load the entire result in memory; the main disadvantage is that you are forced to process words in sequence, and to handle any intermediate state by yourself.

Inside the function, the code is unabashedly imperative: a pair of nested loops process the file, the outer reading a bufferful at a time into ibf (buflen is implied, and defined somewhere else with a suitable value; so is unsafe_input), the inner scanning the buffer a character at a time and accumulating words in obf. Inside the inner loop, a state machine uses inw to track whether it is scanning a word or a non-word, transitioning according to the predicate isalpha (also suitably defined). This state must survive across entire buffers, as a word might straddle two of them. A further state variable, any, controls the outer loop's termination.
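The helpers buflen, unsafe_input and isalpha are implied throughout; plausible stand-ins (my assumptions, not the originals) would be:

```ocaml
let buflen = 4096

(* Presumably a bounds-check-free read; the safe input works just as well *)
let unsafe_input inch buf pos len = input inch buf pos len

let isalpha c = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
```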

All very straightforward (this code has a modification date of October 1st, 2006, and I could read it after all this time without problems), but also very, very ugly: this style may be appropriate for C, but it is not especially defensible in OCaml. So, I set out first to replace the state flag by a functional state machine. The idea is to have a number of mutually recursive functions, one for each state, which, instead of transitioning, return the next function to call. This is very similar to the object-oriented GoF "State" pattern.

The only complication is that in order to express this pattern we need recursive types: a state is the type of functions of a parameter of type, say, α returning a state! In other words, the type is α → β as β. For OCaml to accept this, it needs the -rectypes option; the problem with this extension is that the type-checker accepts a lot of junk along with the valid definitions, making development more fragile. The alternative is to wrap the recursion so that it goes through a type constructor: either an algebraic data-type, or a record type. I choose the latter:

type 'a state = { fn : 'a -> 'a state }
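As a toy illustration (my example, not part of the word scanner), a two-state machine that flips on every input can be written directly with this type, each state returning its successor:

```ocaml
type 'a state = { fn : 'a -> 'a state }

(* even_st and odd_st alternate on every input character *)
let rec even_st = { fn = fun _ -> odd_st }
and odd_st = { fn = fun _ -> even_st }

let () =
  let final = (even_st.fn 'x').fn 'y' in
  assert (final == even_st)   (* two steps return to the start state *)
```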

The function signature is the same, and I'll keep for now the input and output buffers:

let iter_words f inch =
  let ibf = String.create buflen
  and obf = Buffer.create 80 in

The first state is "scanning outside a word"; if the next character is alphabetic, it must start a new word and transition, otherwise it keeps looking:

  let
  rec no_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else
        no_word
  }

The second state is "scanning inside a word"; if the next character is alphabetic, it accumulates it and keeps scanning, otherwise it ends the word, sends it to the accumulating function f and transitions:

  and in_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else begin
        let () = f (Buffer.contents obf) in
        Buffer.clear obf;
        no_word
      end
  }
  in

A buffer is scanned character by character (I've renamed the variables formerly known as nread and i). It must, however, conserve the last known state, so that words spanning buffers are not incorrectly split:

  let rec scan pos len state =
    if pos == len then state else
    let c = String.unsafe_get ibf pos in
    scan (succ pos) len (state.fn c)
  in

Note how the state handler returns its successor. The file must then be read a buffer at a time. Again, it must conserve the state across calls to the buffer scanning loop, terminating when there is no more data to read:

  let rec ioread state =
    let nread = unsafe_input inch ibf 0 buflen in
    if nread == 0 then state else
    ioread (scan 0 nread state)
  in

Finally, the entire device is set into motion by reading outside a word (remember, the beginning-of-file is taken as an implied non-word character). If the ending state is in_word, the last word must be taken into account and processed (again, the end-of-file is taken as an implied non-word character):

  if ioread no_word == in_word then f (Buffer.contents obf)

From 26 lines to 38 lines, a modest increase in code length for a great improvement in modularity and (I'd argue) understandability: higher-order functions are a powerful abstraction device. I could, however, do better: there are many explicitly recursive functions that beg to be generalized as recursion schemes, or patterns.

The first recursion scheme that should be pulled out is the reading loop. It uses as a free variable the input buffer that it shares with the rest of the code. Abstracting out the work done on each buffer, and accumulating on an explicit parameter:

let fold_in f e inch =
  let ibf = String.create buflen in
  let rec fold e =
    let nread = unsafe_input inch ibf 0 buflen in
    if nread == 0 then e else
    fold (f e ibf nread)
  in fold e

The parameter f will take the last accumulated value e, and the newly read data in the form of the buffer and the count of valid characters in it; it must return the updated result for the next iteration. This means that I must manage to supply scan with the relevant state. But I will also abstract out the iteration over a string. Unfortunately, the String module doesn't define a fold on strings, much less one on a substring:

let fold_string f e str pos len =
  let rec fold pos e =
    if pos == len then e else
    let c = String.unsafe_get str pos in
    fold (succ pos) (f e c)
  in fold pos e
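A quick sanity check of fold_string in the toplevel (my examples):

```ocaml
# fold_string (fun n _ -> succ n) 0 "hello" 0 5 ;;
- : int = 5
# fold_string (fun n c -> if c = 'l' then succ n else n) 0 "hello" 0 5 ;;
- : int = 2
```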

I submit that both functions are independently useful, especially the last one. The input iteration is just:

  let ioread state =
    fold_in scan state inch

In turn, the string iteration (with parameters suitably rearranged to allow for η-conversion) is:

  let scan state buf len =
    fold_string (fun state -> state.fn) state buf 0 len

Now there is not much point in naming the simple folds; η-converting and inlining:

  let ioread state =
    fold_in (fun state buf -> fold_string (fun state -> state.fn) state buf 0)
      state inch

The state machine doesn't change, and the output buffer can still be kept free ("global", so to speak) in it. The entire code is:

let iter_words f inch =
  let obf = Buffer.create 80 in
  let
  rec no_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else
        no_word
  }
  and in_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else begin
        let () = f (Buffer.contents obf) in
        Buffer.clear obf;
        no_word
      end
  }
  in
  let ioread state =
    fold_in (fun state str -> fold_string (fun state -> state.fn) state str 0)
      state inch
  in
  if ioread no_word == in_word then f (Buffer.contents obf)

The code is, now, 44 lines long, counting the abstracted folds. There is nothing, however, that prevents the state from including both the callback function of the overall word iteration and the output buffer. The result is not as pretty, because all those lifted parameters clutter the function signatures a bit; it has, however, the advantage that each function is top-level:

let
rec no_word = { fn =
  fun (f, buf, c) ->
    if isalpha c then begin
      Buffer.add_char buf c;
      in_word
    end else
      no_word
}

and in_word = { fn =
  fun (f, buf, c) ->
    if isalpha c then begin
      Buffer.add_char buf c;
      in_word
    end else begin
      let () = f (Buffer.contents buf) in
      Buffer.clear buf;
      no_word
    end
}

let ioread =
  fold_in (fun st str ->
    fold_string (fun (f, buf, state) c -> (f, buf, state.fn (f, buf, c)))
      st str 0)

let iter_words f inch =
  let (_, buf, state) = ioread (f, Buffer.create 80, no_word) inch in
  if state == in_word then f (Buffer.contents buf)

The code is more compact at 31 lines, thanks to reusing the folds. A tweak that could make the code a bit tidier is to change the type of the states from accepting to transducing. A transducing state returns, along with its successor, the information passed in, updated as needed; this means that, whereas the former code worked only because the state (as represented by the Buffer) was mutable, the following is general:

type ('a, 'b) trans = { st : 'a -> 'b -> 'a * ('a, 'b) trans }

let
rec no_word = { st =
  fun (f, buf as s) c ->
    if isalpha c then begin
      Buffer.add_char buf c;
      s, in_word
    end else
      s, no_word
}

and in_word = { st =
  fun (f, buf as s) c ->
    if isalpha c then begin
      Buffer.add_char buf c;
      s, in_word
    end else begin
      let () = f (Buffer.contents buf) in
      Buffer.clear buf;
      s, no_word
    end
}

let ioread =
  fold_in (fun e str ->
    fold_string (fun (t, state) -> state.st t)
      e str 0)

let iter_words f inch =
  let buf = Buffer.create 80 in
  let (_, state) = ioread ((f, buf), no_word) inch in
  if state == in_word then f (Buffer.contents buf)

And this is essentially the 34 lines of code that you would write in Haskell, except that the folds would be provided to you in the corresponding monads. The only imperative bits are the file input and the use of an extensible buffer for performance.
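Using the final iterator is as simple as with the original; for example, a word count over a channel (assuming the definitions above are in scope):

```ocaml
let count_words inch =
  let n = ref 0 in
  iter_words (fun _ -> incr n) inch;
  !n
```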

Speaking of which, what is the cost of abstracting out the structure? I tested all five versions with the Project Gutenberg e-text of Beowulf. I ran the tests seven times, took the mean time, and normalized it to that of the first version. First, bytecode:

Abstraction Cost, bytecode

Version   Speed (Rel.)   Slow-down
      1           1.00       0.0 %
      2           1.27      21.3 %
      3           1.38      27.5 %
      4           1.81      44.8 %
      5           2.24      55.4 %

Then, naïve native code:

Abstraction Cost, native code, non-inlined (ocamlopt.opt -o ioread.exe unix.cmxa ioread.ml)

Version   Speed (Rel.)   Slow-down
      1           1.00       0.0 %
      2           1.24      19.1 %
      3           1.38      27.8 %
      4           1.79      44.3 %
      5           2.30      56.4 %

Lastly, optimized native code:

Abstraction Cost, native code, inlined (ocamlopt.opt -inline 10 -unsafe -o ioread.exe unix.cmxa ioread.ml)

Version   Speed (Rel.)   Slow-down
      1           1.00       0.0 %
      2           1.27      21.3 %
      3           1.38      27.5 %
      4           1.78      43.9 %
      5           2.24      55.4 %

As you can see, the differences are pretty consistent and independent of the compilation target. Note, though, that a fully functional, monadic read costs more than half again as much as the imperative double loop. The good thing about OCaml is not that it is an efficient functional language, but that it lets you move seamlessly between functional and imperative styles of coding, according to your preferences and your needs.

2007-12-01

The Genuine Sieve of Eratosthenes

The Sieve of Eratosthenes is a staple demo of the expressive power of lazy functional languages. It is usually encountered in its tersest Haskell form:

sieve [] = []
sieve (x:xs) = x : sieve [y | y <- xs, y `mod` x /= 0]

primes = sieve [2..]

However, in a feisty paper, Melissa O'Neill clearly shows that this is not the implementation of a sieving process, much less of Eratosthenes's algorithm (see also her rather forceful Haskell Café message). As is well known, Eratosthenes' Sieve begins with an unmarked "grid" (or rather, a list) of integer numbers, starting from 2. Then, taking in turn the least unmarked number d, it "crosses out", or marks, every multiple 2⋅d, 3⋅d, …, of d. The unmarked numbers are not multiples of anything else and so, by definition, prime.

As O'Neill explains, in order to replicate this process in a computer program, there must somehow be a record of all "crossed-out" numbers. Her idea is to use a priority queue to readily access the next number to eliminate from the list. The key aspect of a priority queue is that we can access the least element (according to some total order between them) in constant time. Then, for each prime p found, the queue is updated to record its multiples kp needing crossing out. I'll show an implementation of this idea in OCaml, this time using object-oriented, imperative queues.

Suppose an object q of class queue possessing, at least, an operation q#peek to look at the least element, an operation q#drop to eliminate it, and an operation q#put to add an element to it. In order to test if a number n is prime, we need to see if the queue holds some prime multiple equal to it or not. The queue will store every known prime p together with its least current multiple d as a pair (p, d). When n is tested against d and either found prime or marked, the code eliminates d from the queue and updates it with p's next multiple:

let is_prime q n =
  let rec test pass =
    let (p, d) = q#peek in
    if d > n then begin
      if pass then q#put (n, n + n);
      pass
    end else begin
      q#drop;
      q#put (p, d + p);
      test (d < n)
    end
  in test true

Either n passes the test, that is, is prime, or not. In any case, the queue's elements are inspected in order to update them with their next multiple; if any is equal to n we know from that mark that it is composite; otherwise, it is added as a formerly unknown prime. The type of is_prime is:

val is_prime : < drop : 'a; peek : int * int; put : int * int -> unit; .. > -> int -> bool

that is, as explained above, the only requirement on the queue class is that it have the methods drop, peek and put of the right types. A naïve but suitable implementation using lists is:

class ['a] queue (cmp : 'a -> 'a -> int) = object
  val mutable elts : 'a list = []
  method elements = elts
  method clear = elts <- []
  method is_empty = elts == []
  method peek = match elts with
  | [] -> failwith "peek"
  | x :: _ -> x
  method drop = match elts with
  | [] -> failwith "drop"
  | _ :: xs -> elts <- xs
  method put (a : 'a) =
    elts <- List.merge cmp [a] elts
end

The queue is generic in the type of its elements, and the constructor takes the comparison function needed to make the elements totally ordered; for the sieve, the pairs must be ordered by their second component, the current multiple. It keeps the elements in a list, and its core operation is put, which simply inserts the new element into the existing list while keeping it sorted. The extra methods will come in handy later. With this, it is simple to find all the primes up to a given limit lim:

let sieve q lim =
  let rec loop i =
    if i > lim then [] else
    if is_prime q i
      then i :: loop (succ i)
      else loop (succ i)
  in
  q#clear;
  q#put (2, 4);
  2 :: loop 3
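The queue's construction isn't shown above; for the algorithm to work, the comparison must order pairs by their current multiple, the second component (my assumption, consistent with how is_prime inspects the minimum). With that in place, priming with (2, 4) and starting at 3 yields the usual list:

```ocaml
(* Order (prime, multiple) pairs by the current multiple *)
let by_multiple (_, d) (_, e) = compare d e

let () =
  let q = new queue by_multiple in
  assert (sieve q 30 = [2; 3; 5; 7; 11; 13; 17; 19; 23; 29])
```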

The downside of using an imperative queue is that we must be mindful of state, and make sure that we start from a known point: we have to prime (!) the sieve with a queue containing just the first known candidate, namely two; from there the process is straightforward. So much so that we can easily do better. First of all, note that there is no need whatsoever to keep track of the found primes in a list, as the queue itself already does that for us. Also, O'Neill's program uses a flywheel, a recurring list of increments to skip over known composite numbers. The paper uses this large flywheel to eliminate multiples of 2, 3, 5 and 7; I'll use an infinite (circular) list:

let sieve q lim =
  let rec loop i (h :: t) =
    if i > lim then () else
    let _ = is_prime q i in loop (i + h) t
  in
  let rec flywheel =
    2 :: 4 :: 2 :: 4 :: 6 :: 2 :: 6 :: 4 ::
    2 :: 4 :: 6 :: 6 :: 2 :: 6 :: 4 :: 2 ::
    6 :: 4 :: 6 :: 8 :: 4 :: 2 :: 4 :: 2 ::
    4 :: 8 :: 6 :: 4 :: 6 :: 2 :: 4 :: 6 ::
    2 :: 6 :: 6 :: 4 :: 2 :: 4 :: 6 :: 2 ::
    6 :: 4 :: 2 :: 4 :: 2 ::10 :: 2 ::10 :: flywheel
  in
  q#clear;
  q#put (11, 22);
  loop 13 (List.tl flywheel);
  2 :: 3 :: 5 :: 7 :: List.sort compare (List.map fst q#elements)

This is faster, but only by a constant factor; the time is dominated by the queue operations. Even though removing the minimum is done in constant time (that is, is O(1)), the cost Clist of inserting a new element is linear in the size of the queue, or O(π(n)). This π(n) is the number of primes less than n, which is O(n/log n); an insertion must be done once for every element in the queue, for every integer up to n, for a total cost of n⋅π(n)⋅Clist = O(n³/log² n).

By lowering the cost of an insertion, even if offsetting it with an increased cost per removal, we can do asymptotically better. The best imperative implementation of a priority queue is a heap: a complete binary tree with the property that every non-leaf node's children are greater than or equal to the parent. I'll use an array to store the nodes, so that the node stored at index i has its children stored at indexes 2⋅i and 2⋅i + 1. Since the binary tree is complete, it has at most ⌊lg n⌋ + 1 levels.

At any instant, the heap condition must be kept: every parent is always less than or equal to both of its children (siblings, however, are not ordered with respect to each other). This, given the layout of nodes in the array, means that the minimum element is the root, which is found at the very beginning of the array.

I will use a class with a member array that will be dynamically resized to make room for new elements. Initially, it holds no elements; however, the initial array is constructed by "magic"-ing dummy values of the correct type. The elements of the heap are returned as a list in an arbitrary order:

class ['a] heap (cmp : 'a -> 'a -> int) = object (self)
  val mutable elts : 'a array = Array.make 8 (Obj.magic 0 : 'a)
  val mutable count = 0

  method elements =
    let res = ref [] in
    for i = 0 to count - 1 do res := elts.(i) :: !res done;
    !res

Now clear, is_empty and peek are trivial:

  method clear = count <- 0
  method is_empty = count == 0

  method peek =
    if self#is_empty then failwith "peek";
    elts.(0)

In order to remove the least element, we move the last element in the heap to the root, replacing it. After that, it must "sift down" the tree to its proper resting place; the method siftdown restores the heap condition:

  method drop =
    if self#is_empty then failwith "drop";
    count <- count - 1;
    elts.(0) <- elts.(count);
    self#siftdown

A simple variant avoids duplicating the effort needed to keep the heap condition between a removal of the minimum and the insertion of a new element:

  method replace (a : 'a) =
    if self#is_empty then failwith "replace";
    elts.(0) <- a;
    self#siftdown

In order to insert a new element, the code first checks that there is enough space for it and inserts it as the last node. Then siftup is called to bubble it up the tree until the heap condition is restored:

  method put (a : 'a) =
    self#check;
    elts.(count) <- a;
    count <- count + 1;
    self#siftup

Space is made for a further element by doubling the array and copying its items to the new array:

  method private check =
    if count == Array.length elts then begin
      let sz = 2 * count in
      let ar = Array.make sz elts.(0) in
      Array.blit elts 0 ar 0 count;
      elts <- ar
    end

siftup is the simpler of the two restoring procedures. It repeatedly compares the unsorted child with its parent, exchanging both if they are out of order and recurring:

  method private siftup =
    let rec sift i =
      if i > 1 then begin
        let p = i lsr 1 in
        if cmp elts.(p - 1) elts.(i - 1) > 0 then begin
          let t = elts.(i - 1) in
          elts.(i - 1) <- elts.(p - 1);
          elts.(p - 1) <- t;
          sift p
        end
      end
    in sift count

(this is essentially the inner iteration of an insertion sort). siftdown is complicated by the fact that it must compare and exchange each node with the least of its two children:

  method private siftdown =
    let rec sift i =
      let c = ref (i lsl 1) in
      if !c <= count then begin
        if !c + 1 <= count
        && cmp elts.(!c - 1) elts.(!c) > 0 then incr c;
        if cmp elts.(!c - 1) elts.(i - 1) < 0 then begin
          let t = elts.(i - 1) in
          elts.(i - 1) <- elts.(!c - 1);
          elts.(!c - 1) <- t;
          sift !c
        end
      end
    in sift 1
end
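Exercising the heap on its own, with the plain polymorphic comparison (my example):

```ocaml
let () =
  let h = new heap compare in
  List.iter h#put [5; 2; 8; 1];
  assert (h#peek = 1);   (* the minimum sits at the root *)
  h#drop;
  assert (h#peek = 2)    (* the next minimum surfaces after sifting down *)
```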

siftup and siftdown dominate the complexity of put and drop, respectively. Each may traverse every level of the tree, so they have a cost of Cheap = ⌊lg n⌋ + 1 = O(log n). By using this data structure, the sieving cost drops from O(n³/log² n) to n⋅π(n)⋅Cheap = O(n²).

Given this class heap, the code for sieve remains unchanged; however, I will take advantage of replace in is_prime to avoid a siftup after a drop and a put:

let is_prime q n =
  let rec test pass =
    let (p, d) = q#peek in
    if d > n then begin
      if pass then q#put (n, n + n);
      pass
    end else begin
      q#replace (p, d + p);
      test (d < n)
    end
  in test true

All this, of course, is very far from O'Neill's functional implementation, as the present code is decidedly imperative. In fact, I'm not sure that it performs better than the naïve imperative sieve using an array of booleans (or a bit vector) to record which numbers are crossed out. It has, however, the advantage of using memory proportional to the number π(n) of primes found, not to the range n; and since the heap grows dynamically, it can be used to find primes below an arbitrary limit without having to hard-code that limit in advance.