cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jo...@apache.org
Subject svn commit: r1051679 [6/6] - in /cassandra/trunk: ./ conf/ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/or...
Date Tue, 21 Dec 2010 22:17:12 GMT
Added: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,346 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractCommutativeType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CounterColumnTest
+{
+    private static final CounterContext cc = new CounterContext();
+
+    private static final int idLength;
+    private static final int clockLength;
+    private static final int countLength;
+
+    private static final int stepLength;
+
+    static
+    {
+        idLength      = 4; // size of int
+        clockLength   = 8; // size of long
+        countLength   = 8; // size of long
+
+        stepLength    = idLength + clockLength + countLength;
+    }
+
+    @Test
+    public void testCreate() throws UnknownHostException
+    {
+        AbstractCommutativeType type = CounterColumnType.instance;
+        long delta = 3L;
+        CounterColumn column = (CounterColumn)type.createColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(delta), 1L);
+        assert delta == column.value().getLong(column.value().arrayOffset());
+        assert 0 == column.partitionedCounter().length;
+
+        InetAddress node = InetAddress.getByAddress(FBUtilities.toByteArray(1));
+        column.update(node);
+        assert delta == column.value().getLong(column.value().arrayOffset());
+        assert  1 == FBUtilities.byteArrayToInt( column.partitionedCounter(), 0*stepLength);
+        assert 1L == FBUtilities.byteArrayToLong(column.partitionedCounter(), 0*stepLength + idLength);
+        assert 3L == FBUtilities.byteArrayToLong(column.partitionedCounter(), 0*stepLength + idLength + clockLength);
+    }
+
+    @Test
+    public void testUpdate() throws UnknownHostException
+    {
+        CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 0L);
+        assert 0L == c.value().getLong(c.value().arrayOffset());
+
+        assert c.partitionedCounter().length == 0 : "badly formatted initial context";
+
+        c.value = FBUtilities.toByteBuffer(1L);
+        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(1)));
+        assert 1L == c.value().getLong(c.value().arrayOffset());
+
+        assert c.partitionedCounter().length == stepLength;
+
+        assert  1 == FBUtilities.byteArrayToInt( c.partitionedCounter(), 0*stepLength);
+        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength);
+        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength + clockLength);
+
+        c.value = FBUtilities.toByteBuffer(3L);
+        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2)));
+
+        c.value = FBUtilities.toByteBuffer(2L);
+        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2)));
+
+        c.value = FBUtilities.toByteBuffer(9L);
+        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2)));
+
+        assert 15L == c.value().getLong(c.value().arrayOffset());
+
+        assert c.partitionedCounter().length == (2 * stepLength);
+
+        assert   2 == FBUtilities.byteArrayToInt(c.partitionedCounter(),  0*stepLength);
+        assert  3L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength);
+        assert 14L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength + clockLength);
+
+        assert  1 == FBUtilities.byteArrayToInt(c.partitionedCounter(),  1*stepLength);
+        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 1*stepLength + idLength);
+        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 1*stepLength + idLength + clockLength);
+    }
+
+    @Test
+    public void testReconcile() throws UnknownHostException
+    {
+        IColumn left;
+        IColumn right;
+        IColumn reconciled;
+
+        // tombstone + tombstone
+        left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
+        right = new DeletedColumn(ByteBufferUtil.bytes("x"), 2, 2L);
+
+        assert left.reconcile(right).timestamp() == right.timestamp();
+        assert right.reconcile(left).timestamp() == right.timestamp();
+        
+        // tombstone > live
+        left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 1L);
+
+        assert left.reconcile(right) == left;
+
+        // tombstone < live last delete
+        left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 4L, new byte[0], 2L);
+
+        assert left.reconcile(right) == right;
+
+        // tombstone == live last delete
+        left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 4L, new byte[0], 2L);
+
+        assert left.reconcile(right) == right;
+
+        // tombstone > live last delete
+        left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 4L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 9L, new byte[0], 1L);
+
+        reconciled = left.reconcile(right);
+        assert reconciled.name() == right.name();
+        assert reconciled.value() == right.value();
+        assert reconciled.timestamp() == right.timestamp();
+        assert ((CounterColumn)reconciled).partitionedCounter() == ((CounterColumn)right).partitionedCounter();
+        assert ((CounterColumn)reconciled).timestampOfLastDelete() == left.timestamp();
+
+        // live < tombstone
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 1L);
+        right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
+
+        assert left.reconcile(right) == right;
+
+        // live last delete > tombstone
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 4L, new byte[0], 2L);
+        right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
+
+        assert left.reconcile(right) == left;
+
+        // live last delete == tombstone
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 4L, new byte[0], 2L);
+        right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
+
+        assert left.reconcile(right) == left;
+
+        // live last delete < tombstone
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), FBUtilities.toByteBuffer(0L), 9L, new byte[0], 1L);
+        right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 4L);
+
+        reconciled = left.reconcile(right);
+        assert reconciled.name() == left.name();
+        assert reconciled.value() == left.value();
+        assert reconciled.timestamp() == left.timestamp();
+        assert ((CounterColumn)reconciled).partitionedCounter() == ((CounterColumn)left).partitionedCounter();
+        assert ((CounterColumn)reconciled).timestampOfLastDelete() == right.timestamp();
+
+        // live + live
+        byte[] context;
+
+        context = new byte[0];
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 1L);
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 0L);
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 1L);
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 0L);
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 5L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 9L, context, 1L);
+
+        context = new byte[0];
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 4L);
+        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 2L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 3L, context, 4L);
+
+        reconciled = left.reconcile(right);
+        assert reconciled.name() == left.name();
+        assert 9L == reconciled.value().getLong(reconciled.value().arrayOffset());
+        assert reconciled.timestamp() == 9L;
+
+        context = ((CounterColumn)reconciled).partitionedCounter();
+        assert 3 * stepLength == context.length;
+
+        assert  1 == FBUtilities.byteArrayToInt(context,  0*stepLength);
+        assert 3L == FBUtilities.byteArrayToLong(context, 0*stepLength + idLength);
+        assert 2L == FBUtilities.byteArrayToLong(context, 0*stepLength + idLength + clockLength);
+
+        assert  2 == FBUtilities.byteArrayToInt(context,  1*stepLength);
+        assert 2L == FBUtilities.byteArrayToLong(context, 1*stepLength + idLength);
+        assert 5L == FBUtilities.byteArrayToLong(context, 1*stepLength + idLength + clockLength);
+
+        assert  3 == FBUtilities.byteArrayToInt(context,  2*stepLength);
+        assert 1L == FBUtilities.byteArrayToLong(context, 2*stepLength + idLength);
+        assert 2L == FBUtilities.byteArrayToLong(context, 2*stepLength + idLength + clockLength);
+
+        assert ((CounterColumn)reconciled).timestampOfLastDelete() == 4L;
+    }
+
+    @Test
+    public void testDiff() throws UnknownHostException
+    {
+        byte[] left;
+        byte[] right;
+
+        CounterColumn leftCol;
+        CounterColumn rightCol;
+
+        // timestamp
+        left    = new byte[0];
+        leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left);
+
+        right    = new byte[0];
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 2L, right);
+
+        assert rightCol == leftCol.diff(rightCol);
+        assert null     == rightCol.diff(leftCol);
+
+        // timestampOfLastDelete
+        left    = new byte[0];
+        leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left, 1L);
+
+        right    = new byte[0];
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right, 2L);
+
+        assert rightCol == leftCol.diff(rightCol);
+        assert null     == rightCol.diff(leftCol);
+
+        // equality: equal nodes, all counts same
+        left = Util.concatByteArrays(
+            FBUtilities.toByteArray(3), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(6), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
+            );
+        left = cc.update(left, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 0L);
+        right = ArrayUtils.clone(left);
+
+        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)),  1L, left);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right);
+        assert null == leftCol.diff(rightCol);
+
+        // greater than: left has superset of nodes (counts equal)
+        left = Util.concatByteArrays(
+            FBUtilities.toByteArray(3),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(6),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(12), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(0L)
+            );
+        right = Util.concatByteArrays(
+            FBUtilities.toByteArray(3),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(6),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
+            );
+
+        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)),  1L, left);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right);
+        assert null == leftCol.diff(rightCol);
+
+        // less than: right has subset of nodes (counts equal)
+        assert leftCol == rightCol.diff(leftCol);
+
+        // disjoint: right and left have disjoint node sets
+        left = Util.concatByteArrays(
+            FBUtilities.toByteArray(3),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(4),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
+            );
+        right = Util.concatByteArrays(
+            FBUtilities.toByteArray(3),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(6),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
+            );
+
+        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)),  1L, left);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right);
+        assert rightCol == leftCol.diff(rightCol);
+        assert leftCol  == rightCol.diff(leftCol);
+    }
+
+    @Test
+    public void testCleanNodeCounts() throws UnknownHostException
+    {
+        byte[] context = Util.concatByteArrays(
+            FBUtilities.toByteArray(1),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(1L),
+            FBUtilities.toByteArray(2),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(2L),
+            FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(8),  FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4L)
+            );
+        CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 1L, context);
+
+        CounterColumn d = c.cleanNodeCounts(InetAddress.getByAddress(FBUtilities.toByteArray(4)));
+
+        assertEquals(7L, d.value().getLong(d.value().arrayOffset()));
+    }
+
+    @Test
+    public void testSerializeDeserialize() throws IOException
+    {
+        ColumnFamily cf;
+
+        byte[] context = Util.concatByteArrays(
+            FBUtilities.toByteArray(1),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(1L),
+            FBUtilities.toByteArray(2),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(2L),
+            FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(8),  FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4L)
+            );
+        CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 1L, context);
+
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        Column.serializer().serialize(original, bufOut);
+
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(bufOut.getData(), 0, bufOut.getLength());
+        CounterColumn deserialized = (CounterColumn)Column.serializer().deserialize(new DataInputStream(bufIn));
+
+        assert original.equals(deserialized);
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Tue Dec 21 22:17:09 2010
@@ -115,6 +115,7 @@ public class DefsTest extends CleanupHel
                 1.0,
                 1.0,
                 0.5,
+                false,
                 100000,
                 null,
                 500,
@@ -766,6 +767,7 @@ public class DefsTest extends CleanupHel
                               0,
                               1.0,
                               0,
+							  false,
                               CFMetaData.DEFAULT_GC_GRACE_SECONDS,
                               BytesType.instance,
                               CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD,

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java Tue Dec 21 22:17:09 2010
@@ -24,12 +24,16 @@ import org.junit.Test;
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertNull;
 import static org.apache.cassandra.Util.getBytes;
+import static org.apache.cassandra.Util.concatByteArrays;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.utils.ByteBufferUtil;
-
+import org.apache.cassandra.utils.FBUtilities;
 
 public class SuperColumnTest
 {   
+    private static final CounterContext cc = new CounterContext();
+
     @Test
     public void testMissingSubcolumn() {
     	SuperColumn sc = new SuperColumn(ByteBufferUtil.bytes("sc1"), LongType.instance);
@@ -37,4 +41,69 @@ public class SuperColumnTest
     	assertNotNull(sc.getSubColumn(getBytes(1)));
     	assertNull(sc.getSubColumn(getBytes(2)));
     }
+
+    @Test
+    public void testAddColumnIncrementCounter()
+    {
+        byte[] context;
+
+    	SuperColumn sc = new SuperColumn(ByteBufferUtil.bytes("sc1"), LongType.instance);
+
+        context = concatByteArrays(
+            FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(1), FBUtilities.toByteArray(7L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(2), FBUtilities.toByteArray(5L), FBUtilities.toByteArray(7L),
+            FBUtilities.toByteArray(4), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(9L)
+            );
+    	sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(cc.total(context)), 3L, context, 0L));
+        context = concatByteArrays(
+            FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(5L),
+            FBUtilities.toByteArray(8), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(4), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(1L),
+            FBUtilities.toByteArray(2), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(4L)
+            );
+    	sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(cc.total(context)), 10L, context, 0L));
+
+        context = concatByteArrays(
+            FBUtilities.toByteArray(3), FBUtilities.toByteArray(6L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(7), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
+            FBUtilities.toByteArray(2), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
+            );
+    	sc.addColumn(new CounterColumn(getBytes(2), ByteBuffer.wrap(cc.total(context)), 9L, context, 0L));
+                    
+    	assertNotNull(sc.getSubColumn(getBytes(1)));
+    	assertNull(sc.getSubColumn(getBytes(3)));
+
+        // column: 1
+    	byte[] c1 = concatByteArrays(
+                FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(12L), FBUtilities.toByteArray(8L),
+                FBUtilities.toByteArray(8), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(0L),
+                FBUtilities.toByteArray(1), FBUtilities.toByteArray(7L), FBUtilities.toByteArray(0L),
+                FBUtilities.toByteArray(2), FBUtilities.toByteArray(5L), FBUtilities.toByteArray(7L),
+                FBUtilities.toByteArray(4), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(1L)
+                );
+        assert 0 == FBUtilities.compareByteSubArrays(
+            ((CounterColumn)sc.getSubColumn(getBytes(1))).partitionedCounter(),
+            0,
+            c1,
+            0,
+            c1.length);
+
+        // column: 2
+        byte[] c2 = concatByteArrays(
+                FBUtilities.toByteArray(3), FBUtilities.toByteArray(6L), FBUtilities.toByteArray(0L),
+                FBUtilities.toByteArray(7), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
+                FBUtilities.toByteArray(2), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
+                );
+        assert 0 == FBUtilities.compareByteSubArrays(
+            ((CounterColumn)sc.getSubColumn(getBytes(2))).partitionedCounter(),
+            0,
+            c2,
+            0,
+            c2.length);
+
+    	assertNotNull(sc.getSubColumn(getBytes(1)));
+    	assertNotNull(sc.getSubColumn(getBytes(2)));
+    	assertNull(sc.getSubColumn(getBytes(3)));
+    }
 }

Added: cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,536 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+package org.apache.cassandra.db.context;
+
+import static org.junit.Assert.*;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import org.junit.Test;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.context.IContext.ContextRelationship;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Note: these tests assume IPv4 (4 bytes) is used for id.
+ *       if IPv6 (16 bytes) is used, tests will fail (but the code will work).
+ *       however, it might be pragmatic to modify the code to just use
+ *       the IPv4 portion of the IPv6 address-space.
+ */
+public class CounterContextTest
+{
+    private static final CounterContext cc = new CounterContext();
+
+    private static final InetAddress idAddress;
+    private static final byte[] id;
+    private static final int idLength;
+    private static final int clockLength;
+    private static final int countLength;
+
+    private static final int stepLength;
+    private static final int defaultEntries;
+
+    static
+    {
+        idAddress      = FBUtilities.getLocalAddress();
+        id             = idAddress.getAddress();
+        idLength       = 4; // size of int
+        clockLength    = 8; // size of long
+        countLength    = 8; // size of long
+        stepLength     = idLength + clockLength + countLength;
+
+        defaultEntries = 10;
+    }
+
+    @Test
+    public void testCreate()
+    {
+        byte[] context = cc.create();
+        assert context.length == 0;
+    }
+
+    @Test
+    public void testUpdatePresentReorder() throws UnknownHostException
+    {
+        byte[] context;
+
+        context = new byte[stepLength * defaultEntries];
+
+        for (int i = 0; i < defaultEntries - 1; i++)
+        {
+            cc.writeElementAtStepOffset(
+                context,
+                i,
+                FBUtilities.toByteArray(i),
+                1L,
+                1L);
+        }
+        cc.writeElementAtStepOffset(
+            context,
+            (defaultEntries - 1),
+            id,
+            2L,
+            3L);
+
+        context = cc.update(context, idAddress, 10L);
+
+        assertEquals(context.length, stepLength * defaultEntries);
+        assertEquals(  3L, FBUtilities.byteArrayToLong(context, idLength));
+        assertEquals( 13L, FBUtilities.byteArrayToLong(context, idLength + clockLength));
+        for (int i = 1; i < defaultEntries; i++)
+        {
+            int offset = i * stepLength;
+            assertEquals( i-1, FBUtilities.byteArrayToInt(context,  offset));
+            assertEquals(1L, FBUtilities.byteArrayToLong(context, offset + idLength));
+            assertEquals(1L, FBUtilities.byteArrayToLong(context, offset + idLength + clockLength));
+        }
+    }
+
+    @Test
+    public void testUpdateNotPresent()
+    {
+        byte[] context = new byte[stepLength * 2];
+
+        for (int i = 0; i < 2; i++)
+        {
+            cc.writeElementAtStepOffset(
+                context,
+                i,
+                FBUtilities.toByteArray(i),
+                1L,
+                1L);
+        }
+
+        context = cc.update(context, idAddress, 328L);
+
+        assert context.length == stepLength * 3;
+        assert   1L == FBUtilities.byteArrayToLong(context, idLength);
+        assert 328L == FBUtilities.byteArrayToLong(context, idLength + clockLength);
+        for (int i = 1; i < 3; i++)
+        {
+            int offset = i * stepLength;
+            assert i-1 == FBUtilities.byteArrayToInt(context,  offset);
+            assert  1L == FBUtilities.byteArrayToLong(context, offset + idLength);
+            assert  1L == FBUtilities.byteArrayToLong(context, offset + idLength + clockLength);
+        }
+    }
+
+    @Test
+    public void testSwapElement()
+    {
+        byte[] context = new byte[stepLength * 3];
+
+        for (int i = 0; i < 3; i++)
+        {
+            cc.writeElementAtStepOffset(
+                context,
+                i,
+                FBUtilities.toByteArray(i),
+                1L,
+                1L);
+        }
+        cc.swapElement(context, 0, 2*stepLength);
+
+        assert 2 == FBUtilities.byteArrayToInt(context, 0);
+        assert 0 == FBUtilities.byteArrayToInt(context, 2*stepLength);
+
+        cc.swapElement(context, 0, 1*stepLength);
+
+        assert 1 == FBUtilities.byteArrayToInt(context, 0);
+        assert 2 == FBUtilities.byteArrayToInt(context, 1*stepLength);
+    }
+
+    @Test
+    public void testPartitionElements()
+    {
+        byte[] context = new byte[stepLength * 10];
+
+        cc.writeElementAtStepOffset(context, 0, FBUtilities.toByteArray(5), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 1, FBUtilities.toByteArray(3), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 2, FBUtilities.toByteArray(6), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 3, FBUtilities.toByteArray(7), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 4, FBUtilities.toByteArray(8), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 5, FBUtilities.toByteArray(9), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 6, FBUtilities.toByteArray(2), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 7, FBUtilities.toByteArray(4), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 8, FBUtilities.toByteArray(1), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 9, FBUtilities.toByteArray(3), 1L, 1L);
+
+        cc.partitionElements(
+            context,
+            0, // left
+            9, // right (inclusive)
+            2  // pivot
+            );
+
+        assert 5 == FBUtilities.byteArrayToInt(context, 0*stepLength);
+        assert 3 == FBUtilities.byteArrayToInt(context, 1*stepLength);
+        assert 3 == FBUtilities.byteArrayToInt(context, 2*stepLength);
+        assert 2 == FBUtilities.byteArrayToInt(context, 3*stepLength);
+        assert 4 == FBUtilities.byteArrayToInt(context, 4*stepLength);
+        assert 1 == FBUtilities.byteArrayToInt(context, 5*stepLength);
+        assert 6 == FBUtilities.byteArrayToInt(context, 6*stepLength);
+        assert 8 == FBUtilities.byteArrayToInt(context, 7*stepLength);
+        assert 9 == FBUtilities.byteArrayToInt(context, 8*stepLength);
+        assert 7 == FBUtilities.byteArrayToInt(context, 9*stepLength);
+    }
+
+    @Test
+    public void testSortElementsById()
+    {
+        byte[] context = new byte[stepLength * 10];
+
+        cc.writeElementAtStepOffset(context, 0, FBUtilities.toByteArray(5), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 1, FBUtilities.toByteArray(3), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 2, FBUtilities.toByteArray(6), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 3, FBUtilities.toByteArray(7), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 4, FBUtilities.toByteArray(8), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 5, FBUtilities.toByteArray(9), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 6, FBUtilities.toByteArray(2), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 7, FBUtilities.toByteArray(4), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 8, FBUtilities.toByteArray(1), 1L, 1L);
+        cc.writeElementAtStepOffset(context, 9, FBUtilities.toByteArray(3), 1L, 1L);
+        
+        byte[] sorted = cc.sortElementsById(context);
+
+        assertEquals( 1, FBUtilities.byteArrayToInt(sorted, 0*stepLength));
+        assertEquals( 2, FBUtilities.byteArrayToInt(sorted, 1*stepLength));
+        assertEquals( 3, FBUtilities.byteArrayToInt(sorted, 2*stepLength));
+        assertEquals( 3, FBUtilities.byteArrayToInt(sorted, 3*stepLength));
+        assertEquals( 4, FBUtilities.byteArrayToInt(sorted, 4*stepLength));
+        assertEquals( 5, FBUtilities.byteArrayToInt(sorted, 5*stepLength));
+        assertEquals( 6, FBUtilities.byteArrayToInt(sorted, 6*stepLength));
+        assertEquals( 7, FBUtilities.byteArrayToInt(sorted, 7*stepLength));
+        assertEquals( 8, FBUtilities.byteArrayToInt(sorted, 8*stepLength));
+        assertEquals( 9, FBUtilities.byteArrayToInt(sorted, 9*stepLength));
+    }
+
+    @Test
+    public void testDiff()
+    {
+        byte[] left = new byte[3 * stepLength];
+        byte[] right;
+
+        // equality: equal nodes, all counts same
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 1L, 0L);
+        right = ArrayUtils.clone(left);
+
+        assert ContextRelationship.EQUAL ==
+            cc.diff(left, right);
+
+        // greater than: left has superset of nodes (counts equal)
+        left = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3),  3L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6),  2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9),  1L, 0L);
+        cc.writeElementAtStepOffset(left, 3, FBUtilities.toByteArray(12), 0L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        assert ContextRelationship.GREATER_THAN ==
+            cc.diff(left, right);
+        
+        // less than: left has subset of nodes (counts equal)
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        right = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3),  3L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6),  2L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9),  1L, 0L);
+        cc.writeElementAtStepOffset(right, 3, FBUtilities.toByteArray(12), 0L, 0L);
+
+        assert ContextRelationship.LESS_THAN ==
+            cc.diff(left, right);
+
+        // greater than: equal nodes, but left has higher counts
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 3L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        assert ContextRelationship.GREATER_THAN ==
+            cc.diff(left, right);
+
+        // less than: equal nodes, but right has higher counts
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 3L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 3L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 9L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 3L, 0L);
+
+        assert ContextRelationship.LESS_THAN ==
+            cc.diff(left, right);
+
+        // disjoint: right and left have disjoint node sets
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 1L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(4), 1L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 1L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 1L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 1L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(4), 1L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(2),  1L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6),  1L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(12), 1L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+
+        // disjoint: equal nodes, but right and left have higher counts in differing nodes
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 1L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 1L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 1L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 5L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 1L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 1L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 9L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 5L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+
+        // disjoint: left has more nodes, but lower counts
+        left = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3),  2L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6),  3L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9),  1L, 0L);
+        cc.writeElementAtStepOffset(left, 3, FBUtilities.toByteArray(12), 1L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 4L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 9L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 5L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+        
+        // disjoint: left has less nodes, but higher counts
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 5L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 3L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 2L, 0L);
+
+        right = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3),  4L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6),  3L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9),  2L, 0L);
+        cc.writeElementAtStepOffset(right, 3, FBUtilities.toByteArray(12), 1L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+
+        // disjoint: mixed nodes and counts
+        left = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 5L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(9), 2L, 0L);
+
+        right = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3),  4L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6),  3L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9),  2L, 0L);
+        cc.writeElementAtStepOffset(right, 3, FBUtilities.toByteArray(12), 1L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+
+        left = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(3), 5L, 0L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(6), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(7), 2L, 0L);
+        cc.writeElementAtStepOffset(left, 3, FBUtilities.toByteArray(9), 2L, 0L);
+
+        right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 0, FBUtilities.toByteArray(3), 4L, 0L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(6), 3L, 0L);
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(9), 2L, 0L);
+
+        assert ContextRelationship.DISJOINT ==
+            cc.diff(left, right);
+    }
+
+    @Test
+    public void testMerge()
+    {
+        // note: local counts aggregated; remote counts are reconciled (i.e. take max)
+        byte[] left = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(1), 1L, 1L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(2), 2L, 2L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(4), 6L, 3L);
+        cc.writeElementAtStepOffset(
+            left,
+            3,
+            FBUtilities.getLocalAddress().getAddress(),
+            7L,
+            3L);
+
+        byte[] right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(5), 5L, 5L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(4), 4L, 4L);
+        cc.writeElementAtStepOffset(
+            right,
+            0,
+            FBUtilities.getLocalAddress().getAddress(),
+            2L,
+            9L);
+
+        byte[] merged = cc.merge(left, right);
+
+        // local node id's counts are aggregated
+        assertEquals(0, FBUtilities.compareUnsigned(
+            FBUtilities.getLocalAddress().getAddress(),
+            merged, 
+            0,
+            0*stepLength,
+            4,
+            4));
+        assertEquals(  9L, FBUtilities.byteArrayToLong(merged, 0*stepLength + idLength));
+        assertEquals(12L,  FBUtilities.byteArrayToLong(merged, 0*stepLength + idLength + clockLength));
+
+        // remote node id counts are reconciled (i.e. take max)
+        assertEquals( 4,   FBUtilities.byteArrayToInt(merged,  1*stepLength));
+        assertEquals( 6L,  FBUtilities.byteArrayToLong(merged, 1*stepLength + idLength));
+        assertEquals( 3L,  FBUtilities.byteArrayToLong(merged, 1*stepLength + idLength + clockLength));
+
+        assertEquals( 5,   FBUtilities.byteArrayToInt(merged,  2*stepLength));
+        assertEquals( 5L,  FBUtilities.byteArrayToLong(merged, 2*stepLength + idLength));
+        assertEquals( 5L,  FBUtilities.byteArrayToLong(merged, 2*stepLength + idLength + clockLength));
+
+        assertEquals( 2,   FBUtilities.byteArrayToInt(merged,  3*stepLength));
+        assertEquals( 2L,  FBUtilities.byteArrayToLong(merged, 3*stepLength + idLength));
+        assertEquals( 2L,  FBUtilities.byteArrayToLong(merged, 3*stepLength + idLength + clockLength));
+
+        assertEquals( 1,   FBUtilities.byteArrayToInt(merged,  4*stepLength));
+        assertEquals( 1L,  FBUtilities.byteArrayToLong(merged, 4*stepLength + idLength));
+        assertEquals( 1L,  FBUtilities.byteArrayToLong(merged, 4*stepLength + idLength + clockLength));
+    }
+
+    @Test
+    public void testTotal()
+    {
+        byte[] left = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(left, 0, FBUtilities.toByteArray(1), 1L, 1L);
+        cc.writeElementAtStepOffset(left, 1, FBUtilities.toByteArray(2), 2L, 2L);
+        cc.writeElementAtStepOffset(left, 2, FBUtilities.toByteArray(4), 3L, 3L);
+        cc.writeElementAtStepOffset(
+            left,
+            3,
+            FBUtilities.getLocalAddress().getAddress(),
+            3L,
+            3L);
+
+        byte[] right = new byte[3 * stepLength];
+        cc.writeElementAtStepOffset(right, 2, FBUtilities.toByteArray(5), 5L, 5L);
+        cc.writeElementAtStepOffset(right, 1, FBUtilities.toByteArray(4), 4L, 4L);
+        cc.writeElementAtStepOffset(
+            right,
+            0,
+            FBUtilities.getLocalAddress().getAddress(),
+            9L,
+            9L);
+
+        byte[] merged = cc.merge(left, right);
+
+        // 127.0.0.1: 12 (3+9)
+        // 0.0.0.1:    1
+        // 0.0.0.2:    2
+        // 0.0.0.4:    4
+        // 0.0.0.5:    5
+
+        assertEquals(24L, FBUtilities.byteArrayToLong(cc.total(merged)));
+    }
+
+    @Test
+    public void testCleanNodeCounts() throws UnknownHostException
+    {
+        byte[] bytes = new byte[4 * stepLength];
+        cc.writeElementAtStepOffset(bytes, 0, FBUtilities.toByteArray(1), 1L, 1L);
+        cc.writeElementAtStepOffset(bytes, 1, FBUtilities.toByteArray(2), 2L, 2L);
+        cc.writeElementAtStepOffset(bytes, 2, FBUtilities.toByteArray(4), 3L, 3L);
+        cc.writeElementAtStepOffset(bytes, 3, FBUtilities.toByteArray(8), 4L, 4L);
+
+        assertEquals(4, FBUtilities.byteArrayToInt(bytes,  2*stepLength));
+        assertEquals(3L, FBUtilities.byteArrayToLong(bytes, 2*stepLength + idLength));
+
+        bytes = cc.cleanNodeCounts(bytes, InetAddress.getByAddress(FBUtilities.toByteArray(4)));
+
+        // node: 0.0.0.4 should be removed
+        assertEquals(3 * stepLength, bytes.length);
+
+        // other nodes should be unaffected
+        assertEquals(1, FBUtilities.byteArrayToInt(bytes,  0*stepLength));
+        assertEquals(1L, FBUtilities.byteArrayToLong(bytes, 0*stepLength + idLength));
+
+        assertEquals(2, FBUtilities.byteArrayToInt(bytes,  1*stepLength));
+        assertEquals(2L, FBUtilities.byteArrayToLong(bytes, 1*stepLength + idLength));
+
+        assertEquals(8, FBUtilities.byteArrayToInt(bytes,  2*stepLength));
+        assertEquals(4L, FBUtilities.byteArrayToLong(bytes, 2*stepLength + idLength));
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Tue Dec 21 22:17:09 2010
@@ -52,6 +52,11 @@ public class SSTableUtils
 
     public static File tempSSTableFile(String tablename, String cfname) throws IOException
     {
+        return tempSSTableFile(tablename, cfname, 0);
+    }
+
+    public static File tempSSTableFile(String tablename, String cfname, int generation) throws IOException
+    {
         File tempdir = File.createTempFile(tablename, cfname);
         if(!tempdir.delete() || !tempdir.mkdir())
             throw new IOException("Temporary directory creation failed.");
@@ -59,7 +64,7 @@ public class SSTableUtils
         File tabledir = new File(tempdir, tablename);
         tabledir.mkdir();
         tabledir.deleteOnExit();
-        File datafile = new File(new Descriptor(tabledir, tablename, cfname, 0, false).filenameFor("Data.db"));
+        File datafile = new File(new Descriptor(tabledir, tablename, cfname, generation, false).filenameFor("Data.db"));
         if (!datafile.createNewFile())
             throw new IOException("unable to create file " + datafile);
         datafile.deleteOnExit();
@@ -92,7 +97,12 @@ public class SSTableUtils
 
     public static SSTableReader writeRawSSTable(String tablename, String cfname, Map<ByteBuffer, ByteBuffer> entries) throws IOException
     {
-        File datafile = tempSSTableFile(tablename, cfname);
+        return writeRawSSTable(tablename, cfname, entries, 0);
+    }
+
+    public static SSTableReader writeRawSSTable(String tablename, String cfname, Map<ByteBuffer, ByteBuffer> entries, int generation) throws IOException
+    {
+        File datafile = tempSSTableFile(tablename, cfname, generation);
         SSTableWriter writer = new SSTableWriter(datafile.getAbsolutePath(), entries.size());
         SortedMap<DecoratedKey, ByteBuffer> sortedEntries = new TreeMap<DecoratedKey, ByteBuffer>();
         for (Map.Entry<ByteBuffer, ByteBuffer> entry : entries.entrySet())

Added: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterAESCommutativeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterAESCommutativeTest.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterAESCommutativeTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterAESCommutativeTest.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,186 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+package org.apache.cassandra.io.sstable;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Test;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SSTableWriterAESCommutativeTest extends CleanupHelper
+{
+    private static final CounterContext cc = new CounterContext();
+    private static final CounterColumnType ctype = CounterColumnType.instance;
+
+    @Test
+    public void testRecoverAndOpenAESCommutative() throws IOException, ExecutionException, InterruptedException, UnknownHostException
+    {
+        String keyspace = "Keyspace1";
+        String cfname   = "Counter1";
+
+        Map<ByteBuffer, ByteBuffer> entries = new HashMap<ByteBuffer, ByteBuffer>();
+        Map<ByteBuffer, ByteBuffer> cleanedEntries = new HashMap<ByteBuffer, ByteBuffer>();
+
+        DataOutputBuffer buffer;
+
+        ColumnFamily cf = ColumnFamily.create(keyspace, cfname);
+        byte[] context;
+
+        // key: k
+        context = Util.concatByteArrays(
+            FBUtilities.getLocalAddress().getAddress(),
+                FBUtilities.toByteArray(9L),
+                FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(2),  FBUtilities.toByteArray(4L), FBUtilities.toByteArray(2L),
+            FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(8),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(4L)
+            );
+        cf.addColumn(new CounterColumn(
+            ByteBufferUtil.bytes("x"),
+            ByteBuffer.wrap(cc.total(context)),
+            0L,
+            context
+            ));
+        context = Util.concatByteArrays(
+            FBUtilities.toByteArray(1),  FBUtilities.toByteArray(7L), FBUtilities.toByteArray(12L),
+            FBUtilities.getLocalAddress().getAddress(),
+                FBUtilities.toByteArray(5L),
+                FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(3),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(33L),
+            FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(24L)
+            );
+        cf.addColumn(new CounterColumn(
+            ByteBufferUtil.bytes("y"),
+            ByteBuffer.wrap(cc.total(context)),
+            0L,
+            context
+            ));
+
+        buffer = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+        entries.put(
+            ByteBufferUtil.bytes("k"),
+            ByteBuffer.wrap(Arrays.copyOf(buffer.getData(), buffer.getLength()))
+            );
+
+        ctype.cleanContext(cf, FBUtilities.getLocalAddress());
+        buffer = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+        cleanedEntries.put(
+            ByteBufferUtil.bytes("k"),
+            ByteBuffer.wrap(Arrays.copyOf(buffer.getData(), buffer.getLength()))
+            );
+        
+        cf.clear();
+
+        // key: l
+        context = Util.concatByteArrays(
+            FBUtilities.getLocalAddress().getAddress(),
+                FBUtilities.toByteArray(9L),
+                FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(2),  FBUtilities.toByteArray(4L), FBUtilities.toByteArray(2L),
+            FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(8),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(4L)
+            );
+        cf.addColumn(new CounterColumn(
+            ByteBufferUtil.bytes("x"),
+            ByteBuffer.wrap(cc.total(context)),
+            0L,
+            context
+            ));
+        context = Util.concatByteArrays(
+            FBUtilities.toByteArray(1),  FBUtilities.toByteArray(7L), FBUtilities.toByteArray(12L),
+            FBUtilities.toByteArray(3),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(33L),
+            FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(24L)
+            );
+        cf.addColumn(new CounterColumn(
+            ByteBufferUtil.bytes("y"),
+            ByteBuffer.wrap(cc.total(context)),
+            0L,
+            context
+            ));
+
+        buffer = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+        entries.put(
+            ByteBufferUtil.bytes("l"),
+            ByteBuffer.wrap(Arrays.copyOf(buffer.getData(), buffer.getLength()))
+            );
+
+        ctype.cleanContext(cf, FBUtilities.getLocalAddress());
+        buffer = new DataOutputBuffer();
+        ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+        cleanedEntries.put(
+            ByteBufferUtil.bytes("l"),
+            ByteBuffer.wrap(Arrays.copyOf(buffer.getData(), buffer.getLength()))
+            );
+
+        cf.clear();
+
+        // write out unmodified CF
+        SSTableReader orig = SSTableUtils.writeRawSSTable(keyspace, cfname, entries, 0);
+
+        // whack the index to trigger the recover
+        FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
+        FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
+
+        // re-build inline
+        SSTableReader rebuilt = CompactionManager.instance.submitSSTableBuild(
+            orig.descriptor,
+            OperationType.AES
+            ).get();
+
+        // write out cleaned CF
+        SSTableReader cleaned = SSTableUtils.writeRawSSTable(keyspace, cfname, cleanedEntries, 0);
+
+        // verify
+        BufferedRandomAccessFile origFile    = new BufferedRandomAccessFile(orig.descriptor.filenameFor(SSTable.COMPONENT_DATA), "r", 8 * 1024 * 1024);
+        BufferedRandomAccessFile cleanedFile = new BufferedRandomAccessFile(cleaned.descriptor.filenameFor(SSTable.COMPONENT_DATA), "r", 8 * 1024 * 1024);
+
+        while(origFile.getFilePointer() < origFile.length() && cleanedFile.getFilePointer() < cleanedFile.length())
+        {
+            assert origFile.readByte() == cleanedFile.readByte();
+        }
+        assert origFile.getFilePointer() == origFile.length();
+        assert cleanedFile.getFilePointer() == cleanedFile.length();
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Tue Dec 21 22:17:09 2010
@@ -41,6 +41,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
 import org.apache.cassandra.thrift.IndexOperator;
@@ -80,7 +81,7 @@ public class SSTableWriterTest extends C
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.PRIMARY_INDEX));
         FileUtils.deleteWithConfirm(orig.descriptor.filenameFor(Component.FILTER));
 
-        SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor).get();
+        SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.descriptor, OperationType.AES).get();
         assert sstr != null;
         ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
         cfs.addSSTable(sstr);

Added: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceCounterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceCounterTest.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceCounterTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceCounterTest.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,10 @@
+package org.apache.cassandra.service;
+
+public class AntiEntropyServiceCounterTest extends AntiEntropyServiceTestAbstract
+{
+    public void init()
+    {
+        tablename = "Keyspace5";
+        cfname    = "Counter1";
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceStandardTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceStandardTest.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceStandardTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceStandardTest.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,10 @@
+package org.apache.cassandra.service;
+
+public class AntiEntropyServiceStandardTest extends AntiEntropyServiceTestAbstract
+{
+    public void init()
+    {
+        tablename = "Keyspace5";
+        cfname    = "Standard1";
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,273 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.PrecompactedRow;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+
+import static org.apache.cassandra.service.AntiEntropyService.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public abstract class AntiEntropyServiceTestAbstract extends CleanupHelper
+{
+    // table and column family to test against
+    public AntiEntropyService aes;
+
+    public String tablename;
+    public String cfname;
+    public TreeRequest request;
+    public ColumnFamilyStore store;
+    public InetAddress LOCAL, REMOTE;
+
+    private boolean initialized;
+
+    public abstract void init();
+
+    @Before
+    public void prepare() throws Exception
+    {
+        if (!initialized)
+        {
+            initialized = true;
+
+            init();
+
+            LOCAL = FBUtilities.getLocalAddress();
+            StorageService.instance.initServer();
+            // generate a fake endpoint for which we can spoof receiving/sending trees
+            REMOTE = InetAddress.getByName("127.0.0.2");
+            store = null;
+            for (ColumnFamilyStore cfs : Table.open(tablename).getColumnFamilyStores())
+            {
+                if (cfs.columnFamily.equals(cfname))
+                {
+                    store = cfs;
+                    break;
+                }
+            }
+            assert store != null : "CF not found: " + cfname;
+        }
+
+        aes = AntiEntropyService.instance;
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        tmd.clearUnsafe();
+        StorageService.instance.setToken(StorageService.getPartitioner().getRandomToken());
+        tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
+        assert tmd.isMember(REMOTE);
+        
+        // random session id for each test
+        request = new TreeRequest(UUID.randomUUID().toString(), LOCAL, new CFPair(tablename, cfname));
+    }
+
+    @After
+    public void teardown() throws Exception
+    {
+        flushAES();
+    }
+
+    @Test
+    public void testValidatorPrepare() throws Throwable
+    {
+        Validator validator;
+
+        // write
+        List<RowMutation> rms = new LinkedList<RowMutation>();
+        RowMutation rm;
+        rm = new RowMutation(tablename, ByteBufferUtil.bytes("key1"));
+        rm.add(new QueryPath(cfname, null, ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 0);
+        rms.add(rm);
+        Util.writeColumnFamily(rms);
+
+        // sample
+        validator = new Validator(request);
+        validator.prepare(store);
+
+        // and confirm that the tree was split
+        assertTrue(validator.tree.size() > 1);
+    }
+    
+    @Test
+    public void testValidatorComplete() throws Throwable
+    {
+        Validator validator = new Validator(request);
+        validator.prepare(store);
+        validator.complete();
+
+        // confirm that the tree was validated
+        Token min = validator.tree.partitioner().getMinimumToken();
+        assert null != validator.tree.hash(new Range(min, min));
+    }
+
+    @Test
+    public void testValidatorAdd() throws Throwable
+    {
+        Validator validator = new Validator(request);
+        IPartitioner part = validator.tree.partitioner();
+        Token min = part.getMinimumToken();
+        Token mid = part.midpoint(min, min);
+        validator.prepare(store);
+
+        // add a row with the minimum token
+        validator.add(new PrecompactedRow(new DecoratedKey(min, ByteBufferUtil.bytes("nonsense!")),
+                                       new DataOutputBuffer()));
+
+        // and a row after it
+        validator.add(new PrecompactedRow(new DecoratedKey(mid, ByteBufferUtil.bytes("inconceivable!")),
+                                       new DataOutputBuffer()));
+        validator.complete();
+
+        // confirm that the tree was validated
+        assert null != validator.tree.hash(new Range(min, min));
+    }
+
+    @Test
+    public void testManualRepair() throws Throwable
+    {
+        AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(tablename, cfname);
+        sess.start();
+        sess.blockUntilRunning();
+
+        // ensure that the session doesn't end without a response from REMOTE
+        sess.join(100);
+        assert sess.isAlive();
+
+        // deliver a fake response from REMOTE
+        AntiEntropyService.instance.completedRequest(new TreeRequest(sess.getName(), REMOTE, request.cf));
+
+        // block until the repair has completed
+        sess.join();
+    }
+
+    @Test
+    public void testGetNeighborsPlusOne() throws Throwable
+    {
+        // generate rf+1 nodes, and ensure that all nodes are returned
+        Set<InetAddress> expected = addTokens(1 + Table.open(tablename).getReplicationStrategy().getReplicationFactor());
+        expected.remove(FBUtilities.getLocalAddress());
+        assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+    }
+
+    @Test
+    public void testGetNeighborsTimesTwo() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
+        addTokens(2 * Table.open(tablename).getReplicationStrategy().getReplicationFactor());
+        AbstractReplicationStrategy ars = Table.open(tablename).getReplicationStrategy();
+        Set<InetAddress> expected = new HashSet<InetAddress>();
+        for (Range replicaRange : ars.getAddressRanges().get(FBUtilities.getLocalAddress()))
+        {
+            expected.addAll(ars.getRangeAddresses(tmd).get(replicaRange));
+        }
+        expected.remove(FBUtilities.getLocalAddress());
+        assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
+    }
+
+    @Test
+    public void testDifferencer() throws Throwable
+    {
+        // generate a tree
+        Validator validator = new Validator(request);
+        validator.prepare(store);
+        validator.complete();
+        MerkleTree ltree = validator.tree;
+
+        // and a clone
+        validator = new Validator(request);
+        validator.prepare(store);
+        validator.complete();
+        MerkleTree rtree = validator.tree;
+
+        // change a range we own in one of the trees
+        Token ltoken = StorageService.instance.getLocalToken();
+        ltree.invalidate(ltoken);
+        MerkleTree.TreeRange changed = ltree.invalids(StorageService.instance.getLocalPrimaryRange()).next();
+        changed.hash("non-empty hash!".getBytes());
+        // the changed range has two halves, split on our local token: both will be repaired
+        // (since this keyspace has RF > N, so every node is responsible for the entire ring)
+        Set<Range> interesting = new HashSet<Range>();
+        interesting.add(new Range(changed.left, ltoken));
+        interesting.add(new Range(ltoken, changed.right));
+
+        // difference the trees
+        Differencer diff = new Differencer(request, ltree, rtree);
+        diff.run();
+        
+        // ensure that the changed range was recorded
+        assertEquals("Wrong differing ranges", interesting, new HashSet<Range>(diff.differences));
+    }
+
+    Set<InetAddress> addTokens(int max) throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+        Set<InetAddress> endpoints = new HashSet<InetAddress>();
+        for (int i = 1; i <= max; i++)
+        {
+            InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
+            tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);
+            endpoints.add(endpoint);
+        }
+        return endpoints;
+    }
+
+    void flushAES() throws Exception
+    {
+        final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+        final Callable noop = new Callable<Object>()
+        {
+            public Boolean call()
+            {
+                return true;
+            }
+        };
+        
+        // send two tasks through the stage: one to follow existing tasks and a second to follow tasks created by
+        // those existing tasks: tasks won't recursively create more tasks
+        stage.submit(noop).get(5000, TimeUnit.MILLISECONDS);
+        stage.submit(noop).get(5000, TimeUnit.MILLISECONDS);
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Tue Dec 21 22:17:09 2010
@@ -37,7 +37,7 @@ public class BootstrapTest extends Schem
     public void testGetNewNames() throws IOException
     {
         Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Standard1-500-Data.db").toString());
-        PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(new Pair<Long,Long>(0L, 1L)));
+        PendingFile inContext = new PendingFile(null, desc, "Data.db", Arrays.asList(new Pair<Long,Long>(0L, 1L)), OperationType.BOOTSTRAP);
 
         PendingFile outContext = StreamIn.getContextMapping(inContext);
         // filename and generation are expected to have changed

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Tue Dec 21 22:17:09 2010
@@ -85,7 +85,7 @@ public class StreamingTransferTest exten
         ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1"))));
         ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
         StreamOutSession session = StreamOutSession.create(table.name, LOCAL, null);
-        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges);
+        StreamOut.transferSSTables(session, Arrays.asList(sstable), ranges, OperationType.BOOTSTRAP);
         session.await();
 
         // confirm that the SSTable was transferred and registered
@@ -135,7 +135,7 @@ public class StreamingTransferTest exten
         ranges.add(new Range(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("transfer1"))));
         ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("test2")), p.getMinimumToken()));
         StreamOutSession session = StreamOutSession.create(tablename, LOCAL, null);
-        StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges);
+        StreamOut.transferSSTables(session, Arrays.asList(sstable, sstable2), ranges, OperationType.BOOTSTRAP);
         session.await();
 
         // confirm that the SSTable was transferred and registered

Added: cassandra/trunk/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/thrift/ThriftValidationTest.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,20 @@
+package org.apache.cassandra.thrift;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+
+public class ThriftValidationTest extends CleanupHelper
+{
+    @Test(expected=InvalidRequestException.class)
+    public void testValidateCommutativeWithStandard() throws InvalidRequestException
+    {
+        ThriftValidation.validateCommutative("Keyspace1", "Standard1");
+    }
+
+    @Test
+    public void testValidateCommutativeWithCounter() throws InvalidRequestException
+    {
+        ThriftValidation.validateCommutative("Keyspace1", "Counter1");
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java Tue Dec 21 22:17:09 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.utils;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -85,6 +86,78 @@ public class FBUtilitiesTest 
         }
     }
 
+    @Test
+    public void testCopyIntoBytes()
+    {
+        int i = 300;
+        long l = 1000;
+        byte[] b = new byte[20];
+        FBUtilities.copyIntoBytes(b, 0, i);
+        FBUtilities.copyIntoBytes(b, 4, l);
+        assertEquals(i, FBUtilities.byteArrayToInt(b, 0));
+        assertEquals(l, FBUtilities.byteArrayToLong(b, 4));
+    }
+    
+    @Test
+    public void testLongBytesConversions()
+    {
+        // positive, negative, 1 and 2 byte cases, including
+        // a few edges that would foul things up unless you're careful
+        // about masking away sign extension.
+        long[] longs = new long[]
+        {
+            -20L, -127L, -128L, 0L, 1L, 127L, 128L, 65534L, 65535L, -65534L, -65535L,
+            4294967294L, 4294967295L, -4294967294L, -4294967295L
+        };
+
+        for (long l : longs) {
+            byte[] ba = FBUtilities.toByteArray(l);
+            long actual = FBUtilities.byteArrayToLong(ba);
+            assertEquals(l, actual);
+        }
+    }
+  
+    @Test
+    public void testCompareByteSubArrays()
+    {
+        byte[] bytes = new byte[16];
+
+        // handle null
+        assert FBUtilities.compareByteSubArrays(
+                null, 0, null, 0, 0) == 0;
+        assert FBUtilities.compareByteSubArrays(
+                null, 0, FBUtilities.toByteArray(524255231), 0, 4) == -1;
+        assert FBUtilities.compareByteSubArrays(
+                FBUtilities.toByteArray(524255231), 0, null, 0, 4) == 1;
+
+        // handle comparisons
+        FBUtilities.copyIntoBytes(bytes, 3, 524255231);
+        assert FBUtilities.compareByteSubArrays(
+                bytes, 3, FBUtilities.toByteArray(524255231), 0, 4) == 0;
+        assert FBUtilities.compareByteSubArrays(
+                bytes, 3, FBUtilities.toByteArray(524255232), 0, 4) == -1;
+        assert FBUtilities.compareByteSubArrays(
+                bytes, 3, FBUtilities.toByteArray(524255230), 0, 4) == 1;
+
+        // check that incorrect length throws exception
+        try
+        {
+            assert FBUtilities.compareByteSubArrays(
+                    bytes, 3, FBUtilities.toByteArray(524255231), 0, 24) == 0;
+            fail("Should raise an AssertionError.");
+        } catch (AssertionError ae)
+        {
+        }
+        try
+        {
+            assert FBUtilities.compareByteSubArrays(
+                    bytes, 3, FBUtilities.toByteArray(524255231), 0, 12) == 0;
+            fail("Should raise an AssertionError.");
+        } catch (AssertionError ae)
+        {
+        }
+    }
+
     @Test(expected=CharacterCodingException.class)
     public void testDecode() throws IOException
     {



Mime
View raw message