Happy Valentine's! The latest Programming Praxis calls for an implementation of the prime Sieve of Atkins. This sieve saves a bunch of operations with a preprocessing phase in which only square-free candidates are retained for latter sieving. For this to be efficient it is necessary to quickly count solutions to certain binary quadratic forms k·x² ± y² = n. Dan Berstein has published an elegant algorithm to quickly generate the solutions to such types of polynomial sums. It crucially requires a priority queue to ensure that the solutions are generated in order and that none is left out. The purpose of this post is to show an implementation of an imperative priority queue using a binary heap.
A binary heap is a sorted data structure over an ordered set (S, ≤): it is a complete binary tree such that every node is less than or equal to either children. The first property is usually called the shape property and amounts to requiring that every level h except possibly the last has exactly 2h nodes; the second property is called the heap property. The shape property makes it convenient (and usual) to represent binary heaps with N elements as arrays of size N: a node with index 1 ≤ j ≤ N has children with indices 2·j and 2·j + 1, in that order. The heap property is maintained by two operations siftup
and siftdown
that bubble up or down elements not necessarily in order into their proper position.
Since a heap is a dynamic structure, I will need a dynamic array or vector to back it up. The following signature will be sufficient for now; I will give the implementation later:
module Vec : sig type 'a t val make : unit -> 'a t val length : 'a t -> int val get : 'a t -> int -> 'a val set : 'a t -> int -> 'a -> unit val push : 'a t -> 'a -> unit val pop : 'a t -> 'a end
The intended semantics of a vector is that of normal arrays with the addition that its length can only change via a sequence of push
and pop
operations, subject to the following laws:
length (make ())
≡ 0pop (make ())
≡ ⊥push v x; pop v
≡ xpush v x; pop v; v
≡ vpush v (pop v); v
≡ v, iflength v
> 0push v x; get v (length v - 1)
≡ x
Given Vec
as a building block, a binary heap implementing a priority queue has the following signature:
module Heap : sig type 'a t val make : ('a -> 'a -> int) -> 'a t val is_empty : 'a t -> bool val min : 'a t -> 'a val replacemin : 'a t -> 'a -> unit val removemin : 'a t -> unit val insert : 'a t -> 'a -> unit end (* … *)
Heaps are fully polymorphic since when made they are associated with a comparison function:
end = struct type 'a t = { compare : 'a -> 'a -> int; heap : 'a Vec.t; } let make compare = { compare = compare; heap = Vec.make (); }
A new heap is backed by an empty vector. The first crucial operation is siftup
: it bubbles up the last element (the one with index length h.heap - 1
), comparing and swapping it with its parent until the heap property is restored, namely, it becomes less than or equal than any of the children it has encountered on its voyage up the tree:
open Vec let siftup h = let rec go cmp v e i = let p = (i + 1) / 2 - 1 in if p >= 0 && cmp (get v p) e > 0 then begin set v i (get v p); go cmp v e p end else set v i e in let n = length h.heap in go h.compare h.heap (get h.heap (n - 1)) (n - 1)
Note that the parent of a child with index 1 ≤ i < N has index ⌊i/2⌋, but since vectors are 0-based it is necessary to adjust for negative underflow while using integer truncating division (with the usual machine-integer semantics, -1 / 0 == 0
which is not correct). Siftup
keeps comparing the node e at index i being inserted with its parent at index p: if it exists and is greater than e they get swapped and the process is repeated until it becomes the new root or it finds a parent with a smaller value. Since the tree is complete, it has exactly ⌊lg N⌋ + 1 levels, which is the number of iterations of this procedure. This iteration is implemented as a tail-recursive lambda-lifted function go
for efficiency. The complementary siftdown
is completely symmetric to it:
let siftdown h = let rec go cmp v n e i = let c = let l = i * 2 + 1 in let r = l + 1 in if r < n && cmp (get v l) (get v r) > 0 then r else l in if c < n && cmp (get v c) e < 0 then begin set v i (get v c); go cmp v n e c end else set v i e in go h.compare h.heap (length h.heap) (get h.heap 0) 0
Instead of bubbling the last element up the tree, siftdown
bubbles the first element down. It compares node e at index i with the smaller c of their children 2·i and 2·i + 1 (again making allowances for the 0-based indexing), if they exist. If it does and is less than e, they are swapped and the process repeated until it becomes the last leaf or it finds a pair of children with greater values. By the same analysis than before, this can happen at most ⌊lg N⌋ + 1 times. Again, the iteration is implemented as a tail-recursive lambda-lifted function go
. With this done, the priority queue is implemented with a few more lines.
A heap is empty if and only if the underlying vector is:
let is_empty h = length h.heap = 0
By the heap property, the minimum element is the root of the heap, if it exists:
let min h = if is_empty h then raise Not_found; get h.heap 0
A low-level efficient operation is to replace the minimum element by another, maintaining the heap property:
let replacemin h x = if is_empty h then raise Not_found; set h.heap 0 x; siftdown h
In order to remove the minimum element another one must be found in its place. This decreases the length of the heap by one, which means that the easiest-to-access candidate is pop h.heap
. I don't bother with a guard as by law 2 above pop
is partial; however, I must be careful not to try to set the last element into an empty array. This element is placed at the root and sifted down as with replacemin
:
let removemin h = let x = pop h.heap in if not (is_empty h) then begin set h.heap 0 x; siftdown h end
Finally, the dual operation of inserting a new element into a heap boils down to placing it at the end (cf law 6 above) and restoring the heap property by bubbling it up:
let insert h x = push h.heap x; siftup h end
It should be obvious that replacemin h x
≡ insert h x; removemin h
where the heap property is kept in suspense by eliminating the intervening siftup h
. It only remains to complete the implementation of Vec
. This is a completely straightforward amortized O(1) extensible array via doubling in expand
. The only hack I use is a sprinkling of Obj.magic
to minimize space leaks:
module Vec : sig (* … *) end = struct type 'a t = { mutable cnt : int; mutable len : int; mutable arr : 'a array } let make () = let len = 16 in { cnt = 0; len = len; arr = Array.make len (Obj.magic 0); } let length a = a.cnt let expand a = assert (a.len = a.cnt); a.len <- 2 * a.len; let arr = Array.make a.len (Obj.magic 0) in Array.blit a.arr 0 arr 0 a.cnt; a.arr <- arr; assert (a.cnt < a.len) let get a i = if not (0 <= i && i < a.cnt) then failwith "get"; Array.unsafe_get a.arr i let set a i x = if not (0 <= i && i < a.cnt) then failwith "set"; Array.unsafe_set a.arr i x let push a x = if a.len = a.cnt then expand a; Array.unsafe_set a.arr a.cnt x; a.cnt <- a.cnt + 1 let pop a = if a.cnt = 0 then raise Not_found; a.cnt <- a.cnt - 1; let x = Array.unsafe_get a.arr a.cnt in Array.unsafe_set a.arr a.cnt (Obj.magic 0); x end
With this, Bernstein's algorithm admits a straightforward implementation. Since it stores triples (y, a, b) where y = p(a) + q(b) for polynomials p and q I need to define a lexicographical ordering on integer triples:
let cint : int -> int -> int = Pervasives.compare let c3int (p, m, b) (q, n, c) = let cmp = cint p q in if cmp <> 0 then cmp else let cmp = cint m n in if cmp <> 0 then cmp else cint b c
Now the algorithm takes an enumerating function f to be called with each element of the triple, up to a limit n:
let bernstein_sum p q (f : int -> int -> int -> unit) n = let tab = Array.init (n + 1) (fun a -> (p a, a)) in Array.sort (fun (p, _) (q, _) -> cint p q) tab; let pa, _ = tab.(0) in let h = Heap.make c3int in for b = 0 to n do Heap.insert h (pa + q b, 0, b) done; while not (Heap.is_empty h) do let (y, i, b) = Heap.min h in let (pa, a) = tab.(i) in if i < n then Heap.replacemin h (fst tab.(i + 1) - pa + y, i + 1, b) else Heap.removemin h; f a b y done
As Bernstein himself notes, for sufficiently simple p and q the table of values of p(a) can be dispensed with completely. In any case, the following example nicely exercises the algorithm:
# let sq i = i*i in bernstein_sum sq sq (Printf.printf "%2d^2 + %2d^2 = %d\n") 10 ;;
3 comments:
One should be very careful when using Obj.magic with arrays, as arrays of floats are not represented the same way as other arrays (floats are unboxed). Your code probably works with floats, tough. But it is very easy to get weird results. For instance:
# let a = Array.make 1 (Obj.magic 0);;
val a : '_a array = [||]
# a.(0) <- 1.;;
Exception: Invalid_argument "index out of bounds".
If you want a dynamic imperative heap then no need for Vec, you can just implement an imperative Braun Heap.
type 'a heap =
| E
| N of 'a node
and 'a node =
{mutable l: 'a heap; mutable e: 'a; mutable r: 'a heap}
let min {e=m} = m
let rec insert x h =
match h with
| E ->
N {l=E;e=x;r=E}
| N n ->
let nr = n.r in
n.r <- n.l;
if compare n.e x < 0 then n.l <- insert x nr
else (n.l <- insert n.e nr; n.e <- x);
h
let rec deletemin h =
match h with
| {l=E;r=t} | {r=E;l=t} ->
t
| {l=N l;e=e;r=N r} ->
if l.e < r.e then (h.e <- l.e; h.l <- deletemin l)
else (h.e <- r.e; h.r <- deletemin r);
N h
@Jérôme,
you bring up an excellent point. The code for Vec does notwork with floats as it is (as there is no way in OCaml to do a type case). The only type-safe alternative is to functorize the interface, or to supply the creation function with a dummy element to force a monomorphic type.
@Damien,
cool data structure, I didn't know it.
Post a Comment