2010-02-14

Heap Up (the Bernstein Sums)

Happy Valentine's! The latest Programming Praxis calls for an implementation of the prime Sieve of Atkins. This sieve saves a bunch of operations with a preprocessing phase in which only square-free candidates are retained for latter sieving. For this to be efficient it is necessary to quickly count solutions to certain binary quadratic forms k·x² ± y² = n. Dan Berstein has published an elegant algorithm to quickly generate the solutions to such types of polynomial sums. It crucially requires a priority queue to ensure that the solutions are generated in order and that none is left out. The purpose of this post is to show an implementation of an imperative priority queue using a binary heap.

A binary heap is a sorted data structure over an ordered set (S, ≤): it is a complete binary tree such that every node is less than or equal to either children. The first property is usually called the shape property and amounts to requiring that every level h except possibly the last has exactly 2h nodes; the second property is called the heap property. The shape property makes it convenient (and usual) to represent binary heaps with N elements as arrays of size N: a node with index 1 ≤ jN has children with indices 2·j and 2·j + 1, in that order. The heap property is maintained by two operations siftup and siftdown that bubble up or down elements not necessarily in order into their proper position.

Since a heap is a dynamic structure, I will need a dynamic array or vector to back it up. The following signature will be sufficient for now; I will give the implementation later:

module Vec : sig
  type 'a t
  val make   : unit -> 'a t
  val length : 'a t -> int
  val get    : 'a t -> int -> 'a
  val set    : 'a t -> int -> 'a -> unit
  val push   : 'a t -> 'a -> unit
  val pop    : 'a t -> 'a
end

The intended semantics of a vector is that of normal arrays with the addition that its length can only change via a sequence of push and pop operations, subject to the following laws:

  1. length (make ()) ≡ 0
  2. pop (make ()) ≡ ⊥
  3. push v x; pop vx
  4. push v x; pop v; vv
  5. push v (pop v); vv, if length v > 0
  6. push v x; get v (length v - 1)x

Given Vec as a building block, a binary heap implementing a priority queue has the following signature:

module Heap : sig
  type 'a t
  val make       : ('a -> 'a -> int) -> 'a t
  val is_empty   : 'a t -> bool
  val min        : 'a t -> 'a
  val replacemin : 'a t -> 'a -> unit
  val removemin  : 'a t -> unit
  val insert     : 'a t -> 'a -> unit
end (* … *)

Heaps are fully polymorphic since when made they are associated with a comparison function:

end = struct
  type 'a t = { compare : 'a -> 'a -> int; heap : 'a Vec.t; }

  let make compare = { compare = compare; heap = Vec.make (); }

A new heap is backed by an empty vector. The first crucial operation is siftup: it bubbles up the last element (the one with index length h.heap - 1), comparing and swapping it with its parent until the heap property is restored, namely, it becomes less than or equal than any of the children it has encountered on its voyage up the tree:

  open Vec

  let siftup h =
    let rec go cmp v e i =
      let p = (i + 1) / 2 - 1 in
      if p >= 0 && cmp (get v p) e > 0 then begin
        set v i (get v p);
        go cmp v e p
      end else set v i e
    in
    let n = length h.heap in
    go h.compare h.heap (get h.heap (n - 1)) (n - 1)

Note that the parent of a child with index 1 ≤ i < N has index ⌊i/2⌋, but since vectors are 0-based it is necessary to adjust for negative underflow while using integer truncating division (with the usual machine-integer semantics, -1 / 0 == 0 which is not correct). Siftup keeps comparing the node e at index i being inserted with its parent at index p: if it exists and is greater than e they get swapped and the process is repeated until it becomes the new root or it finds a parent with a smaller value. Since the tree is complete, it has exactly ⌊lg N⌋ + 1 levels, which is the number of iterations of this procedure. This iteration is implemented as a tail-recursive lambda-lifted function go for efficiency. The complementary siftdown is completely symmetric to it:

  let siftdown h =
    let rec go cmp v n e i =
      let c =
        let l = i * 2 + 1 in
        let r = l + 1 in
        if r < n && cmp (get v l) (get v r) > 0 then r else l
      in
      if c < n && cmp (get v c) e < 0 then begin
        set v i (get v c);
        go cmp v n e c
      end else set v i e
    in go h.compare h.heap (length h.heap) (get h.heap 0) 0

Instead of bubbling the last element up the tree, siftdown bubbles the first element down. It compares node e at index i with the smaller c of their children 2·i and 2·i + 1 (again making allowances for the 0-based indexing), if they exist. If it does and is less than e, they are swapped and the process repeated until it becomes the last leaf or it finds a pair of children with greater values. By the same analysis than before, this can happen at most ⌊lg N⌋ + 1 times. Again, the iteration is implemented as a tail-recursive lambda-lifted function go. With this done, the priority queue is implemented with a few more lines.

A heap is empty if and only if the underlying vector is:

  let is_empty h = length h.heap = 0

By the heap property, the minimum element is the root of the heap, if it exists:

  let min h =
    if is_empty h then raise Not_found;
    get h.heap 0

A low-level efficient operation is to replace the minimum element by another, maintaining the heap property:

  let replacemin h x =
    if is_empty h then raise Not_found;
    set h.heap 0 x; siftdown h

In order to remove the minimum element another one must be found in its place. This decreases the length of the heap by one, which means that the easiest-to-access candidate is pop h.heap. I don't bother with a guard as by law 2 above pop is partial; however, I must be careful not to try to set the last element into an empty array. This element is placed at the root and sifted down as with replacemin:

  let removemin h =
    let x = pop h.heap in
    if not (is_empty h) then begin
      set h.heap 0 x; siftdown h
    end

Finally, the dual operation of inserting a new element into a heap boils down to placing it at the end (cf law 6 above) and restoring the heap property by bubbling it up:

  let insert h x =
    push h.heap x; siftup h
end

It should be obvious that replacemin h xinsert h x; removemin h where the heap property is kept in suspense by eliminating the intervening siftup h. It only remains to complete the implementation of Vec. This is a completely straightforward amortized O(1) extensible array via doubling in expand. The only hack I use is a sprinkling of Obj.magic to minimize space leaks:

module Vec : sig (* … *) end = struct
  type 'a t = { mutable cnt : int; mutable len : int; mutable arr : 'a array }

  let make () = let len = 16 in
    { cnt = 0; len = len; arr = Array.make len (Obj.magic 0); }

  let length a = a.cnt

  let expand a =
    assert (a.len = a.cnt);
    a.len <- 2 * a.len;
    let arr = Array.make a.len (Obj.magic 0) in
    Array.blit a.arr 0 arr 0 a.cnt;
    a.arr <- arr;
    assert (a.cnt < a.len)

  let get a i =
    if not (0 <= i && i < a.cnt) then failwith "get";
    Array.unsafe_get a.arr i

  let set a i x =
    if not (0 <= i && i < a.cnt) then failwith "set";
    Array.unsafe_set a.arr i x

  let push a x =
    if a.len = a.cnt then expand a;
    Array.unsafe_set a.arr a.cnt x;
    a.cnt <- a.cnt + 1

  let pop a =
    if a.cnt = 0 then raise Not_found;
    a.cnt <- a.cnt - 1;
    let x = Array.unsafe_get a.arr a.cnt in
    Array.unsafe_set a.arr a.cnt (Obj.magic 0);
    x
end

With this, Bernstein's algorithm admits a straightforward implementation. Since it stores triples (y, a, b) where y = p(a) + q(b) for polynomials p and q I need to define a lexicographical ordering on integer triples:

let cint : int -> int -> int = Pervasives.compare

let c3int (p, m, b) (q, n, c) =
  let cmp = cint p q in
  if cmp <> 0 then cmp else
  let cmp = cint m n in
  if cmp <> 0 then cmp else
  cint b c

Now the algorithm takes an enumerating function f to be called with each element of the triple, up to a limit n:

let bernstein_sum p q (f : int -> int -> int -> unit) n =
  let tab = Array.init (n + 1) (fun a -> (p a, a)) in
  Array.sort (fun (p, _) (q, _) -> cint p q) tab;
  let pa, _ = tab.(0) in
  let h = Heap.make c3int in
  for b = 0 to n do
    Heap.insert h (pa + q b, 0, b)
  done;
  while not (Heap.is_empty h) do
    let (y, i, b) = Heap.min h in
    let (pa, a) = tab.(i) in
    if i < n
      then Heap.replacemin h (fst tab.(i + 1) - pa + y, i + 1, b)
      else Heap.removemin h;
    f a b y
  done

As Bernstein himself notes, for sufficiently simple p and q the table of values of p(a) can be dispensed with completely. In any case, the following example nicely exercises the algorithm:

# let sq i = i*i in bernstein_sum sq sq (Printf.printf "%2d^2 + %2d^2 = %d\n") 10 ;;

3 comments:

Jérôme said...

One should be very careful when using Obj.magic with arrays, as arrays of floats are not represented the same way as other arrays (floats are unboxed). Your code probably works with floats, tough. But it is very easy to get weird results. For instance:

# let a = Array.make 1 (Obj.magic 0);;

val a : '_a array = [||]

# a.(0) <- 1.;;

Exception: Invalid_argument "index out of bounds".

Damien Guichard said...

If you want a dynamic imperative heap then no need for Vec, you can just implement an imperative Braun Heap.

type 'a heap =
| E
| N of 'a node
and 'a node =
{mutable l: 'a heap; mutable e: 'a; mutable r: 'a heap}

let min {e=m} = m

let rec insert x h =
match h with
| E ->
N {l=E;e=x;r=E}
| N n ->
let nr = n.r in
n.r <- n.l;
if compare n.e x < 0 then n.l <- insert x nr
else (n.l <- insert n.e nr; n.e <- x);
h

let rec deletemin h =
match h with
| {l=E;r=t} | {r=E;l=t} ->
t
| {l=N l;e=e;r=N r} ->
if l.e < r.e then (h.e <- l.e; h.l <- deletemin l)
else (h.e <- r.e; h.r <- deletemin r);
N h

Matías Giovannini said...

@Jérôme,
you bring up an excellent point. The code for Vec does notwork with floats as it is (as there is no way in OCaml to do a type case). The only type-safe alternative is to functorize the interface, or to supply the creation function with a dummy element to force a monomorphic type.

@Damien,
cool data structure, I didn't know it.