activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r806921 - in /activemq/sandbox/activemq-flow/activemq-util/src: main/java/org/apache/activemq/util/HashRing.java main/java/org/apache/activemq/util/Hasher.java test/java/org/apache/activemq/util/HashRingTest.java
Date Sun, 23 Aug 2009 02:23:32 GMT
Author: chirino
Date: Sun Aug 23 02:23:31 2009
New Revision: 806921

URL: http://svn.apache.org/viewvc?rev=806921&view=rev
Log:
Adding a HashRing class that could come in handy to implement distributed partitioning.


Added:
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/Hasher.java
    activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/HashRingTest.java

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java?rev=806921&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java
Sun Aug 23 02:23:31 2009
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A hash ring is used to map resources to a to a set of nodes.
+ * This hash ring implements
+ * <a href="http://www8.org/w8-papers/2a-webserver/caching/paper2.html#chash1">
+ * Consistent Hashing</a> and therefore adding a removing nodes minimally
+ * changes how resources are map to the nodes.  
+ * <p/>
+ * This implementation also allows you to apply non-uniform node weighting.  This
+ * feature is usefull when you want to allocate more resources to some nodes and
+ * fewer to others.
+ * <p/>
+ * The default weight of node is 200.  The weight of a node determins how many
+ * points on the hash ring the node is alocated. Higher node weights increases
+ * the uniform distribution of resources.
+ * <p/>
+ * Note that the order that nodes are added to the ring impact how resources
+ * map to the nodes due to node hash collisions.
+ *
+ */
+public class HashRing<Node, Resource> {
+
+    public static int DEFAULT_WEIGHT = 200;
+
+    /**
+     * The Hasher.ToStringHasher implemenation is as the hasher when none is specifed in
the constructor.
+     */
+    public static Hasher TO_STRING_HASHER = new Hasher.ToStringHasher();
+
+    private static class Wrapper<N> {
+        private N node;
+        private int weight;
+
+        public Wrapper(N node, int weight) {
+            this.node = node;
+            this.weight = weight;
+        }
+
+        @Override
+        public String toString() {
+            return "Wrapper{" + "node=" + node + ", weight=" + weight + '}';
+        }
+    }
+
+    private final Hasher hasher;
+    private final TreeMap<Integer, Wrapper<Node>> ring = new TreeMap<Integer,
Wrapper<Node>>();
+    private final LinkedHashMap<Node, Wrapper<Node>> nodes = new LinkedHashMap<Node,
Wrapper<Node>>();
+
+    /**
+     * Constructs a <tt>HashRing</tt> which uses the OBJECT_HASHER to hash the
nodes and values.
+     *
+     */
+    public HashRing() {
+        this(TO_STRING_HASHER);
+    }
+
+    /**
+     * Constructs a <tt>HashRing</tt> with the specified
+     * hasher.
+     *
+     * @param hasher
+     */
+    public HashRing(Hasher hasher) {
+        this.hasher = hasher;
+    }
+
+    /**
+     * Adds all the specified nodes to the <tt>HashRing</tt> using the default
+     * weight of 200 for each node.
+     *
+     * @param nodes the nodes to add
+     */
+    public void addAll(Iterable<Node> nodes) {
+        for (Node node : nodes) {
+            add(node);
+        }
+    }
+
+    /**
+     * Adds all the specified nodes to the <tt>HashRing</tt> using the default
+     * weight of 200 for each node.
+     *
+     * @param nodes the nodes to add
+     */
+    public void add(Node... nodes) {
+        addAll(Arrays.asList(nodes));
+    }
+
+    /**
+     * Adds a node to the <tt>HashRing</tt> using the default
+     * weight of 200 for the node.
+     *
+     * @param node the node to add
+     */
+    public void add(Node node) {
+        add(node, DEFAULT_WEIGHT);
+    }
+
+    /**
+     * Adds a node to the <tt>HashRing</tt> using the specified weight.
+     *
+     * @param node the node to add
+     * @param weight the number of hash replicas to create the node in the <tt>HashRing</tt>
+     * @throws IllegalArgumentException if the weight is less than 1
+     */
+    public void add(Node node, int weight) {
+        if( weight < 1 ) {
+            throw new IllegalArgumentException("weight must be 1 or greater");
+        }
+
+        Wrapper<Node> wrapper = new Wrapper<Node>(node, weight);
+        nodes.put(node, wrapper);
+        for (int i = 0; i < wrapper.weight; i++) {
+            ring.put(hasher.hashNode(node, i), wrapper);
+        }
+    }
+
+    /**
+     * Removes a previously added node from the <tt>HashRing</tt>
+     *
+     * @param node the node to remove
+     * @return true if the node was previously added
+     */
+    public boolean remove(Node node) {
+        Wrapper<Node> wrapper  = nodes.remove(node);
+        if( wrapper == null ) {
+            return false;
+        }
+
+        // We HAVE to re-hash the ring to keep it consistent since
+        // nodes hashes may collide and last node added takes over the
+        // the previously added node.  Order matters.
+        ring.clear();
+        for (Wrapper<Node> w : nodes.values()) {
+            for (int i = 0; i < w.weight; i++) {
+                ring.put(hasher.hashNode(w.node, i), w);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Removes all previously added nodes.
+     */
+    public void clear() {
+        ring.clear();
+        nodes.clear();
+    }
+
+    /**
+     * @return all the previously added nodes.
+     */
+    List<Node> getNodes() {
+        return new ArrayList(nodes.keySet());
+    }
+
+    /**
+     * Maps a resource value to a node.
+     *
+     * @param resource the resource to map
+     * @return the Node that the resource maps to or null if the <tt>HashRing</tt>
is empty.
+     */
+    public Node get(Resource resource) {
+        Map.Entry<Integer, Wrapper<Node>> entry = getFirstEntry(resource);
+        if (entry==null) {
+            return null;
+        }
+        return entry.getValue().node;
+    }
+
+    /**
+     * Maps a resource value to an interator to the nodes in the <tt>HashRing</tt>
+     * starting at the Node which resource maps to.
+     *
+     * Note that duplicate node objects may be returned.  This is because
+     *
+     *
+     *
+     * @param resource the resource to map
+     * @return a Iterator
+     */
+    public Iterator<Node> iterator(Resource resource) {
+        final Map.Entry<Integer, Wrapper<Node>> first = getFirstEntry(resource);
+
+        return new Iterator<Node>() {
+            Map.Entry<Integer, Wrapper<Node>> removealCandidate;
+            Map.Entry<Integer, Wrapper<Node>> last;
+            Map.Entry<Integer, Wrapper<Node>> next = first;
+
+            public boolean hasNext() {
+                // We might allready know the next entry..
+                if( next != null )
+                    return true;
+
+                // Since we use last to figure out the next..
+                if( last==null )
+                    return false;
+
+                // Figure out the enxt entry...
+                Map.Entry<Integer, Wrapper<Node>> next = ring.upperEntry(last.getKey());
+                if( next == null ) {
+                    next = ring.firstEntry();
+                }
+
+                // We don't need last anymore..
+                last = null;
+
+                // But the next entry might circle back to the first...
+                if( next.getKey()==first.getKey() ) {
+                    next = null;
+                }
+                return next!=null;
+            }
+
+            public Node next() {
+                if( !hasNext() ) {
+                    throw new NoSuchElementException();
+                }
+                removealCandidate = last = next;
+                next = null;
+                return last.getValue().node;
+            }
+
+            public void remove() {
+                if( removealCandidate ==null ) {
+                    throw new IllegalStateException();
+                }
+                HashRing.this.remove(last.getValue().node);
+                removealCandidate =null;
+            }
+        };
+    }
+
+    private Map.Entry<Integer, Wrapper<Node>> getFirstEntry(Resource resource)
{
+        if (ring.isEmpty()) {
+            return null;
+        }
+        int hash = hasher.hashKey(resource);
+        Map.Entry<Integer, Wrapper<Node>> entry = ring.ceilingEntry(hash);
+        if( entry == null ) {
+            entry = ring.firstEntry();
+        }
+        return entry;
+    }
+
+
+}

Added: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/Hasher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/Hasher.java?rev=806921&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/Hasher.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/Hasher.java
Sun Aug 23 02:23:31 2009
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import org.apache.activemq.util.marshaller.Marshaller;
+import org.apache.activemq.util.marshaller.VariableMarshaller;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
+import org.apache.activemq.util.buffer.Buffer;
+
+import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+import java.util.zip.Adler32;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * Implementing this interface allows an object to customize
+ * how hash values are computed for Node objects and Key objects.
+ *
+ * @param <N>
+ * @param <K>
+ */
+public interface Hasher<N, K> {
+
+    int hashNode(N node, int i);
+
+    int hashKey(K value);
+
+    /**
+     * This Hasher implementation works with any type of object by using
+     * the Object.hashCode() method of the Node and Keys objects
+     * to compute the hash.  The node object MUST implement a toString()
+     * method which returns a unique id for the Node.
+     */
+    public class Native implements Hasher {
+        /**
+         * @param node
+         * @param i
+         * @return (node.toString()+":"+i).hashCode();
+         */
+        public int hashNode(Object node, int i) {
+            return (node.toString() + ":" + i).hashCode();
+        }
+
+        /**
+         * @param value
+         * @return value.hashCode();
+         */
+        public int hashKey(Object value) {
+            return value.hashCode();
+        }
+    }
+
+
+    /**
+     * A Hasher implemenation which first convert the Node and Key to
+     * byte arrays before calculating the hash using a HashAlgorithim.
+     *
+     * @param <N>
+     * @param <K>
+     */
+    public class BinaryHasher<N, K> implements Hasher<N, K> {
+        private final Marshaller<N> nodeMarshaller;
+        private final Marshaller<K> keyMarshaller;
+        private final HashAlgorithim hashAlgorithim;
+
+        public BinaryHasher(Marshaller<N> nodeMarshaller, Marshaller<K> keyMarshaller,
HashAlgorithim hashAlgorithim) {
+            this.nodeMarshaller = nodeMarshaller;
+            this.keyMarshaller = keyMarshaller;
+            this.hashAlgorithim = hashAlgorithim;
+        }
+
+        public int hashNode(N node, int i) {
+            try {
+                DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+                nodeMarshaller.writePayload(node, os);
+                os.write(':');
+                os.writeInt(i);
+                return hash(os.toBuffer());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public int hashKey(K value) {
+            try {
+                DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+                keyMarshaller.writePayload(value, os);
+                return hash(os.toBuffer());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public int hash(Buffer buffer) {
+            return hashAlgorithim.hash(buffer.data, buffer.length);
+        }
+    }
+
+    /**
+     * Used to calculate the hash of a binary buffer.
+     */
+    public interface HashAlgorithim {
+        int hash(byte[] data, int len);
+    }
+
+    /**
+     * A HashAlgorithim instance which use a MessageDigest
+     * algorithim to compute the hash value.
+     */
+    public class MessageDigestFactory implements HashAlgorithim {
+        String algorithim;
+
+        public MessageDigestFactory(String algorithim) {
+            this.algorithim = algorithim;
+        }
+
+        public int hash(byte[] data, int len) {
+            try {
+                MessageDigest md = MessageDigest.getInstance(algorithim);
+                md.update(data, 0, len);
+                byte[] digest = md.digest();
+                // Return the high bytes bytes of the digest as an int
+                return (int)
+                        ((digest[0] & 0xFF) << 24)
+                        | ((digest[1] & 0xFF) << 16)
+                        | ((digest[2] & 0xFF) << 8)
+                        | (digest[3] & 0xFF);
+            } catch (NoSuchAlgorithmException e) {
+                throw new RuntimeException(algorithim + " not supported", e);
+            }
+        }
+    }
+
+    public static HashAlgorithim MD5 = new MessageDigestFactory("MD5");
+
+    long INTEGER_MASK = 0xFFFFFFFFL;
+
+    /**
+     * A HashAlgorithim instance which uses the Fowler/Noll/Vo FNV-1a hash
+     * algorithim to compute the hash value.
+     * <p/>
+     * see: http://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function
+     */
+    public static HashAlgorithim FNV1A = new HashAlgorithim() {
+        private static final long INIT = 0xcbf29ce484222325L;
+        private static final long PRIME = 1099511628211L;
+
+        public int hash(byte[] b, int len) {
+            long value = INIT;
+            for (int i = 0; i < len; i++) {
+                value ^= b[i];
+                value *= PRIME;
+            }
+            return (int)((value >> 16) & INTEGER_MASK);
+        }
+    };
+
+    /**
+     * A HashAlgorithim instance which uses the Murmur hash algorithim
+     * to compute the hash value.
+     * <p/>
+     * lifted from: https://svn.apache.org/repos/asf/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MurmurHash.java
+     * <p/>
+     *
+     * This is a very fast, non-cryptographic hash suitable for general hash-based
+     * lookup.  See http://murmurhash.googlepages.com/ for more details.
+     *
+     * <p>The C version of MurmurHash 2.0 found at that site was ported
+     * to Java by Andrzej Bialecki (ab at getopt org).</p>
+     */
+    public static HashAlgorithim MURMUR = new HashAlgorithim() {
+        private static final int seed = 0xcbf29ce4;
+        public int hash(byte[] data, int length) {
+            int m = 0x5bd1e995;
+            int r = 24;
+
+            int h = seed ^ length;
+
+            int len_4 = length >> 2;
+
+            for (int i = 0; i < len_4; i++) {
+                int i_4 = i << 2;
+                int k = data[i_4 + 3];
+                k = k << 8;
+                k = k | (data[i_4 + 2] & 0xff);
+                k = k << 8;
+                k = k | (data[i_4 + 1] & 0xff);
+                k = k << 8;
+                k = k | (data[i_4 + 0] & 0xff);
+                k *= m;
+                k ^= k >>> r;
+                k *= m;
+                h *= m;
+                h ^= k;
+            }
+
+            // avoid calculating modulo
+            int len_m = len_4 << 2;
+            int left = length - len_m;
+
+            if (left != 0) {
+                if (left >= 3) {
+                    h ^= data[length - 3] << 16;
+                }
+                if (left >= 2) {
+                    h ^= data[length - 2] << 8;
+                }
+                if (left >= 1) {
+                    h ^= data[length - 1];
+                }
+
+                h *= m;
+            }
+
+            h ^= h >>> 13;
+            h *= m;
+            h ^= h >>> 15;
+
+            return h;
+        }
+    };
+
+
+    /**
+     * A HashAlgorithim instance which uses the Jenkins hash algorithim
+     * to compute the hash value.
+     * <p/>
+     * lifted from: https://svn.apache.org/repos/asf/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/MurmurHash.java
+     * <p/>
+     *
+     * <pre>lookup3.c, by Bob Jenkins, May 2006, Public Domain.
+     *
+     * You can use this free for any purpose.  It's in the public domain.
+     * It has no warranty.
+     * </pre>
+     *
+     * @see <a href="http://burtleburtle.net/bob/c/lookup3.c">lookup3.c</a>
+     * @see <a href="http://www.ddj.com/184410284">Hash Functions (and how this
+     * function compares to others such as CRC, MD?, etc</a>
+     * @see <a href="http://burtleburtle.net/bob/hash/doobs.html">Has update on the
+     * Dr. Dobbs Article</a>
+     */
+    public static HashAlgorithim JENKINS = new HashAlgorithim() {
+        private static final long INIT = 0xcbf29ce484222325L;
+        private final long BYTE_MASK = 0x00000000000000ffL;
+
+        private long rot(long val, int pos) {
+          return ((Integer.rotateLeft( (int)(val & INTEGER_MASK), pos)) & INTEGER_MASK);
+        }
+
+        public int hash(byte[] key, int nbytes) {
+          int length = nbytes;
+          long a, b, c;       // We use longs because we don't have unsigned ints
+          a = b = c = (0x00000000deadbeefL + length + INIT) & INTEGER_MASK;
+          int offset = 0;
+          for (; length > 12; offset += 12, length -= 12) {
+
+            a = (a + (key[offset + 0]    & BYTE_MASK)) & INTEGER_MASK;
+            a = (a + (((key[offset + 1]  & BYTE_MASK) <<  8) & INTEGER_MASK))
& INTEGER_MASK;
+            a = (a + (((key[offset + 2]  & BYTE_MASK) << 16) & INTEGER_MASK))
& INTEGER_MASK;
+            a = (a + (((key[offset + 3]  & BYTE_MASK) << 24) & INTEGER_MASK))
& INTEGER_MASK;
+            b = (b + (key[offset + 4]    & BYTE_MASK)) & INTEGER_MASK;
+            b = (b + (((key[offset + 5]  & BYTE_MASK) <<  8) & INTEGER_MASK))
& INTEGER_MASK;
+            b = (b + (((key[offset + 6]  & BYTE_MASK) << 16) & INTEGER_MASK))
& INTEGER_MASK;
+            b = (b + (((key[offset + 7]  & BYTE_MASK) << 24) & INTEGER_MASK))
& INTEGER_MASK;
+            c = (c + (key[offset + 8]    & BYTE_MASK)) & INTEGER_MASK;
+            c = (c + (((key[offset + 9]  & BYTE_MASK) <<  8) & INTEGER_MASK))
& INTEGER_MASK;
+            c = (c + (((key[offset + 10] & BYTE_MASK) << 16) & INTEGER_MASK))
& INTEGER_MASK;
+            c = (c + (((key[offset + 11] & BYTE_MASK) << 24) & INTEGER_MASK))
& INTEGER_MASK;
+
+            a = (a - c) & INTEGER_MASK;  a ^= rot(c, 4);  c = (c + b) & INTEGER_MASK;
+            b = (b - a) & INTEGER_MASK;  b ^= rot(a, 6);  a = (a + c) & INTEGER_MASK;
+            c = (c - b) & INTEGER_MASK;  c ^= rot(b, 8);  b = (b + a) & INTEGER_MASK;
+            a = (a - c) & INTEGER_MASK;  a ^= rot(c,16);  c = (c + b) & INTEGER_MASK;
+            b = (b - a) & INTEGER_MASK;  b ^= rot(a,19);  a = (a + c) & INTEGER_MASK;
+            c = (c - b) & INTEGER_MASK;  c ^= rot(b, 4);  b = (b + a) & INTEGER_MASK;
+          }
+
+          //-------------------------------- last block: affect all 32 bits of (c)
+          switch (length) {                   // all the case statements fall through
+          case 12:
+            c = (c + (((key[offset + 11] & BYTE_MASK) << 24) & INTEGER_MASK))
& INTEGER_MASK;
+          case 11:
+            c = (c + (((key[offset + 10] & BYTE_MASK) << 16) & INTEGER_MASK))
& INTEGER_MASK;
+          case 10:
+            c = (c + (((key[offset + 9]  & BYTE_MASK) <<  8) & INTEGER_MASK))
& INTEGER_MASK;
+          case  9:
+            c = (c + (key[offset + 8]    & BYTE_MASK)) & INTEGER_MASK;
+          case  8:
+            b = (b + (((key[offset + 7]  & BYTE_MASK) << 24) & INTEGER_MASK))
& INTEGER_MASK;
+          case  7:
+            b = (b + (((key[offset + 6]  & BYTE_MASK) << 16) & INTEGER_MASK))
& INTEGER_MASK;
+          case  6:
+            b = (b + (((key[offset + 5]  & BYTE_MASK) <<  8) & INTEGER_MASK))
& INTEGER_MASK;
+          case  5:
+            b = (b + (key[offset + 4]    & BYTE_MASK)) & INTEGER_MASK;
+          case  4:
+            a = (a + (((key[offset + 3]  & BYTE_MASK) << 24) & INTEGER_MASK))
& INTEGER_MASK;
+          case  3:
+            a = (a + (((key[offset + 2]  & BYTE_MASK) << 16) & INTEGER_MASK))
& INTEGER_MASK;
+          case  2:
+            a = (a + (((key[offset + 1]  & BYTE_MASK) <<  8) & INTEGER_MASK))
& INTEGER_MASK;
+          case  1:
+            a = (a + (key[offset + 0]    & BYTE_MASK)) & INTEGER_MASK;
+            break;
+          case  0:
+            return (int)(c & INTEGER_MASK);
+          }
+          c ^= b; c = (c - rot(b,14)) & INTEGER_MASK;
+          a ^= c; a = (a - rot(c,11)) & INTEGER_MASK;
+          b ^= a; b = (b - rot(a,25)) & INTEGER_MASK;
+          c ^= b; c = (c - rot(b,16)) & INTEGER_MASK;
+          a ^= c; a = (a - rot(c,4))  & INTEGER_MASK;
+          b ^= a; b = (b - rot(a,14)) & INTEGER_MASK;
+          c ^= b; c = (c - rot(b,24)) & INTEGER_MASK;
+
+          return (int)(c & INTEGER_MASK);
+        }
+    };
+
+    /**
+     * A HashAlgorithim instance which uses CRC32 checksum
+     * algorithim to compute the hash value.
+     */
+    public static HashAlgorithim CRC32 = new HashAlgorithim() {
+        public int hash(byte[] data, int len) {
+            Checksum checksum =  new CRC32();
+            checksum.update(data, 0, len);
+            return (int) (((checksum.getValue() >> 32) ^ checksum.getValue()) &
INTEGER_MASK);
+        }
+    };
+
+    /**
+     * Used to convert an object to a byte[] by basically doing:
+     * Object.toString().getBytes("UTF-8")
+     */
+    public class ToStringMarshaller extends VariableMarshaller {
+        public void writePayload(Object o, DataOutput dataOutput) throws IOException {
+            dataOutput.write(o.toString().getBytes("UTF-8"));
+        }
+
+        public Object readPayload(DataInput dataInput) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * This Hasher implementation works with any type of object by using
+     * Object.toString() and uses a checksum to compute the hash of
+     * the key and value.
+     */
+    public class ToStringHasher extends BinaryHasher {
+
+        /**
+         * Constructs a ToStringChecksumHasher that uses the JENKINS hash algorithim. 
+         */
+        public ToStringHasher() {
+            this(JENKINS);
+        }
+
+        public ToStringHasher(HashAlgorithim hashAlgorithim) {
+            super(new ToStringMarshaller(), new ToStringMarshaller(), hashAlgorithim);
+        }
+
+        @Override
+        public int hashNode(Object node, int i) {
+            return super.hashKey(node.toString() + ":" + i);
+        }
+    }
+
+    ;
+
+
+}

Added: activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/HashRingTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/HashRingTest.java?rev=806921&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/HashRingTest.java
(added)
+++ activemq/sandbox/activemq-flow/activemq-util/src/test/java/org/apache/activemq/util/HashRingTest.java
Sun Aug 23 02:23:31 2009
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import junit.framework.TestCase;
+
+import java.util.HashMap;
+import java.util.Random;
+
+/**
+ */
+public class HashRingTest extends TestCase {
+
+    public void testHashRingTest() throws Exception {
+        HashRing<String, String> ring = new HashRing<String, String>();
+        assertNull( ring.get("foo") );
+
+        ring.add("node1");
+        assertEquals("node1", ring.get("foo"));
+
+        assertFalse(ring.remove("node2"));
+        assertTrue(ring.remove("node1"));
+
+        assertNull( ring.get("foo") );
+
+    }
+
+    // Changing this random seed wiil change the generated data used
+    // in the distribution tests and change the deviation results.  
+    private static final int RANDOM_SEED = 274423;
+
+
+    public void testJENKINSDistribution() throws Exception {
+        double deviationPercent = findDistribution(new Hasher.ToStringHasher(Hasher.JENKINS),
"JENKINS");
+        assertTrue(deviationPercent < 3.23 );
+    }
+
+    public void testMURMURDistribution() throws Exception {
+        double deviationPercent = findDistribution(new Hasher.ToStringHasher(Hasher.MURMUR),
"MURMUR");
+        assertTrue(deviationPercent < 6.13 );
+    }
+
+    public void testMD5Distribution() throws Exception {
+        double deviationPercent = findDistribution(new Hasher.ToStringHasher(Hasher.MD5),
"MD5");
+        assertTrue(deviationPercent < 7.18 );
+    }
+
+    public void testFNV1ADistribution() throws Exception {
+        double deviationPercent = findDistribution(new Hasher.ToStringHasher(Hasher.FNV1A),
"FNV1A");
+        assertTrue(deviationPercent < 6.66 );
+    }
+
+    public void testCRC32Distribution() throws Exception {
+        double deviationPercent = findDistribution(new Hasher.ToStringHasher(Hasher.CRC32),
"CRC32");
+        assertTrue(deviationPercent < 25.92 );
+    }
+
+    /**
+     * Goes to show that the native String.hashCode() implementation is not very good a generating
+     * a uniformly distributed hash values.
+     */
+    public void testNativeDistribution() throws Exception {
+        double deviationPercent = findDistribution(new Hasher.Native(), "Native");
+        assertTrue(deviationPercent < 60.66 );
+    }
+
+    Random random;
+
+    private double findDistribution(Hasher hasher, String name) {
+        double deviation;
+        final int NODE_COUNT = 5;
+        final int KEYS_PER_NODE = 100000;
+        final int KEY_COUNT = KEYS_PER_NODE*NODE_COUNT;
+        int counts[] = new int[NODE_COUNT];
+
+        System.out.println("Tesing key distribution of hasher: "+name);
+        random = new Random(RANDOM_SEED);
+        HashRing<String, String> ring = new HashRing<String, String>(hasher);
+        HashMap<String, Integer> nodes = new  HashMap<String, Integer>();
+        for( int i=0; i< NODE_COUNT; i++) {
+            String node = randomWord(8);
+            ring.add(node);
+            nodes.put(node, i);
+        }
+
+        long start = System.currentTimeMillis();
+        for( int i=0; i< KEY_COUNT; i++) {
+            String node = ring.get(randomWord(15));
+            counts[nodes.get(node)] ++;
+        }
+        long end = System.currentTimeMillis();
+        System.out.println("Hashed "+KEY_COUNT+" keys in "+(end-start)+" ms.");
+
+        deviation = stdDeviation(counts);
+        double deviationPercent = (deviation / KEYS_PER_NODE) * 100;
+        System.out.println(name +" stadard deviation: "+deviation+", as percent of keys/node:
"+ deviationPercent +"%");
+        return deviationPercent;
+    }
+
+    char [] wordCharacters = createWordCharacters();
+
+    private char[] createWordCharacters() {
+        StringBuilder sb = new StringBuilder();
+        for( char c='a'; c <= 'z'; c++) {
+            sb.append(c);
+        }
+        for( char c='A'; c <= 'Z'; c++) {
+            sb.append(c);
+        }
+        for( char c='0'; c <= '9'; c++) {
+            sb.append(c);
+        }
+        return sb.toString().toCharArray();
+    }
+
+
+    public double stdDeviation(int []values) {
+        long c=0;
+        for (int value : values) {
+            c += value;
+        }
+        double mean = (1.0 * c) / values.length;
+        double rc = 0;
+        for (int value : values) {
+            double v = value - mean;
+            rc += (v*v);
+        }
+        return Math.sqrt(rc / values.length);
+    }
+
+    private String randomWord(int size) {
+        StringBuilder sb = new StringBuilder(size);
+        for( int i=0; i < size; i++) {
+            sb.append(wordCharacters[random.nextInt(wordCharacters.length)]);
+        }
+        return sb.toString();
+    }
+}



Mime
View raw message