## 2010-02-14

### Heap Up (the Bernstein Sums)

Happy Valentine's! The latest Programming Praxis calls for an implementation of the prime Sieve of Atkins. This sieve saves a bunch of operations with a preprocessing phase in which only square-free candidates are retained for latter sieving. For this to be efficient it is necessary to quickly count solutions to certain binary quadratic forms k·x² ± y² = n. Dan Berstein has published an elegant algorithm to quickly generate the solutions to such types of polynomial sums. It crucially requires a priority queue to ensure that the solutions are generated in order and that none is left out. The purpose of this post is to show an implementation of an imperative priority queue using a binary heap.

A binary heap is a sorted data structure over an ordered set (S, ≤): it is a complete binary tree such that every node is less than or equal to either children. The first property is usually called the shape property and amounts to requiring that every level h except possibly the last has exactly 2h nodes; the second property is called the heap property. The shape property makes it convenient (and usual) to represent binary heaps with N elements as arrays of size N: a node with index 1 ≤ jN has children with indices 2·j and 2·j + 1, in that order. The heap property is maintained by two operations `siftup` and `siftdown` that bubble up or down elements not necessarily in order into their proper position.

Since a heap is a dynamic structure, I will need a dynamic array or vector to back it up. The following signature will be sufficient for now; I will give the implementation later:

```module Vec : sig
type 'a t
val make   : unit -> 'a t
val length : 'a t -> int
val get    : 'a t -> int -> 'a
val set    : 'a t -> int -> 'a -> unit
val push   : 'a t -> 'a -> unit
val pop    : 'a t -> 'a
end
```

The intended semantics of a vector is that of normal arrays with the addition that its length can only change via a sequence of `push` and `pop` operations, subject to the following laws:

1. `length (make ())` ≡ 0
2. `pop (make ())` ≡ ⊥
3. `push v x; pop v`x
4. `push v x; pop v; v`v
5. `push v (pop v); v`v, if `length v` > 0
6. `push v x; get v (length v - 1)`x

Given `Vec` as a building block, a binary heap implementing a priority queue has the following signature:

```module Heap : sig
type 'a t
val make       : ('a -> 'a -> int) -> 'a t
val is_empty   : 'a t -> bool
val min        : 'a t -> 'a
val replacemin : 'a t -> 'a -> unit
val removemin  : 'a t -> unit
val insert     : 'a t -> 'a -> unit
end (* … *)
```

Heaps are fully polymorphic since when made they are associated with a comparison function:

```end = struct
type 'a t = { compare : 'a -> 'a -> int; heap : 'a Vec.t; }

let make compare = { compare = compare; heap = Vec.make (); }
```

A new heap is backed by an empty vector. The first crucial operation is `siftup`: it bubbles up the last element (the one with index `length h.heap - 1`), comparing and swapping it with its parent until the heap property is restored, namely, it becomes less than or equal than any of the children it has encountered on its voyage up the tree:

```  open Vec

let siftup h =
let rec go cmp v e i =
let p = (i + 1) / 2 - 1 in
if p >= 0 && cmp (get v p) e > 0 then begin
set v i (get v p);
go cmp v e p
end else set v i e
in
let n = length h.heap in
go h.compare h.heap (get h.heap (n - 1)) (n - 1)
```

Note that the parent of a child with index 1 ≤ i < N has index ⌊i/2⌋, but since vectors are 0-based it is necessary to adjust for negative underflow while using integer truncating division (with the usual machine-integer semantics, `-1 / 0 == 0` which is not correct). `Siftup` keeps comparing the node e at index i being inserted with its parent at index p: if it exists and is greater than e they get swapped and the process is repeated until it becomes the new root or it finds a parent with a smaller value. Since the tree is complete, it has exactly ⌊lg N⌋ + 1 levels, which is the number of iterations of this procedure. This iteration is implemented as a tail-recursive lambda-lifted function `go` for efficiency. The complementary `siftdown` is completely symmetric to it:

```  let siftdown h =
let rec go cmp v n e i =
let c =
let l = i * 2 + 1 in
let r = l + 1 in
if r < n && cmp (get v l) (get v r) > 0 then r else l
in
if c < n && cmp (get v c) e < 0 then begin
set v i (get v c);
go cmp v n e c
end else set v i e
in go h.compare h.heap (length h.heap) (get h.heap 0) 0
```

Instead of bubbling the last element up the tree, `siftdown` bubbles the first element down. It compares node e at index i with the smaller c of their children 2·i and 2·i + 1 (again making allowances for the 0-based indexing), if they exist. If it does and is less than e, they are swapped and the process repeated until it becomes the last leaf or it finds a pair of children with greater values. By the same analysis than before, this can happen at most ⌊lg N⌋ + 1 times. Again, the iteration is implemented as a tail-recursive lambda-lifted function `go`. With this done, the priority queue is implemented with a few more lines.

A heap is empty if and only if the underlying vector is:

```  let is_empty h = length h.heap = 0
```

By the heap property, the minimum element is the root of the heap, if it exists:

```  let min h =
if is_empty h then raise Not_found;
get h.heap 0
```

A low-level efficient operation is to replace the minimum element by another, maintaining the heap property:

```  let replacemin h x =
if is_empty h then raise Not_found;
set h.heap 0 x; siftdown h
```

In order to remove the minimum element another one must be found in its place. This decreases the length of the heap by one, which means that the easiest-to-access candidate is `pop h.heap`. I don't bother with a guard as by law 2 above `pop` is partial; however, I must be careful not to try to set the last element into an empty array. This element is placed at the root and sifted down as with `replacemin`:

```  let removemin h =
let x = pop h.heap in
if not (is_empty h) then begin
set h.heap 0 x; siftdown h
end
```

Finally, the dual operation of inserting a new element into a heap boils down to placing it at the end (cf law 6 above) and restoring the heap property by bubbling it up:

```  let insert h x =
push h.heap x; siftup h
end
```

It should be obvious that `replacemin h x``insert h x; removemin h` where the heap property is kept in suspense by eliminating the intervening `siftup h`. It only remains to complete the implementation of `Vec`. This is a completely straightforward amortized O(1) extensible array via doubling in `expand`. The only hack I use is a sprinkling of `Obj.magic` to minimize space leaks:

```module Vec : sig (* … *) end = struct
type 'a t = { mutable cnt : int; mutable len : int; mutable arr : 'a array }

let make () = let len = 16 in
{ cnt = 0; len = len; arr = Array.make len (Obj.magic 0); }

let length a = a.cnt

let expand a =
assert (a.len = a.cnt);
a.len <- 2 * a.len;
let arr = Array.make a.len (Obj.magic 0) in
Array.blit a.arr 0 arr 0 a.cnt;
a.arr <- arr;
assert (a.cnt < a.len)

let get a i =
if not (0 <= i && i < a.cnt) then failwith "get";
Array.unsafe_get a.arr i

let set a i x =
if not (0 <= i && i < a.cnt) then failwith "set";
Array.unsafe_set a.arr i x

let push a x =
if a.len = a.cnt then expand a;
Array.unsafe_set a.arr a.cnt x;
a.cnt <- a.cnt + 1

let pop a =
if a.cnt = 0 then raise Not_found;
a.cnt <- a.cnt - 1;
let x = Array.unsafe_get a.arr a.cnt in
Array.unsafe_set a.arr a.cnt (Obj.magic 0);
x
end
```

With this, Bernstein's algorithm admits a straightforward implementation. Since it stores triples (y, a, b) where y = p(a) + q(b) for polynomials p and q I need to define a lexicographical ordering on integer triples:

```let cint : int -> int -> int = Pervasives.compare

let c3int (p, m, b) (q, n, c) =
let cmp = cint p q in
if cmp <> 0 then cmp else
let cmp = cint m n in
if cmp <> 0 then cmp else
cint b c
```

Now the algorithm takes an enumerating function f to be called with each element of the triple, up to a limit n:

```let bernstein_sum p q (f : int -> int -> int -> unit) n =
let tab = Array.init (n + 1) (fun a -> (p a, a)) in
Array.sort (fun (p, _) (q, _) -> cint p q) tab;
let pa, _ = tab.(0) in
let h = Heap.make c3int in
for b = 0 to n do
Heap.insert h (pa + q b, 0, b)
done;
while not (Heap.is_empty h) do
let (y, i, b) = Heap.min h in
let (pa, a) = tab.(i) in
if i < n
then Heap.replacemin h (fst tab.(i + 1) - pa + y, i + 1, b)
else Heap.removemin h;
f a b y
done
```

As Bernstein himself notes, for sufficiently simple p and q the table of values of p(a) can be dispensed with completely. In any case, the following example nicely exercises the algorithm:

```# let sq i = i*i in bernstein_sum sq sq (Printf.printf "%2d^2 + %2d^2 = %d\n") 10 ;;
```

Jérôme said...

One should be very careful when using Obj.magic with arrays, as arrays of floats are not represented the same way as other arrays (floats are unboxed). Your code probably works with floats, tough. But it is very easy to get weird results. For instance:

# let a = Array.make 1 (Obj.magic 0);;

val a : '_a array = [||]

# a.(0) <- 1.;;

Exception: Invalid_argument "index out of bounds".

Damien Guichard said...

If you want a dynamic imperative heap then no need for Vec, you can just implement an imperative Braun Heap.

type 'a heap =
| E
| N of 'a node
and 'a node =
{mutable l: 'a heap; mutable e: 'a; mutable r: 'a heap}

let min {e=m} = m

let rec insert x h =
match h with
| E ->
N {l=E;e=x;r=E}
| N n ->
let nr = n.r in
n.r <- n.l;
if compare n.e x < 0 then n.l <- insert x nr
else (n.l <- insert n.e nr; n.e <- x);
h

let rec deletemin h =
match h with
| {l=E;r=t} | {r=E;l=t} ->
t
| {l=N l;e=e;r=N r} ->
if l.e < r.e then (h.e <- l.e; h.l <- deletemin l)
else (h.e <- r.e; h.r <- deletemin r);
N h

Matías Giovannini said...

@Jérôme,
you bring up an excellent point. The code for Vec does notwork with floats as it is (as there is no way in OCaml to do a type case). The only type-safe alternative is to functorize the interface, or to supply the creation function with a dummy element to force a monomorphic type.

@Damien,
cool data structure, I didn't know it.