## 2007-12-13

### Naturally Sorted

Well, Jeff Atwood has weighed in on the discussion, with a clear explanation for "the rest of them". Seeing Batchelder's Python code, I realized I didn't do as well as I could have with my OCaml version. Specifically, I didn't look hard enough for a simple way to split the string between decimal and non-decimal parts. The idea is not to delimit parts by looking for transitions between one kind and the other, but to use one of the kinds as the delimiter itself:

```ocaml
let split =
  let delim = Str.regexp "[0-9]+" in
  List.map (function Str.Delim s -> s | Str.Text s -> s)
    % Str.full_split delim
```

As is to be expected with OCaml, the result of `full_split` is rather odd, in that it uses an ADT to distinguish between text and delimiters. I use a forgetful mapping to coalesce all the parts into a plain string list.
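A quick session shows what that ADT looks like in practice (standalone sketch; it assumes the `Str` library is linked in, e.g. with `str.cma`):

```ocaml
(* Requires the Str library. *)
let delim = Str.regexp "[0-9]+"

(* full_split keeps both the text and the delimiters, tagged by an ADT. *)
let tagged = Str.full_split delim "z100.html"

(* The forgetful mapping flattens the tagged pieces into a plain string list. *)
let parts = List.map (function Str.Delim s -> s | Str.Text s -> s) tagged

let () =
  assert (tagged = [Str.Text "z"; Str.Delim "100"; Str.Text ".html"]);
  assert (parts = ["z"; "100"; ".html"]);
  print_endline "ok"
```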

Another point where the comparison could be more discriminating is when comparing numbers with leading zeros. While it is obvious that "010" > "08", it is also to be expected that "010" > "10". The fix for this is to compare first by value, then by length:

```ocaml
let cmp_alnum x y =
  try
    let m, n = int_of_string x, int_of_string y in
    let c = Pervasives.compare m n in
    if c != 0 then c else
    Pervasives.compare (String.length x) (String.length y)
  with Failure _ -> Pervasives.compare x y
```
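A few spot checks of the refined comparison (standalone sketch, using the plain `compare` from the standard prelude instead of the pre-4.07 `Pervasives` spelling):

```ocaml
let cmp_alnum x y =
  try
    let m, n = int_of_string x, int_of_string y in
    let c = compare m n in
    if c != 0 then c else
    compare (String.length x) (String.length y)
  with Failure _ -> compare x y

let () =
  assert (cmp_alnum "010" "08" > 0);   (* by value: 10 > 8 *)
  assert (cmp_alnum "010" "10" > 0);   (* equal value: leading zeros sort later *)
  assert (cmp_alnum "abc" "abd" < 0);  (* non-numeric: plain string comparison *)
  print_endline "ok"
```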

There's always opportunity for polish…

## 2007-12-11

### The Alphanumeric Sort

A discussion on reddit about Dave Koelle's "Alphanum Algorithm" for sorting "naturally" strings that include numerals. The problem to solve is, in Koelle's words:

> For example, in a sorted list of files, "z100.html" is sorted before "z2.html". But obviously, 2 comes before 100!

First, it's clear that what's at issue here is a comparison function between (rather, a total order on) strings; second, as pointed out on reddit, it's what OS X does in the Finder (I wouldn't swear to it, but I think iTunes sorts like this, too). Martin Pool's Natural String Order comparison is another implementation of this idea, one that is special in that (in his words) "[p]erformance is linear: each character of the string is scanned at most once, and only as many characters as necessary to decide are considered." His page also lists a number of alternative implementations, giving priority to Stuart Cheshire.

I'll not aim that high; my implementation is also linear but I won't bother trying to visit each character at most once. The idea is to separate numbers from non-numbers, and compare corresponding numbers according to numeric value and not as strings. In other words, 100 > 20 even when "100" < "20" (as the string comparison stops as soon as the first character '1' on the left compares less than the first character '2' on the right). The first thing needed would then be a way to split a string between numeric and non-numeric components. OCaml regexps are somewhat crude, but a multipass (cue Milla Jovovich's Leeloo Dallas multipass!) approach works: I'll insert a delimiter between a sequence of digits followed by a sequence of non-digits; then I'll insert the same delimiter between a sequence of non-digits followed by a sequence of digits. Finally, I'll split the string on the delimiter. The only problem is that any character is potentially valid in a string; I cop out and use a null byte:

```ocaml
let (%) f g x = f (g x)

let split =
  let re_d_D = Str.regexp "\\([0-9]+\\)\\([^0-9]+\\)"
  and re_D_d = Str.regexp "\\([^0-9]+\\)\\([0-9]+\\)"
  and re_NUL = Str.regexp_string "\000"
  and tp_spl = "\\1\000\\2" in
  Str.split re_NUL
    % Str.global_replace re_D_d tp_spl
    % Str.global_replace re_d_D tp_spl
```
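Tracing the two passes on Koelle's example makes the multipass trick concrete (standalone sketch, assuming the `Str` library is linked in):

```ocaml
(* Requires the Str library. Trace the multipass delimiter insertion. *)
let re_d_D = Str.regexp "\\([0-9]+\\)\\([^0-9]+\\)"
let re_D_d = Str.regexp "\\([^0-9]+\\)\\([0-9]+\\)"
let tp_spl = "\\1\000\\2"

let pass1 = Str.global_replace re_d_D tp_spl "z100.html"  (* digits, then non-digits *)
let pass2 = Str.global_replace re_D_d tp_spl pass1        (* non-digits, then digits *)
let parts = Str.split (Str.regexp_string "\000") pass2

let () =
  assert (pass1 = "z100\000.html");
  assert (pass2 = "z\000100\000.html");
  assert (parts = ["z"; "100"; ".html"]);
  print_endline "ok"
```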

Next, I need to lift a comparison on elements to a comparison on a list of those elements. There is no standard lexicographical comparison on lists, but the implementation is trivial. As long as elements compare as equal, recur on the rest of the lists until a difference is found or the end of the shortest list is reached, whichever comes first:

```ocaml
let rec cmp_list cmp l m = match l, m with
| [], [] ->  0
| [], _  -> -1
| _, []  ->  1
| a :: l, b :: m ->
  let c = cmp a b in
  if c == 0 then cmp_list cmp l m else c
```
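The lifting behaves as described; in particular, a strict prefix compares less than the longer list:

```ocaml
let rec cmp_list cmp l m = match l, m with
| [], [] ->  0
| [], _  -> -1
| _, []  ->  1
| a :: l, b :: m ->
  let c = cmp a b in
  if c == 0 then cmp_list cmp l m else c

let () =
  assert (cmp_list compare [1; 2; 3] [1; 2; 3] = 0);
  assert (cmp_list compare [1; 2] [1; 3] < 0);    (* first difference decides *)
  assert (cmp_list compare [1; 2] [1; 2; 0] < 0); (* shorter prefix sorts first *)
  print_endline "ok"
```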

Finally comes the alphanumeric comparison. The idea is to try first to compare both strings as numbers, and if that isn't possible to fall back on plain string comparison. For that, I take advantage of the fact that `int_of_string` fails with exception on non-numeric input:

```ocaml
let cmp_alnum x y =
  try Pervasives.compare (int_of_string x) (int_of_string y)
  with Failure _ -> Pervasives.compare x y
```

Putting it all together, I perform a Schwartzian Transform on the list of strings to be sorted, splitting on numbers to give a list of lists. Then I sort using the standard sort on lists, with a comparison function built by lifting the alphanumeric comparison to a lexicographic ordering. Lastly, I concatenate the pieces back into strings:

```ocaml
let sort_alnum =
  List.map (String.concat "")
    % List.sort (cmp_list cmp_alnum)
    % List.map split
```
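Putting the three pieces together on Koelle's example (self-contained sketch repeating the definitions above; it assumes the `Str` library is linked in):

```ocaml
(* Requires the Str library. *)
let (%) f g x = f (g x)

let split =
  let re_d_D = Str.regexp "\\([0-9]+\\)\\([^0-9]+\\)"
  and re_D_d = Str.regexp "\\([^0-9]+\\)\\([0-9]+\\)"
  and re_NUL = Str.regexp_string "\000"
  and tp_spl = "\\1\000\\2" in
  Str.split re_NUL
    % Str.global_replace re_D_d tp_spl
    % Str.global_replace re_d_D tp_spl

let rec cmp_list cmp l m = match l, m with
| [], [] ->  0
| [], _  -> -1
| _, []  ->  1
| a :: l, b :: m ->
  let c = cmp a b in
  if c == 0 then cmp_list cmp l m else c

let cmp_alnum x y =
  try compare (int_of_string x) (int_of_string y)
  with Failure _ -> compare x y

let sort_alnum =
  List.map (String.concat "")
    % List.sort (cmp_list cmp_alnum)
    % List.map split

let () =
  let sorted = sort_alnum ["z100.html"; "z2.html"; "z10.html"] in
  assert (sorted = ["z2.html"; "z10.html"; "z100.html"]);
  print_endline "ok"
```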

To prove my claim of linearity, note that `split` is (likely—I haven't studied `Str`'s code) linear since the regexes are deterministic, that `cmp_list` is obviously linear on the size of the lists hence linear on the size of the original string, and that `cmp_alnum` is linear since conversion of string to numbers is linear just as is lexicographical comparison. Simple, huh?

## 2007-12-05

### Functional Refactoring

A common and simple enough task: count the words present in a text file. For our purposes (and as it is generally agreed), a word is a contiguous sequence of alphabetic characters, delimited by non-alphabetic characters. We should be mindful of edge cases: both the beginning and the end of the file are treated as non-alphabetic characters; in other words, the first and last words of the file, if any, must not be lost.

I found lying around in my code folder (you'd be surprised) the following OCaml code:

```ocaml
let iter_words f inch =
  let ibf = String.create buflen
  and obf = Buffer.create 80
  and any = ref true
  and inw = ref false in
  while !any do
    let nread = unsafe_input inch ibf 0 buflen in
    any := nread > 0;
    for i = 0 to pred nread do
      let c = String.unsafe_get ibf i in
      if !inw then begin
        if isalpha c then
          Buffer.add_char obf c
        else begin
          f (Buffer.contents obf);
          Buffer.clear obf;
          inw := false
        end
      end else if isalpha c then begin
        Buffer.add_char obf c;
        inw := true
      end
    done;
  done;
  if !inw then f (Buffer.contents obf)
```

The first thing to notice is that this is an iterator over the file's words: for every one found, the parameter `f` is called with its text. What `f` does with the word is its problem; the advantage of this approach is that you can process files without having to load the entire result in memory; the main disadvantage is that you are forced to process words in sequence and handle the intermediate state by yourself.

Inside the function, the code is unabashedly imperative: a pair of nested loops process the file, the outer reading a bufferful at a time into `ibf` (`buflen` is implied, and defined somewhere else with a suitable value; so is `unsafe_input`), the inner scanning the buffer a character at a time and accumulating eventual words in `obf`. Inside the inner loop, a state machine uses `inw` to track whether it is scanning a word or a non-word, changing state according to predicate `isalpha` (also suitably defined). This state must survive entire buffers, as a word might straddle one. A further state variable, `any`, controls the outer loop's termination.

All very straightforward (this code has a modification date of October 1st, 2006, and I could read it after all this time without problem), but also very, very ugly: this style may be appropriate for C, but it is not especially defensible in OCaml. So, I set out first to replace the state variables by a functional state machine. The idea is to have a number of recursive functions, one for every state, each of which, instead of transitioning, returns the next function to call. This is very similar to the object-oriented GoF "State" pattern.

The only complication is that in order to express this pattern we need recursive types: a state is a function taking a parameter of some type, say α, and returning a state! In other words, the type is `α → β as β`. For OCaml to accept this, it needs the `-rectypes` option; the problem with this extension is that the type-checker accepts a lot of junk along with the valid definitions, making development more fragile. The alternative is to wrap the recursion so that it goes through a type constructor: either an algebraic data-type, or a record type. I choose the latter:

```ocaml
type 'a state = { fn : 'a -> 'a state }
```
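As a toy illustration of the wrapped recursive type, here is a hypothetical two-state machine (a parity recognizer, nothing to do with words yet) driven over a string:

```ocaml
type 'a state = { fn : 'a -> 'a state }

(* A toy two-state machine: tracks the parity of '1' characters seen.
   Each state returns its successor instead of mutating a flag. *)
let rec even = { fn = fun c -> if c = '1' then odd else even }
and     odd  = { fn = fun c -> if c = '1' then even else odd }

(* Run the machine over a string by threading the state through. *)
let run start s =
  let st = ref start in
  String.iter (fun c -> st := (!st).fn c) s;
  !st

let () =
  assert (run even "10110" == odd);  (* three '1's: odd parity *)
  assert (run even "1001" == even);  (* two '1's: even parity *)
  print_endline "ok"
```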

The function signature is the same, and I'll keep for now the input and output buffers:

```ocaml
let iter_words f inch =
  let ibf = String.create buflen
  and obf = Buffer.create 80 in
```

The first state is "scanning outside a word"; if the next character is alphabetic, it must start a new word and transition, otherwise it keeps looking:

```ocaml
  let rec no_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else
        no_word
  }
```

The second state is "scanning inside a word"; if the next character is alphabetic, it accumulates it and keeps scanning, otherwise it ends the word, sends it to the accumulating function f and transitions:

```ocaml
  and in_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else begin
        let () = f (Buffer.contents obf) in
        Buffer.clear obf;
        no_word
      end
  }
  in
```

A buffer is scanned character by character (I've renamed the variables formerly known as `nread` and `i`). It must, however, preserve the last known state, so that words spanning buffers are not incorrectly split:

```ocaml
  let rec scan pos len state =
    if pos == len then state else
    let c = String.unsafe_get ibf pos in
    scan (succ pos) len (state.fn c)
  in
```

Note how the state handler returns its successor. The file must then be read a buffer at a time. Again, it must conserve the state across calls to the buffer scanning loop, terminating when there is no more data to read:

```ocaml
  let rec ioread state =
    let nread = unsafe_input inch ibf 0 buflen in
    if nread == 0 then state else
    ioread (scan 0 nread state)
  in
```

Finally, the entire device is set into motion by reading outside a word (remember, the beginning-of-file is taken as an implied non-word character). If the ending state is `in_word`, the last word must be taken into account and processed (again, the end-of-file is taken as an implied non-word character):

```ocaml
  if ioread no_word == in_word then f (Buffer.contents obf)
```

From 26 lines to 38 lines, a modest increase in code length for a great improvement in modularity and (I'd argue) understandability: higher-order functions are a powerful abstraction device. I could, however, do better: there are many explicitly recursive functions that beg to be generalized as recursion schemes, or patterns.

The first recursion scheme that should be pulled out is the reading loop. It uses as a free variable the input buffer that it shares with the rest of the code. Abstracting out the work done on each buffer, and accumulating on an explicit parameter:

```ocaml
let fold_in f e inch =
  let ibf = String.create buflen in
  let rec fold e =
    let nread = unsafe_input inch ibf 0 buflen in
    if nread == 0 then e else
    fold (f e ibf nread)
  in fold e
```

The parameter `f` will take the last accumulated value `e`, and the newly read data in the form of the buffer and the count of valid characters in it; it must return the updated result for the next iteration. This means that I must manage to supply `scan` with the relevant state. But I will also abstract out the iteration over a string. Unfortunately, the `String` module doesn't define a fold on strings, much less one on a substring:

```ocaml
let fold_string f e str pos len =
  let rec fold pos e =
    if pos == len then e else
    let c = String.unsafe_get str pos in
    fold (succ pos) (f e c)
  in fold pos e
```
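As a sanity check, the substring fold specializes to familiar folds over strings (standalone sketch, with the checked `String.get` standing in for `unsafe_get`):

```ocaml
(* Same fold, with the checked String.get for this standalone sketch. *)
let fold_string f e str pos len =
  let rec fold pos e =
    if pos == len then e else
    let c = String.get str pos in
    fold (succ pos) (f e c)
  in fold pos e

let () =
  (* Count the vowels of a whole string... *)
  let count_vowels s =
    fold_string
      (fun n c -> if String.contains "aeiou" c then succ n else n)
      0 s 0 (String.length s)
  in
  assert (count_vowels "functional" = 4);
  (* ...or fold over just a substring: positions 2, 3 and 4. *)
  assert (fold_string (fun acc c -> acc ^ String.make 1 c) "" "abcdef" 2 5 = "cde");
  print_endline "ok"
```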

I submit that both functions are independently useful, especially the last one. The input iteration is just:

```ocaml
  let ioread state =
    fold_in scan state inch
```

In turn, the string iteration (with parameters suitably rearranged to allow for η-conversion) is:

```ocaml
  let scan state buf len =
    fold_string (fun state -> state.fn) state buf 0 len
```

Now there is not much point in naming the simple folds; η-converting and inlining:

```ocaml
  let ioread state =
    fold_in (fun state buf -> fold_string (fun state -> state.fn) state buf 0)
      state inch
```

The state machine doesn't change, and the output buffer can still be kept free ("global", so to speak) in it. The entire code is:

```ocaml
let iter_words f inch =
  let obf = Buffer.create 80 in
  let rec no_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else
        no_word
  }
  and in_word = { fn =
    fun c ->
      if isalpha c then begin
        Buffer.add_char obf c;
        in_word
      end else begin
        let () = f (Buffer.contents obf) in
        Buffer.clear obf;
        no_word
      end
  }
  in
  let ioread state =
    fold_in (fun state str -> fold_string (fun state -> state.fn) state str 0)
      state inch
  in
  if ioread no_word == in_word then f (Buffer.contents obf)
```
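For completeness, here is a runnable assembly of the whole pipeline; `buflen`, `isalpha` and the input primitive are filled in with plain stdlib stand-ins, since the post defines them elsewhere:

```ocaml
(* Runnable assembly of the refactored iterator. buflen, isalpha and the
   input primitive are stand-ins; the state machine and folds are as above. *)
let buflen = 16   (* deliberately small, to exercise words straddling buffers *)
let isalpha c = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')

type 'a state = { fn : 'a -> 'a state }

let fold_in f e inch =
  let ibf = Bytes.create buflen in
  let rec fold e =
    let nread = input inch ibf 0 buflen in
    if nread == 0 then e else
    fold (f e (Bytes.sub_string ibf 0 nread) nread)
  in fold e

let fold_string f e str pos len =
  let rec fold pos e =
    if pos == len then e else
    fold (succ pos) (f e str.[pos])
  in fold pos e

let iter_words f inch =
  let obf = Buffer.create 80 in
  let rec no_word = { fn = fun c ->
        if isalpha c then begin Buffer.add_char obf c; in_word end
        else no_word }
  and in_word = { fn = fun c ->
        if isalpha c then begin Buffer.add_char obf c; in_word end
        else begin f (Buffer.contents obf); Buffer.clear obf; no_word end }
  in
  let ioread state =
    fold_in (fun state str -> fold_string (fun state -> state.fn) state str 0)
      state inch
  in
  if ioread no_word == in_word then f (Buffer.contents obf)

(* Collect the words of a small test file. *)
let collected =
  let file = Filename.temp_file "words" ".txt" in
  let oc = open_out file in
  output_string oc "Hwaet! We Gardena in geardagum";
  close_out oc;
  let words = ref [] in
  let ic = open_in file in
  iter_words (fun w -> words := w :: !words) ic;
  close_in ic;
  Sys.remove file;
  List.rev !words

let () =
  assert (collected = ["Hwaet"; "We"; "Gardena"; "in"; "geardagum"]);
  print_endline "ok"
```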

The code is now 44 lines long, counting the abstracted folds. There is nothing, however, that prevents the state from including both the callback function to the overall word iteration and the output buffer. The result is not as pretty, because all those lifted parameters clutter the function signatures a bit; it has, however, the advantage that each function is top-level:

```ocaml
let rec no_word = { fn =
  fun (f, buf, c) ->
    if isalpha c then begin
      Buffer.add_char buf c;
      in_word
    end else
      no_word
}

and in_word = { fn =
  fun (f, buf, c) ->
    if isalpha c then begin
      Buffer.add_char buf c;
      in_word
    end else begin
      let () = f (Buffer.contents buf) in
      Buffer.clear buf;
      no_word
    end
}

let ioread e inch =
  fold_in (fun st str ->
      fold_string (fun (f, buf, state) c -> (f, buf, state.fn (f, buf, c)))
        st str 0)
    e inch

let iter_words f inch =
  let (_, buf, state) = ioread (f, Buffer.create 80, no_word) inch in
  if state == in_word then f (Buffer.contents buf)
```

The code is more compact at 31 lines, thanks to reusing the folds. A tweak that could make the code a bit tidier is to change the type of the states to be transducing instead of just accepting. The result of a transducing state includes the information passed to it updated by the state as needed; this means that, whereas the former code worked because the state (as represented by the `Buffer`) was mutable, the following is general:

```ocaml
type ('a, 'b) trans = { st : 'a -> 'b -> 'a * ('a, 'b) trans }

let rec no_word = { st =
  fun (f, buf as s) c ->
    if isalpha c then begin
      Buffer.add_char buf c;
      s, in_word
    end else
      s, no_word
}

and in_word = { st =
  fun (f, buf as s) c ->
    if isalpha c then begin
      Buffer.add_char buf c;
      s, in_word
    end else begin
      let () = f (Buffer.contents buf) in
      Buffer.clear buf;
      s, no_word
    end
}

let ioread e inch =
  fold_in (fun e str ->
      fold_string (fun (t, state) -> state.st t)
        e str 0)
    e inch

let iter_words f inch =
  let buf = Buffer.create 80 in
  let (_, state) = ioread ((f, buf), no_word) inch in
  if state == in_word then f (Buffer.contents buf)
```

And this is essentially the 34 lines of code that you would write in Haskell, except that the folds would be provided to you in the corresponding monads. The only imperative bits are the file input and the use of an extensible buffer for performance.

Speaking of which, what is the cost of abstracting out the structure? I tested all five versions with the Project Gutenberg e-text of Beowulf. I ran the tests seven times each, took their mean time, and normalized it to that of the first version. First, bytecode:

Abstraction Cost, bytecode (`ocamlopt.opt -inline 10 -unsafe -o ioread.exe unix.cmxa ioread.ml`)

| Version | Speed (Rel.) | Speed-down |
|--------:|-------------:|-----------:|
| 1       | 1.000        | 0.0 %      |
| 2       | 1.27         | 21.3 %     |
| 3       | 1.38         | 27.5 %     |
| 4       | 1.81         | 44.8 %     |
| 5       | 2.24         | 55.4 %     |

Then, naïve native code:

Abstraction Cost, native code, non-inlined (`ocamlopt.opt -o ioread.exe unix.cmxa ioread.ml`)

| Version | Speed (Rel.) | Speed-down |
|--------:|-------------:|-----------:|
| 1       | 1.000        | 0.0 %      |
| 2       | 1.24         | 19.1 %     |
| 3       | 1.38         | 27.8 %     |
| 4       | 1.79         | 44.3 %     |
| 5       | 2.30         | 56.4 %     |

Lastly, optimized native code:

Abstraction Cost, native code, inlined (`ocamlopt.opt -inline 10 -unsafe -o ioread.exe unix.cmxa ioread.ml`)

| Version | Speed (Rel.) | Speed-down |
|--------:|-------------:|-----------:|
| 1       | 1.000        | 0.0 %      |
| 2       | 1.27         | 21.3 %     |
| 3       | 1.38         | 27.5 %     |
| 4       | 1.78         | 43.9 %     |
| 5       | 2.24         | 55.4 %     |

As you can see, the differences are pretty consistent and independent of the compilation target. Furthermore, a fully functional, monadic read costs more than half again as much as the imperative double loop. The good thing about OCaml is not that it is an efficient functional language, but that it lets you move seamlessly between functional and imperative styles of coding, according to your preferences and your needs.

## 2007-12-01

### The Genuine Sieve of Eratosthenes

The Sieve of Eratosthenes is a staple demo of the expressive power of lazy functional languages. It is usually encountered in its tersest Haskell form:

```haskell
sieve []     = []
sieve (x:xs) = x : sieve [y | y <- xs, y `mod` x /= 0]

primes = sieve [2..]
```

However, in a feisty paper, Melissa O'Neill clearly shows that this is not the implementation of a sieving process, much less of Eratosthenes's algorithm (see also her rather forceful Haskell Café message). As is well known, Eratosthenes' Sieve begins with an unmarked "grid" (or rather, a list) of integer numbers, starting from 2. Then, taking in turn the least unmarked number d, it "crosses out", or marks, every multiple 2⋅d, 3⋅d, …, of d. The unmarked numbers are not multiples of anything else and so, by definition, prime.

As O'Neill explains, in order to replicate this process with a computer program, there must be somehow a record of all "crossed-out" numbers. Her idea is to use a priority queue to readily access the next number to eliminate from the list. The key aspect of using a priority queue is that we can access the least element (according to some total order between them) in constant time. Then, for each prime p found, the queue is updated to record its multiples kp needing crossing out. I'll show an implementation of this idea, using this time object-oriented imperative queues, in OCaml.

Suppose an object q of class `queue` possessing, at least, an operation `q#peek` to look at the least element, an operation `q#drop` to eliminate it, and an operation `q#put` to add an element to it. In order to test if a number n is prime, we need to see if the queue holds some prime multiple equal to it or not. The queue will store every known prime p together with its least current multiple d as a pair `(p, d)`. When n is tested against d and either found prime or marked, the code eliminates d from the queue and updates it with p's next multiple:

```ocaml
let is_prime q n =
  let rec test pass =
    let (p, d) = q#peek in
    if d > n then begin
      if pass then q#put (n, n + n);
      pass
    end else begin
      q#drop;
      q#put (p, d + p);
      test (d < n)
    end
  in test true
```

Either n passes the test, that is, is prime, or not. In any case, the queue's elements are inspected in order to update them with their next multiple; if any is equal to n we know from that mark that it is composite; otherwise, it is added as a formerly unknown prime. The type of `is_prime` is:

```ocaml
val is_prime : < drop : 'a; peek : int * int; put : int * int -> unit; .. > -> int -> bool
```

that is, as explained above, the only requirements on the queue class is that it has the methods `drop`, `peek`, `put` of the right type. A naïve but suitable implementation using lists is:

```ocaml
class ['a] queue (cmp : 'a -> 'a -> int) = object
  val mutable elts : 'a list = []
  method elements = elts
  method clear = elts <- []
  method is_empty = elts == []
  method peek = match elts with
  | [] -> failwith "peek"
  | x :: _ -> x
  method drop = match elts with
  | [] -> failwith "drop"
  | _ :: xs -> elts <- xs
  method put (a : 'a) =
    elts <- List.merge cmp [a] elts
end
```

The queue is generic on the type of the arguments, and the constructor takes the comparison function needed to make the elements totally ordered. It keeps them in a list, and its core operation is `put`, which simply inserts the new element into the existing list of elements while keeping it sorted. The extra methods will come handy later. With this, it is simple to find out all the primes up to a given limit lim:

```ocaml
let sieve q lim =
  let rec loop i =
    if i > lim then [] else
    if is_prime q i
    then i :: loop (succ i)
    else loop (succ i)
  in
  q#clear;
  q#put (2, 4);
  2 :: loop 3
```
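Assembled and run, with the queue ordered by the pending multiple d (the comparison function is my choice here; the post leaves it implicit):

```ocaml
(* Self-contained check: the queue is ordered by the pending multiple d,
   so peek always yields the next number to cross out. *)
class ['a] queue (cmp : 'a -> 'a -> int) = object
  val mutable elts : 'a list = []
  method elements = elts
  method clear = elts <- []
  method is_empty = elts == []
  method peek = match elts with
  | [] -> failwith "peek"
  | x :: _ -> x
  method drop = match elts with
  | [] -> failwith "drop"
  | _ :: xs -> elts <- xs
  method put (a : 'a) =
    elts <- List.merge cmp [a] elts
end

let is_prime q n =
  let rec test pass =
    let (p, d) = q#peek in
    if d > n then begin
      if pass then q#put (n, n + n);
      pass
    end else begin
      q#drop;
      q#put (p, d + p);
      test (d < n)
    end
  in test true

let sieve q lim =
  let rec loop i =
    if i > lim then [] else
    if is_prime q i
    then i :: loop (succ i)
    else loop (succ i)
  in
  q#clear;
  q#put (2, 4);
  2 :: loop 3

let () =
  let q = new queue (fun (_, d) (_, e) -> compare d e) in
  assert (sieve q 30 = [2; 3; 5; 7; 11; 13; 17; 19; 23; 29]);
  print_endline "ok"
```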

The downside of using an imperative queue is that we must be mindful of state, and make sure that we start from a known point: we have to prime (!) the sieve with a queue containing just the first known candidate, namely two; from there the process is straightforward. So much so that we can easily do better. First of all, note that there is no need whatsoever to keep track of the found primes in a list, as the queue itself already does that for us. Also, O'Neill's program uses a flywheel, a recurring list of increments to skip over known composite numbers. The paper uses a large flywheel to eliminate multiples of 2, 3, 5 and 7; I'll use an infinite (circular) list:

```ocaml
let sieve q lim =
  let rec loop i (h :: t) =
    if i > lim then () else
    let _ = is_prime q i in loop (i + h) t
  in
  let rec flywheel =
    2 :: 4 :: 2 :: 4 :: 6 :: 2 :: 6 :: 4 ::
    2 :: 4 :: 6 :: 6 :: 2 :: 6 :: 4 :: 2 ::
    6 :: 4 :: 6 :: 8 :: 4 :: 2 :: 4 :: 2 ::
    4 :: 8 :: 6 :: 4 :: 6 :: 2 :: 4 :: 6 ::
    2 :: 6 :: 6 :: 4 :: 2 :: 4 :: 6 :: 2 ::
    6 :: 4 :: 2 :: 4 :: 2 :: 10 :: 2 :: 10 :: flywheel
  in
  q#clear;
  q#put (11, 22);
  loop 13 (List.tl flywheel);
  2 :: 3 :: 5 :: 7 :: List.sort compare (List.map fst q#elements)
```

This is faster, but only by a constant amount; the time is dominated by the queue operations. Even though removing the minimum is done in constant time (that is, is O(1)), the cost C`list` of inserting a new element is linear in the size of the queue, or O(π(n)). This π(n) is the number of primes less than n, which is O(n/log n), and an insertion must be done for every integer up to n, for a total cost of n⋅π(n)⋅C`list` = O(n³/log² n).

By lowering the cost of an insertion, even by offsetting it with an increased cost per removal, we can do asymptotically better. The best imperative implementation of a priority queue is a heap: a totally balanced binary tree with the property that every non-leaf node's children are greater than or equal to the parent. I'll use an array to store the nodes, so that the node stored at index i has its children stored at indexes 2⋅i and 2⋅i + 1. Since the binary tree is totally balanced, it has at most ⌊lg n⌋ + 1 levels.

At any instant, the heap condition must be kept; that is, in a heap every parent is always less than or equal to both of its children (there is no required order between siblings). This, given the layout of nodes in the array, means that the minimum element is the root, which is found at the very beginning of the array.

I will use a class with a member array that will be dynamically resized to make room for new elements. Initially, it holds no elements; however, the initial array is constructed by "magic"-ing dummy values of the correct type. The `elements` of the heap are returned as a list in an arbitrary order:

```ocaml
class ['a] heap (cmp : 'a -> 'a -> int) = object (self)
  val mutable elts : 'a array = Array.make 8 (Obj.magic 0 : 'a)
  val mutable count = 0

  method elements =
    let res = ref [] in
    for i = 0 to count - 1 do res := elts.(i) :: !res done;
    !res
```

Now `clear`, `is_empty` and `peek` are trivial:

```ocaml
  method clear = count <- 0
  method is_empty = count == 0

  method peek =
    if self#is_empty then failwith "peek";
    elts.(0)
```

In order to remove the least element, we move the last element in the heap to the root, replacing it. After that, it must "sift down" the tree to its proper resting place; the method `siftdown` restores the heap condition:

```ocaml
  method drop =
    if self#is_empty then failwith "drop";
    count <- count - 1;
    elts.(0) <- elts.(count);
    self#siftdown
```

A simple variant avoids duplicating the effort needed to keep the heap condition between a removal of the minimum and the insertion of a new element:

```ocaml
  method replace (a : 'a) =
    if self#is_empty then failwith "replace";
    elts.(0) <- a;
    self#siftdown
```

In order to insert a new element, the code first checks that there is enough space for it and inserts it as the last node. Then `siftup` is called to bubble it up the tree until the heap condition is restored:

```ocaml
  method put (a : 'a) =
    self#check;
    elts.(count) <- a;
    count <- count + 1;
    self#siftup
```

Space is made for a further element by doubling the array and copying its items to the new array:

```ocaml
  method private check =
    if count == Array.length elts then begin
      let sz = 2 * count in
      let ar = Array.make sz elts.(0) in
      Array.blit elts 0 ar 0 count;
      elts <- ar
    end
```

`siftup` is the simpler of the two restoring procedures. It repeatedly compares the unsorted child with its parent, exchanging both if they are out of order and recurring:

```ocaml
  method private siftup =
    let rec sift i =
      if i > 1 then begin
        let p = i lsr 1 in
        if cmp elts.(p - 1) elts.(i - 1) > 0 then begin
          let t = elts.(i - 1) in
          elts.(i - 1) <- elts.(p - 1);
          elts.(p - 1) <- t;
          sift p
        end
      end
    in sift count
```

(this is essentially the iteration for an insertion sort). `siftdown` is complicated by the fact that it must compare and exchange each node with the smaller of its two children:

```ocaml
  method private siftdown =
    let rec sift i =
      let c = ref (i lsl 1) in
      if !c <= count then begin
        if !c + 1 <= count
        && cmp elts.(!c - 1) elts.(!c) > 0 then incr c;
        if cmp elts.(!c - 1) elts.(i - 1) < 0 then begin
          let t = elts.(i - 1) in
          elts.(i - 1) <- elts.(!c - 1);
          elts.(!c - 1) <- t;
          sift !c
        end
      end
    in sift 1
end
```
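Draining a heap through `peek`/`drop` is a little heapsort, which makes a convenient check of the sift logic (a condensed copy of the class above, keeping only the methods the check needs):

```ocaml
(* Condensed copy of the heap; draining it must produce sorted output. *)
class ['a] heap (cmp : 'a -> 'a -> int) = object (self)
  val mutable elts : 'a array = Array.make 8 (Obj.magic 0 : 'a)
  val mutable count = 0
  method is_empty = count == 0
  method peek =
    if self#is_empty then failwith "peek";
    elts.(0)
  method drop =
    if self#is_empty then failwith "drop";
    count <- count - 1;
    elts.(0) <- elts.(count);
    self#siftdown
  method put (a : 'a) =
    self#check;
    elts.(count) <- a;
    count <- count + 1;
    self#siftup
  method private check =
    if count == Array.length elts then begin
      let ar = Array.make (2 * count) elts.(0) in
      Array.blit elts 0 ar 0 count;
      elts <- ar
    end
  method private siftup =
    let rec sift i =
      if i > 1 then begin
        let p = i lsr 1 in
        if cmp elts.(p - 1) elts.(i - 1) > 0 then begin
          let t = elts.(i - 1) in
          elts.(i - 1) <- elts.(p - 1);
          elts.(p - 1) <- t;
          sift p
        end
      end
    in sift count
  method private siftdown =
    let rec sift i =
      let c = ref (i lsl 1) in
      if !c <= count then begin
        if !c + 1 <= count
        && cmp elts.(!c - 1) elts.(!c) > 0 then incr c;
        if cmp elts.(!c - 1) elts.(i - 1) < 0 then begin
          let t = elts.(i - 1) in
          elts.(i - 1) <- elts.(!c - 1);
          elts.(!c - 1) <- t;
          sift !c
        end
      end
    in sift 1
end

let drained =
  let h = new heap compare in
  List.iter h#put [5; 3; 8; 1; 9; 2; 7; 4; 6; 0];
  let rec drain acc =
    if h#is_empty then List.rev acc
    else let x = h#peek in h#drop; drain (x :: acc)
  in drain []

let () =
  assert (drained = [0; 1; 2; 3; 4; 5; 6; 7; 8; 9]);
  print_endline "ok"
```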

`siftup` and `siftdown` dominate the complexity of `put` and `drop`, respectively. Both must traverse at most every level of the tree, so they have a cost of C`heap` = ⌊lg n⌋ + 1 = O(log n). By using this data structure, the sieving cost drops from O(n³/log² n) to n⋅π(n)⋅C`heap` = O(n²).

Given this class `heap`, the code for `sieve` remains unchanged; however, I will take advantage of `replace` in `is_prime` to avoid a `siftup` after a `drop` and a `put`:

```ocaml
let is_prime q n =
  let rec test pass =
    let (p, d) = q#peek in
    if d > n then begin
      if pass then q#put (n, n + n);
      pass
    end else begin
      q#replace (p, d + p);
      test (d < n)
    end
  in test true
```

All this, of course, is very far from O'Neill's functional implementation, as the present code is decidedly imperative. In fact, I'm not sure that this performs better than the naïve imperative sieve using an array of booleans (or a bit vector) to record which numbers are crossed out. It has, however, the advantage of using memory proportional to the number π(n) of primes found, not to the range n, and since the heap grows dynamically, it can be used to find primes below an arbitrary limit without having to hard-code it.