Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 98458 invoked from network); 27 Jan 2011 00:30:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 27 Jan 2011 00:30:04 -0000 Received: (qmail 69113 invoked by uid 500); 27 Jan 2011 00:30:04 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 69095 invoked by uid 500); 27 Jan 2011 00:30:04 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 69082 invoked by uid 99); 27 Jan 2011 00:30:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Jan 2011 00:30:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 27 Jan 2011 00:30:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A6A6923888DD; Thu, 27 Jan 2011 00:29:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1063928 [1/2] - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/context/ src/java/org/apache/cassandra/db/marshal/ src/java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/utils/ test/u... Date: Thu, 27 Jan 2011 00:29:40 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110127002940.A6A6923888DD@eris.apache.org> Author: jbellis Date: Thu Jan 27 00:29:39 2011 New Revision: 1063928 URL: http://svn.apache.org/viewvc?rev=1063928&view=rev Log: Fit partitioned counter directly into CounterColumn.value patch by slebresne; reviewed by Kelvin Kakugawa for CASSANDRA-1936 Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/db/Column.java cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterAESCommutativeTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Thu Jan 27 00:29:39 2011 @@ -1,7 +1,7 @@ 0.8-dev * avoid double RowMutation serialization on write path (CASSANDRA-1800) * adds support for columns that act as incr/decr counters - (CASSANDRA-1072, 1937, 1944) + (CASSANDRA-1072, 1937, 1944, 1936) * make NetworkTopologyStrategy the default (CASSANDRA-1960) * configurable internode encryption (CASSANDRA-1567) * human readable column names in sstable2json output (CASSANDRA-1933) Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Jan 27 00:29:39 2011 @@ -132,6 +132,11 @@ public class Column implements IColumn return size(); } + public int serializationFlags() + { + return 0; + } + public void addColumn(IColumn column) { throw new UnsupportedOperationException("This operation is not supported for simple columns."); @@ -155,7 +160,7 @@ public class Column implements IColumn try { buffer.writeLong(timestamp); - buffer.writeByte((isMarkedForDelete()) ? ColumnSerializer.DELETION_MASK : 0); + buffer.writeByte(serializationFlags()); } catch (IOException e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Thu Jan 27 00:29:39 2011 @@ -37,9 +37,10 @@ public class ColumnSerializer implements { private static final Logger logger = LoggerFactory.getLogger(ColumnSerializer.class); - public final static int DELETION_MASK = 0x01; - public final static int EXPIRATION_MASK = 0x02; - public final static int COUNTER_MASK = 0x04; + public final static int DELETION_MASK = 0x01; + public final static int EXPIRATION_MASK = 0x02; + public final static int COUNTER_MASK = 0x04; + public final static int COUNTER_UPDATE_MASK = 0x08; public void serialize(IColumn column, DataOutput dos) { @@ -47,22 +48,16 @@ public class ColumnSerializer implements ByteBufferUtil.writeWithShortLength(column.name(), dos); try { + dos.writeByte(column.serializationFlags()); if (column instanceof CounterColumn) { - dos.writeByte(COUNTER_MASK); dos.writeLong(((CounterColumn)column).timestampOfLastDelete()); - ByteBufferUtil.writeWithShortLength(ByteBuffer.wrap(((CounterColumn)column).partitionedCounter()), dos); } else if (column instanceof ExpiringColumn) { - dos.writeByte(EXPIRATION_MASK); dos.writeInt(((ExpiringColumn) column).getTimeToLive()); dos.writeInt(column.getLocalDeletionTime()); } - else - { - dos.writeByte((column.isMarkedForDelete()) ? DELETION_MASK : 0); - } dos.writeLong(column.timestamp()); ByteBufferUtil.writeWithLength(column.value(), dos); } @@ -82,11 +77,9 @@ public class ColumnSerializer implements if ((b & COUNTER_MASK) != 0) { long timestampOfLastDelete = dis.readLong(); - ByteBuffer pc = ByteBufferUtil.readWithShortLength(dis); - byte[] partitionedCounter = ByteBufferUtil.getArray(pc); - long timestamp = dis.readLong(); + long ts = dis.readLong(); ByteBuffer value = ByteBufferUtil.readWithLength(dis); - return new CounterColumn(name, value, timestamp, partitionedCounter, timestampOfLastDelete); + return new CounterColumn(name, value, ts, timestampOfLastDelete); } else if ((b & EXPIRATION_MASK) != 0) { @@ -112,9 +105,11 @@ public class ColumnSerializer implements { long ts = dis.readLong(); ByteBuffer value = ByteBufferUtil.readWithLength(dis); - return (b & DELETION_MASK) == 0 - ? new Column(name, value, ts) - : new DeletedColumn(name, value, ts); + return (b & COUNTER_UPDATE_MASK) != 0 + ? new CounterUpdateColumn(name, value, ts) + : ((b & DELETION_MASK) == 0 + ? new Column(name, value, ts) + : new DeletedColumn(name, value, ts)); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Thu Jan 27 00:29:39 2011 @@ -40,54 +40,47 @@ public class CounterColumn extends Colum private static CounterContext contextManager = CounterContext.instance(); - protected ByteBuffer value; // NOT final: delta OR total of partitioned counter - protected byte[] partitionedCounter; // NOT final: only modify inline, carefully protected final long timestampOfLastDelete; - public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp) + public CounterColumn(ByteBuffer name, long value, long timestamp) { - this(name, value, timestamp, contextManager.create()); + this(name, contextManager.create(value), timestamp); } - public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, byte[] partitionedCounter) + public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete) { - this(name, value, timestamp, partitionedCounter, Long.MIN_VALUE); + this(name, contextManager.create(value), timestamp, timestampOfLastDelete); } - public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, byte[] partitionedCounter, long timestampOfLastDelete) + public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp) { - super(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp); - this.value = value; - this.partitionedCounter = partitionedCounter; - this.timestampOfLastDelete = timestampOfLastDelete; + this(name, value, timestamp, Long.MIN_VALUE); } - @Override - public ByteBuffer value() + public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete) { - return value; + super(name, value, timestamp); + this.timestampOfLastDelete = timestampOfLastDelete; } - public byte[] partitionedCounter() + public long timestampOfLastDelete() { - return partitionedCounter; + return timestampOfLastDelete; } - public long timestampOfLastDelete() + public long total() { - return timestampOfLastDelete; + return contextManager.total(value); } @Override public int size() { /* - * An expired column adds to a Column : - * 4 bytes for length of partitionedCounter - * + length of partitionedCounter + * A counter column adds to a Column : * + 8 bytes for timestampOfLastDelete */ - return super.size() + DBConstants.intSize_ + partitionedCounter.length + DBConstants.tsSize_; + return super.size() + DBConstants.tsSize_; } @Override @@ -99,9 +92,7 @@ public class CounterColumn extends Colum return column; if (timestampOfLastDelete() < ((CounterColumn)column).timestampOfLastDelete()) return column; - ContextRelationship rel = contextManager.diff( - ((CounterColumn)column).partitionedCounter(), - partitionedCounter()); + ContextRelationship rel = contextManager.diff(column.value(), value()); if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel) return column; return null; @@ -110,10 +101,7 @@ public class CounterColumn extends Colum @Override public void updateDigest(MessageDigest digest) { - digest.update(name.duplicate()); - digest.update(value.duplicate()); - digest.update(ByteBufferUtil.bytes(timestamp)); - digest.update(partitionedCounter); + super.updateDigest(digest); digest.update(ByteBufferUtil.bytes(timestampOfLastDelete)); } @@ -140,12 +128,7 @@ public class CounterColumn extends Colum return column; } // tombstone > live last delete - return new CounterColumn( - column.name(), - column.value(), - column.timestamp(), - ((CounterColumn)column).partitionedCounter(), - timestamp()); + return new CounterColumn(column.name(), column.value(), column.timestamp(), timestamp()); } } else if (column.isMarkedForDelete()) // live + tombstone: track last tombstone @@ -160,63 +143,27 @@ public class CounterColumn extends Colum return this; } // live last delete < tombstone - return new CounterColumn( - name(), - value(), - timestamp(), - partitionedCounter(), - column.timestamp()); + return new CounterColumn(name(), value(), timestamp(), column.timestamp()); } // live + live: merge clocks; update value - byte[] mergedPartitionedCounter = contextManager.merge( - partitionedCounter(), - ((CounterColumn)column).partitionedCounter()); - ByteBuffer byteBufferValue; - if (0 == mergedPartitionedCounter.length) - { - long mergedValue = value().getLong(value().arrayOffset()) + - column.value().getLong(column.value().arrayOffset()); - byteBufferValue = ByteBufferUtil.bytes(mergedValue); - } else - byteBufferValue = ByteBuffer.wrap(contextManager.total(mergedPartitionedCounter)); return new CounterColumn( name(), - byteBufferValue, + contextManager.merge(value(), column.value()), Math.max(timestamp(), column.timestamp()), - mergedPartitionedCounter, Math.max(timestampOfLastDelete(), ((CounterColumn)column).timestampOfLastDelete())); } @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - CounterColumn column = (CounterColumn)o; - - if (timestamp != column.timestamp) - return false; - - if (timestampOfLastDelete != column.timestampOfLastDelete) - return false; - - if (!Arrays.equals(partitionedCounter, column.partitionedCounter)) - return false; - - if (!name.equals(column.name)) - return false; - - return value.equals(column.value); + // super.equals() returns false if o is not a CounterColumn + return super.equals(o) && timestampOfLastDelete == ((CounterColumn)o).timestampOfLastDelete; } @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + (partitionedCounter != null ? Arrays.hashCode(partitionedCounter) : 0); result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32)); return result; } @@ -228,7 +175,6 @@ public class CounterColumn extends Colum ByteBufferUtil.clone(name), ByteBufferUtil.clone(value), timestamp, - partitionedCounter, timestampOfLastDelete); } @@ -240,46 +186,31 @@ public class CounterColumn extends Colum sb.append(":"); sb.append(isMarkedForDelete()); sb.append(":"); - sb.append(value.getLong(value.arrayOffset())); + sb.append(contextManager.toString(value)); sb.append("@"); sb.append(timestamp()); sb.append("!"); sb.append(timestampOfLastDelete); - sb.append("@"); - sb.append(contextManager.toString(partitionedCounter)); return sb.toString(); } - private void updateValue() - { - value = ByteBuffer.wrap(contextManager.total(partitionedCounter)); - } - - public void update(InetAddress node) + @Override + public int serializationFlags() { - long delta = value.getLong(value.arrayOffset()); - partitionedCounter = contextManager.update(partitionedCounter, node, delta); - updateValue(); + return ColumnSerializer.COUNTER_MASK; } public CounterColumn cleanNodeCounts(InetAddress node) { - //XXX: inline modification non-destructive; cases: + // use cases: // 1) AES post-stream // 2) RRR, after CF.cloneMe() // 3) RRR, after CF.diff() which creates a new CF - byte[] cleanPartitionedCounter = contextManager.cleanNodeCounts(partitionedCounter, node); - if (cleanPartitionedCounter == partitionedCounter) + ByteBuffer cleanedValue = contextManager.cleanNodeCounts(value, node); + if (cleanedValue == value) // reference equality is enough return this; - if (0 == cleanPartitionedCounter.length) + if (0 == value.remaining()) return null; - return new CounterColumn( - name, - ByteBuffer.wrap(contextManager.total(cleanPartitionedCounter)), - timestamp, - cleanPartitionedCounter, - timestampOfLastDelete - ); + return new CounterColumn(name, cleanedValue, timestamp, timestampOfLastDelete); } } - Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Thu Jan 27 00:29:39 2011 @@ -38,6 +38,7 @@ import org.apache.cassandra.db.marshal.A import org.apache.cassandra.io.ICompactSerializer; import org.apache.cassandra.net.Message; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.thrift.ConsistencyLevel; @@ -163,9 +164,41 @@ public class CounterMutation implements public void apply() throws IOException { - rowMutation.updateCommutativeTypes(FBUtilities.getLocalAddress()); + // We need to transform all CounterUpdateColumn to CounterColumn and we need to deepCopy. Both are done + // below since CUC.asCounterColumn() does a deep copy. + RowMutation rm = new RowMutation(rowMutation.getTable(), ByteBufferUtil.clone(rowMutation.key())); - rowMutation.deepCopy().apply(); + for (ColumnFamily cf_ : rowMutation.getColumnFamilies()) + { + ColumnFamily cf = cf_.cloneMeShallow(); + if (cf_.isSuper()) + { + for (IColumn column : cf_.getSortedColumns()) + { + IColumn sc = ((SuperColumn)column).shallowCopy(); + for (IColumn c : column.getSubColumns()) + { + if (c instanceof CounterUpdateColumn) + sc.addColumn(((CounterUpdateColumn) c).asCounterColumn()); + else + sc.addColumn(c.deepCopy()); + } + cf.addColumn(sc); + } + } + else + { + for (IColumn column : cf_.getSortedColumns()) + { + if (column instanceof CounterUpdateColumn) + cf.addColumn(((CounterUpdateColumn) column).asCounterColumn()); + else + cf.addColumn(column.deepCopy()); + } + } + rm.add(cf); + } + rm.apply(); } @Override Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java Thu Jan 27 00:29:39 2011 @@ -70,4 +70,10 @@ public class DeletedColumn extends Colum { return new DeletedColumn(ByteBufferUtil.clone(name), ByteBufferUtil.clone(value), timestamp); } + + @Override + public int serializationFlags() + { + return ColumnSerializer.DELETION_MASK; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Thu Jan 27 00:29:39 2011 @@ -91,7 +91,7 @@ public class ExpiringColumn extends Colu try { buffer.writeLong(timestamp); - buffer.writeByte(ColumnSerializer.EXPIRATION_MASK); + buffer.writeByte(serializationFlags()); buffer.writeInt(timeToLive); } catch (IOException e) @@ -135,4 +135,10 @@ public class ExpiringColumn extends Colu throw new IllegalStateException("column is not marked for delete"); } } + + @Override + public int serializationFlags() + { + return ColumnSerializer.EXPIRATION_MASK; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Thu Jan 27 00:29:39 2011 @@ -35,6 +35,7 @@ public interface IColumn public ByteBuffer name(); public int size(); public int serializedSize(); + public int serializationFlags(); public long timestamp(); public ByteBuffer value(); public Collection getSubColumns(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jan 27 00:29:39 2011 @@ -308,20 +308,6 @@ public class RowMutation implements IMut } } - /** - * Update the context of all CounterColumns in this RowMutation - */ - public void updateCommutativeTypes(InetAddress node) - { - for (ColumnFamily cf : modifications_.values()) - { - AbstractType defaultValidator = cf.metadata().getDefaultValidator(); - if (!defaultValidator.isCommutative()) - continue; - ((AbstractCommutativeType)defaultValidator).update(cf, node); - } - } - static RowMutation fromBytes(byte[] raw) throws IOException { RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw))); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Jan 27 00:29:39 2011 @@ -295,6 +295,14 @@ public class SuperColumn implements ICol this.localDeletionTime.set(localDeleteTime); this.markedForDeleteAt.set(timestamp); } + + public IColumn shallowCopy() + { + SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator()); + sc.localDeletionTime = localDeletionTime; + sc.markedForDeleteAt = markedForDeleteAt; + return sc; + } public IColumn deepCopy() { @@ -314,6 +322,11 @@ public class SuperColumn implements ICol { throw new UnsupportedOperationException("This operation is unsupported on super columns."); } + + public int serializationFlags() + { + throw new UnsupportedOperationException("Super columns don't have a serialization mask"); + } } class SuperColumnSerializer implements ICompactSerializer2 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Thu Jan 27 00:29:39 2011 @@ -19,11 +19,13 @@ package org.apache.cassandra.db.context; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.util.*; import org.apache.commons.lang.ArrayUtils; import org.apache.cassandra.db.DBConstants; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; /** @@ -47,6 +49,7 @@ public class CounterContext implements I { private static final int idLength; private static final byte[] localId; + private static final ByteBuffer wrappedLocalId; private static final int clockLength = DBConstants.longSize_; private static final int countLength = DBConstants.longSize_; private static final int stepLength; // length: id + logical clock + count @@ -60,6 +63,7 @@ public class CounterContext implements I static { localId = FBUtilities.getLocalAddress().getAddress(); + wrappedLocalId = ByteBuffer.wrap(localId); idLength = localId.length; stepLength = idLength + clockLength + countLength; } @@ -70,69 +74,33 @@ public class CounterContext implements I } /** - * Creates an initial counter context. + * Creates an initial counter context with an initial value for the local node with. + * + * @param value the value for this initial update * * @return an empty counter context. */ - public byte[] create() - { - return new byte[0]; - } - - // write a tuple (node id, clock, count) at the front - protected static void writeElement(byte[] context, byte[] id, long clock, long count) - { - writeElementAtStepOffset(context, 0, id, clock, count); - } - - // write a tuple (node id, clock, count) at step offset - protected static void writeElementAtStepOffset(byte[] context, int stepOffset, byte[] id, long clock, long count) + public ByteBuffer create(long value) { - int offset = stepOffset * stepLength; - System.arraycopy(id, 0, context, offset, idLength); - FBUtilities.copyIntoBytes(context, offset + idLength, clock); - FBUtilities.copyIntoBytes(context, offset + idLength + clockLength, count); + ByteBuffer context = ByteBuffer.allocate(stepLength); + writeElementAtOffset(context, 0, localId, 1L, value); + return context; } - public byte[] insertElementAtStepOffset(byte[] context, int stepOffset, byte[] id, long clock, long count) + // For testing purposes + public ByteBuffer create(byte[] id, long clock, long value) { - int offset = stepOffset * stepLength; - byte[] newContext = new byte[context.length + stepLength]; - System.arraycopy(context, 0, newContext, 0, offset); - writeElementAtStepOffset(newContext, stepOffset, id, clock, count); - System.arraycopy(context, offset, newContext, offset + stepLength, context.length - offset); - return newContext; + ByteBuffer context = ByteBuffer.allocate(stepLength); + writeElementAtOffset(context, 0, id, clock, value); + return context; } - public byte[] update(byte[] context, InetAddress node, long delta) + // write a tuple (node id, clock, count) at offset + protected static void writeElementAtOffset(ByteBuffer context, int offset, byte[] id, long clock, long count) { - // calculate node id - byte[] nodeId = node.getAddress(); - int idCount = context.length / stepLength; - - // look for this node id - for (int stepOffset = 0; stepOffset < idCount; ++stepOffset) - { - int offset = stepOffset * stepLength; - int cmp = FBUtilities.compareByteSubArrays(nodeId, 0, context, offset, idLength); - if (cmp == 0) - { - // node id found: increment clock, update count; shift to front - long clock = FBUtilities.byteArrayToLong(context, offset + idLength); - long count = FBUtilities.byteArrayToLong(context, offset + idLength + clockLength); - - writeElementAtStepOffset(context, stepOffset, nodeId, clock + 1L, count + delta); - return context; - } - if (cmp < 0) - { - // id at offset is greater that the one we are updating, inserting - return insertElementAtStepOffset(context, stepOffset, nodeId, 1L, delta); - } - } - - // node id not found: adding at the end - return insertElementAtStepOffset(context, idCount, nodeId, 1L, delta); + System.arraycopy(id, 0, context.array(), offset + context.arrayOffset(), idLength); + context.putLong(offset + idLength, clock); + context.putLong(offset + idLength + clockLength, count); } /** @@ -152,20 +120,20 @@ public class CounterContext implements I * counter context. * @return the ContextRelationship between the contexts. */ - public ContextRelationship diff(byte[] left, byte[] right) + public ContextRelationship diff(ByteBuffer left, ByteBuffer right) { ContextRelationship relationship = ContextRelationship.EQUAL; - int leftIndex = 0; - int rightIndex = 0; - while (leftIndex < left.length && rightIndex < right.length) + int leftIndex = left.position(); + int rightIndex = right.position(); + while (leftIndex < left.remaining() && rightIndex < right.remaining()) { // compare id bytes - int compareId = FBUtilities.compareByteSubArrays(left, leftIndex, right, rightIndex, idLength); + int compareId = ByteBufferUtil.compareSubArrays(left, leftIndex, right, rightIndex, idLength); if (compareId == 0) { - long leftClock = FBUtilities.byteArrayToLong(left, leftIndex + idLength); - long rightClock = FBUtilities.byteArrayToLong(right, rightIndex + idLength); + long leftClock = left.getLong(leftIndex + idLength); + long rightClock = right.getLong(rightIndex + idLength); // advance indexes leftIndex += stepLength; @@ -252,7 +220,7 @@ public class CounterContext implements I } // check final lengths - if (leftIndex < left.length) + if (leftIndex < left.remaining()) { if (relationship == ContextRelationship.EQUAL) { @@ -263,7 +231,7 @@ public class CounterContext implements I return ContextRelationship.DISJOINT; } } - else if (rightIndex < right.length) + else if (rightIndex < right.remaining()) { if (relationship == ContextRelationship.EQUAL) { @@ -278,38 +246,6 @@ public class CounterContext implements I return relationship; } - private class CounterNode - { - public final long clock; - public final long count; - - public CounterNode(long clock, long count) - { - this.clock = clock; - this.count = count; - } - - public int compareClockTo(CounterNode o) - { - if (clock == o.clock) - { - return 0; - } - else if (clock > o.clock) - { - return 1; - } - // clock < o.clock - return -1; - } - - @Override - public String toString() - { - return "(" + clock + "," + count + ")"; - } - } - /** * Return a context w/ an aggregated count for each node id. * @@ -318,22 +254,15 @@ public class CounterContext implements I * @param right * counter context. */ - public byte[] merge(byte[] left, byte[] right) + public ByteBuffer merge(ByteBuffer left, ByteBuffer right) { - if (left.length > right.length) - { - byte[] tmp = right; - right = left; - left = tmp; - } - // Compute size of result int size = 0; - int leftOffset = 0; - int rightOffset = 0; - while ((leftOffset < left.length) && (rightOffset < right.length)) + int leftOffset = left.position(); + int rightOffset = right.position(); + while ((leftOffset < left.limit()) && (rightOffset < right.limit())) { - int cmp = FBUtilities.compareByteSubArrays(left, leftOffset, right, rightOffset, idLength); + int cmp = ByteBufferUtil.compareSubArrays(left, leftOffset, right, rightOffset, idLength); if (cmp == 0) { ++size; @@ -351,35 +280,37 @@ public class CounterContext implements I leftOffset += stepLength; } } - size += (left.length - leftOffset) / stepLength; - size += (right.length - rightOffset) / stepLength; + size += (left.limit() - leftOffset) / stepLength; + size += (right.limit() - rightOffset) / stepLength; - byte[] merged = new byte[size * stepLength]; + ByteBuffer merged = ByteBuffer.allocate(size * stepLength); // Do the actual merge: // a) local id: sum clocks, counts // b) remote id: keep highest clock, count (reconcile) - int mergedOffset = 0; leftOffset = 0; rightOffset = 0; - while ((leftOffset < left.length) && (rightOffset < right.length)) + int mergedOffset = merged.position(); + leftOffset = left.position(); + rightOffset = right.position(); + while ((leftOffset < left.limit()) && (rightOffset < right.limit())) { - int cmp = FBUtilities.compareByteSubArrays(left, leftOffset, right, rightOffset, idLength); + int cmp = ByteBufferUtil.compareSubArrays(left, leftOffset, right, rightOffset, idLength); if (cmp == 0) { // sum for local id, keep highest othewise - long leftClock = FBUtilities.byteArrayToLong(left, leftOffset + idLength); - long rightClock = FBUtilities.byteArrayToLong(right, rightOffset + idLength); - if (FBUtilities.compareByteSubArrays(left, leftOffset, localId, 0, idLength) == 0) - { - long leftCount = FBUtilities.byteArrayToLong(left, leftOffset + idLength + clockLength); - long rightCount = FBUtilities.byteArrayToLong(right, rightOffset + idLength + clockLength); - writeElementAtStepOffset(merged, mergedOffset / stepLength, localId, leftClock + rightClock, leftCount + rightCount); + long leftClock = left.getLong(leftOffset + idLength); + long rightClock = right.getLong(rightOffset + idLength); + if (ByteBufferUtil.compareSubArrays(left, leftOffset, wrappedLocalId, 0, idLength) == 0) + { + long leftCount = left.getLong(leftOffset + idLength + clockLength); + long rightCount = right.getLong(rightOffset + idLength + clockLength); + writeElementAtOffset(merged, mergedOffset, localId, leftClock + rightClock, leftCount + rightCount); } else { if (leftClock >= rightClock) - System.arraycopy(left, leftOffset, merged, mergedOffset, stepLength); + ByteBufferUtil.arrayCopy(left, leftOffset, merged, mergedOffset, stepLength); else - System.arraycopy(right, rightOffset, merged, mergedOffset, stepLength); + ByteBufferUtil.arrayCopy(right, rightOffset, merged, mergedOffset, stepLength); } mergedOffset += stepLength; rightOffset += stepLength; @@ -387,31 +318,31 @@ public class CounterContext implements I } else if (cmp > 0) { - System.arraycopy(right, rightOffset, merged, mergedOffset, stepLength); + ByteBufferUtil.arrayCopy(right, rightOffset, merged, mergedOffset, stepLength); mergedOffset += stepLength; rightOffset += stepLength; } else // cmp < 0 { - System.arraycopy(left, leftOffset, merged, mergedOffset, stepLength); + ByteBufferUtil.arrayCopy(left, leftOffset, merged, mergedOffset, stepLength); mergedOffset += stepLength; leftOffset += stepLength; } } - if (leftOffset < left.length) - System.arraycopy( + if (leftOffset < left.limit()) + ByteBufferUtil.arrayCopy( left, leftOffset, merged, mergedOffset, - left.length - leftOffset); - if (rightOffset < right.length) - System.arraycopy( + left.limit() - leftOffset); + if (rightOffset < right.limit()) + ByteBufferUtil.arrayCopy( right, rightOffset, merged, mergedOffset, - right.length - rightOffset); + right.limit() - rightOffset); return merged; } @@ -423,21 +354,22 @@ public class CounterContext implements I * version context. * @return a human-readable String of the context. */ - public String toString(byte[] context) + public String toString(ByteBuffer context) { StringBuilder sb = new StringBuilder(); sb.append("["); - for (int offset = 0; offset < context.length; offset += stepLength) + for (int offset = context.position(); offset < context.limit(); offset += stepLength) { - if (offset > 0) + if (offset > context.position()) { sb.append(","); } sb.append("{"); try { + int absOffset = context.arrayOffset() + offset; InetAddress address = InetAddress.getByAddress( - ArrayUtils.subarray(context, offset, offset + idLength)); + ArrayUtils.subarray(context.array(), absOffset, absOffset + idLength)); sb.append(address.getHostAddress()); } catch (UnknownHostException uhe) @@ -445,9 +377,9 @@ public class CounterContext implements I sb.append("?.?.?.?"); } sb.append(", "); - sb.append(FBUtilities.byteArrayToLong(context, offset + idLength)); + sb.append(context.getLong(offset + idLength)); sb.append(", "); - sb.append(FBUtilities.byteArrayToLong(context, offset + idLength + clockLength)); + sb.append(context.getLong(offset + idLength + clockLength)); sb.append("}"); } @@ -456,42 +388,42 @@ public class CounterContext implements I } // return an aggregated count across all node ids - public byte[] total(byte[] context) + public long total(ByteBuffer context) { long total = 0L; - for (int offset = 0; offset < context.length; offset += stepLength) + for (int offset = context.position(); offset < context.limit(); offset += stepLength) { - long count = FBUtilities.byteArrayToLong(context, offset + idLength + clockLength); + long count = context.getLong(offset + idLength + clockLength); total += count; } - return FBUtilities.toByteArray(total); + return total; } // remove the count for a given node id - public byte[] cleanNodeCounts(byte[] context, InetAddress node) + public ByteBuffer cleanNodeCounts(ByteBuffer context, InetAddress node) { // calculate node id - byte[] nodeId = node.getAddress(); + ByteBuffer nodeId = ByteBuffer.wrap(node.getAddress()); // look for this node id - for (int offset = 0; offset < context.length; offset += stepLength) + for (int offset = 0; offset < context.remaining(); offset += stepLength) { - int cmp = FBUtilities.compareByteSubArrays(context, offset, nodeId, 0, idLength); + int cmp = ByteBufferUtil.compareSubArrays(context, context.position() + offset, nodeId, 0, idLength); if (cmp < 0) continue; else if (cmp == 0) { // node id found: remove node count - byte[] truncatedContext = new byte[context.length - stepLength]; - System.arraycopy(context, 0, truncatedContext, 0, offset); - System.arraycopy( + ByteBuffer truncatedContext = ByteBuffer.allocate(context.remaining() - stepLength); + ByteBufferUtil.arrayCopy(context, context.position(), truncatedContext, 0, offset); + ByteBufferUtil.arrayCopy( context, - offset + stepLength, + context.position() + offset + stepLength, truncatedContext, offset, - context.length - (offset + stepLength)); + context.remaining() - (offset + stepLength)); return truncatedContext; } else // cmp > 0 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java Thu Jan 27 00:29:39 2011 @@ -17,10 +17,12 @@ */ package org.apache.cassandra.db.context; +import java.nio.ByteBuffer; + /** * An opaque commutative context. * - * Maintains a byte[] context that represents a partitioned commutative value. + * Maintains a ByteBuffer context that represents a partitioned commutative value. */ public interface IContext { @@ -33,13 +35,6 @@ public interface IContext }; /** - * Creates an initial context. - * - * @return the initial context. - */ - public byte[] create(); - - /** * Determine the relationship between two contexts. * * EQUAL: Equal set of nodes and every count is equal. @@ -53,7 +48,7 @@ public interface IContext * context. * @return the ContextRelationship between the contexts. */ - public ContextRelationship diff(byte[] left, byte[] right); + public ContextRelationship diff(ByteBuffer left, ByteBuffer right); /** * Return a context w/ an aggregated count for each node id. @@ -63,7 +58,7 @@ public interface IContext * @param right * context. */ - public byte[] merge(byte[] left, byte[] right); + public ByteBuffer merge(ByteBuffer left, ByteBuffer right); /** * Human-readable String from context. @@ -72,5 +67,5 @@ public interface IContext * context. * @return a human-readable String of the context. */ - public String toString(byte[] context); + public String toString(ByteBuffer context); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java Thu Jan 27 00:29:39 2011 @@ -39,11 +39,6 @@ public abstract class AbstractCommutativ public abstract Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp); /** - * update commutative columns for target node - */ - public abstract void update(IColumnContainer cc, InetAddress node); - - /** * remove target node from commutative columns */ public abstract void cleanContext(IColumnContainer cc, InetAddress node); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java Thu Jan 27 00:29:39 2011 @@ -56,7 +56,7 @@ public class CounterColumnType extends A { throw new MarshalException("A long is exactly 8 bytes"); } - return String.valueOf(bytes.getLong(bytes.position()+bytes.arrayOffset())); + return String.valueOf(bytes.getLong(bytes.position())); } /** @@ -64,27 +64,7 @@ public class CounterColumnType extends A */ public Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp) { - return new CounterColumn(name, value, timestamp); - } - - /** - * update commutative columns for target node - */ - public void update(IColumnContainer cc, InetAddress node) - { - for (IColumn column : cc.getSortedColumns()) - { - if (column instanceof SuperColumn) - { - update((IColumnContainer)column, node); - continue; - } - - if (column instanceof DeletedColumn) - continue; - - ((CounterColumn)column).update(node); - } + return new CounterUpdateColumn(name, value, timestamp); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu Jan 27 00:29:39 2011 @@ -44,6 +44,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.db.migration.*; +import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.dht.*; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.DynamicEndpointSnitch; @@ -1059,12 +1060,12 @@ public class CassandraServer implements private Counter getCounter(ColumnOrSuperColumn cosc) { if (cosc.isSetColumn()) { - return new Counter().setColumn(new CounterColumn(cosc.column.name, cosc.column.value.getLong(cosc.column.value.arrayOffset()))); + return new Counter().setColumn(new CounterColumn(cosc.column.name, CounterContext.instance().total(cosc.column.value))); } else if(cosc.isSetSuper_column()) { List cc = new ArrayList(cosc.super_column.columns.size()); for (Column col : cosc.super_column.columns) { - cc.add(new CounterColumn(col.name, col.value.getLong(col.value.arrayOffset()))); + cc.add(new CounterColumn(col.name, CounterContext.instance().total(col.value))); } return new Counter().setSuper_column(new CounterSuperColumn(cosc.super_column.name, cc)); } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Thu Jan 27 00:29:39 2011 @@ -235,6 +235,38 @@ public class ByteBufferUtil } } + /** + * Transfer bytes from one ByteBuffer to another. + * This function acts as System.arrayCopy() but for ByteBuffers. + * + * @param src the source ByteBuffer + * @param srcPos starting position in the source ByteBuffer + * @param dst the destination ByteBuffer + * @param dstPos starting position in the destination ByteBuffer + * @param length the number of bytes to copy + */ + public static void arrayCopy(ByteBuffer src, int srcPos, ByteBuffer dst, int dstPos, int length) + { + if (src.hasArray() && dst.hasArray()) + { + System.arraycopy(src.array(), + src.arrayOffset() + srcPos, + dst.array(), + dst.arrayOffset() + dstPos, + length); + } + else + { + if (src.limit() - srcPos < length || dst.limit() - dstPos < length) + throw new IndexOutOfBoundsException(); + + for (int i = 0; i < length; i++) + { + dst.put(dstPos++, src.get(srcPos++)); + } + } + } + public static void writeWithLength(ByteBuffer bytes, DataOutput out) throws IOException { out.writeInt(bytes.remaining()); @@ -399,4 +431,37 @@ public class ByteBufferUtil { return ByteBuffer.wrap(FBUtilities.hexToBytes(str)); } + + /** + * Compare two ByteBuffer at specified offsets for length. + * Compares the non equal bytes as unsigned. + * @param bytes1 First byte buffer to compare. + * @param offset1 Position to start the comparison at in the first array. + * @param bytes2 Second byte buffer to compare. + * @param offset2 Position to start the comparison at in the second array. + * @param length How many bytes to compare? + * @return -1 if byte1 is less than byte2, 1 if byte2 is less than byte1 or 0 if equal. + */ + public static int compareSubArrays(ByteBuffer bytes1, int offset1, ByteBuffer bytes2, int offset2, int length) + { + if ( null == bytes1 ) + { + if ( null == bytes2) return 0; + else return -1; + } + if (null == bytes2 ) return 1; + + assert bytes1.limit() >= offset1 + length : "The first byte array isn't long enough for the specified offset and length."; + assert bytes2.limit() >= offset2 + length : "The second byte array isn't long enough for the specified offset and length."; + for ( int i = 0; i < length; i++ ) + { + byte byte1 = bytes1.get(offset1 + i); + byte byte2 = bytes2.get(offset2 + i); + if ( byte1 == byte2 ) + continue; + // compare non-equal bytes as unsigned + return (byte1 & 0xFF) < (byte2 & 0xFF) ? -1 : 1; + } + return 0; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jan 27 00:29:39 2011 @@ -173,26 +173,6 @@ public class FBUtilities } /** - * @param bytes A byte array containing a serialized integer. - * @param offset Start position of the integer in the array. - * @return The integer value contained in the byte array. - */ - public static int byteArrayToInt(byte[] bytes, int offset) - { - if (bytes.length - offset < 4) - { - throw new IllegalArgumentException("An integer must be 4 bytes in size."); - } - int n = 0; - for ( int i = 0; i < 4; ++i ) - { - n <<= 8; - n |= bytes[offset + i] & 0xFF; - } - return n; - } - - /** * Copy bytes from long into bytes starting from offset. * @param bytes Target array * @param offset Offset into the array @@ -221,36 +201,6 @@ public class FBUtilities return bytes; } - /** - * @param bytes A byte array containing a serialized long. - * @return The long value contained in the byte array. - */ - public static long byteArrayToLong(byte[] bytes) - { - return byteArrayToLong(bytes, 0); - } - - /** - * @param bytes A byte array containing a serialized long. - * @param offset Start position of the long in the array. - * @return The long value contained in the byte array. - */ - public static long byteArrayToLong(byte[] bytes, int offset) - { - if (bytes.length - offset < 8) - { - throw new IllegalArgumentException("A long must be 8 bytes in size."); - } - long n = 0; - for ( int i = 0; i < 8; ++i ) - { - n <<= 8; - - n |= bytes[offset + i] & 0xFF; - } - return n; - } - public static int compareUnsigned(byte[] bytes1, byte[] bytes2, int offset1, int offset2, int len1, int len2) { if (bytes1 == null) @@ -272,38 +222,6 @@ public class FBUtilities } /** - * Compare two byte[] at specified offsets for length. Compares the non equal bytes as unsigned. - * @param bytes1 First array to compare. - * @param offset1 Position to start the comparison at in the first array. - * @param bytes2 Second array to compare. - * @param offset2 Position to start the comparison at in the second array. - * @param length How many bytes to compare? - * @return -1 if byte1 is less than byte2, 1 if byte2 is less than byte1 or 0 if equal. - */ - public static int compareByteSubArrays(byte[] bytes1, int offset1, byte[] bytes2, int offset2, int length) - { - if ( null == bytes1 ) - { - if ( null == bytes2) return 0; - else return -1; - } - if (null == bytes2 ) return 1; - - assert bytes1.length >= (offset1 + length) : "The first byte array isn't long enough for the specified offset and length."; - assert bytes2.length >= (offset2 + length) : "The second byte array isn't long enough for the specified offset and length."; - for ( int i = 0; i < length; i++ ) - { - byte byte1 = bytes1[offset1+i]; - byte byte2 = bytes2[offset2+i]; - if ( byte1 == byte2 ) - continue; - // compare non-equal bytes as unsigned - return (byte1 & 0xFF) < (byte2 & 0xFF) ? -1 : 1; - } - return 0; - } - - /** * @return The bitwise XOR of the inputs. The output will be the same length as the * longer input, but if either input is null, the output will be null. */ Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java Thu Jan 27 00:29:39 2011 @@ -26,6 +26,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.util.Arrays; import org.apache.commons.lang.ArrayUtils; @@ -63,56 +64,12 @@ public class CounterColumnTest { AbstractCommutativeType type = CounterColumnType.instance; long delta = 3L; - CounterColumn column = (CounterColumn)type.createColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(delta), 1L); - assert delta == column.value().getLong(column.value().arrayOffset()); - assert 0 == column.partitionedCounter().length; - - InetAddress node = InetAddress.getByAddress(FBUtilities.toByteArray(1)); - column.update(node); - assert delta == column.value().getLong(column.value().arrayOffset()); - assert 1 == FBUtilities.byteArrayToInt( column.partitionedCounter(), 0*stepLength); - assert 1L == FBUtilities.byteArrayToLong(column.partitionedCounter(), 0*stepLength + idLength); - assert 3L == FBUtilities.byteArrayToLong(column.partitionedCounter(), 0*stepLength + idLength + clockLength); - } - - @Test - public void testUpdate() throws UnknownHostException - { - CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 0L); - assert 0L == c.value().getLong(c.value().arrayOffset()); - - assert c.partitionedCounter().length == 0 : "badly formatted initial context"; - - c.value = ByteBufferUtil.bytes(1L); - c.update(InetAddress.getByAddress(FBUtilities.toByteArray(1))); - assert 1L == c.value().getLong(c.value().arrayOffset()); - - assert c.partitionedCounter().length == stepLength; - - assert 1 == FBUtilities.byteArrayToInt( c.partitionedCounter(), 0*stepLength); - assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength); - assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength + clockLength); - - c.value = ByteBufferUtil.bytes(3L); - c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2))); - - c.value = ByteBufferUtil.bytes(2L); - c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2))); + CounterColumn column = new CounterColumn(ByteBufferUtil.bytes("x"), delta, 1L); - c.value = ByteBufferUtil.bytes(9L); - c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2))); - - assert 15L == c.value().getLong(c.value().arrayOffset()); - - assert c.partitionedCounter().length == (2 * stepLength); - - assert 1 == FBUtilities.byteArrayToInt(c.partitionedCounter(), 0*stepLength); - assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength); - assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength + clockLength); - - assert 2 == FBUtilities.byteArrayToInt(c.partitionedCounter(), 1*stepLength); - assert 3L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 1*stepLength + idLength); - assert 14L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 1*stepLength + idLength + clockLength); + assert delta == column.total(); + assert Arrays.equals(FBUtilities.getLocalAddress().getAddress(), ArrayUtils.subarray(column.value().array(), 0, idLength)); + assert 1L == column.value().getLong(0*stepLength + idLength); + assert delta == column.value().getLong(0*stepLength + idLength + clockLength); } @Test @@ -131,178 +88,171 @@ public class CounterColumnTest // tombstone > live left = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L); - right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 1L); + right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 1L); assert left.reconcile(right) == left; // tombstone < live last delete left = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L); - right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L); + right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L); assert left.reconcile(right) == right; // tombstone == live last delete left = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L); - right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L); + right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L); assert left.reconcile(right) == right; // tombstone > live last delete left = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 4L); - right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 9L, new byte[0], 1L); + right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 9L, 1L); reconciled = left.reconcile(right); assert reconciled.name() == right.name(); assert reconciled.value() == right.value(); assert reconciled.timestamp() == right.timestamp(); - assert ((CounterColumn)reconciled).partitionedCounter() == ((CounterColumn)right).partitionedCounter(); assert ((CounterColumn)reconciled).timestampOfLastDelete() == left.timestamp(); // live < tombstone - left = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 1L); + left = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 1L); right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L); assert left.reconcile(right) == right; // live last delete > tombstone - left = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L); + left = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L); right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L); assert left.reconcile(right) == left; // live last delete == tombstone - left = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L); + left = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L); right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L); assert left.reconcile(right) == left; // live last delete < tombstone - left = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 9L, new byte[0], 1L); + left = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 9L, 1L); right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 4L); reconciled = left.reconcile(right); assert reconciled.name() == left.name(); assert reconciled.value() == left.value(); assert reconciled.timestamp() == left.timestamp(); - assert ((CounterColumn)reconciled).partitionedCounter() == ((CounterColumn)left).partitionedCounter(); assert ((CounterColumn)reconciled).timestampOfLastDelete() == right.timestamp(); // live + live - byte[] context; - context = new byte[0]; - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 1L); - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 0L); - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 1L); - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 0L); - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 5L); - left = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 9L, context, 1L); - - context = new byte[0]; - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 4L); - context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 2L); - right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 3L, context, 4L); + left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(1), 1L, 1L), 4L); + right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(1), 2L, 3L), 1L); reconciled = left.reconcile(right); + assert reconciled.name().equals(left.name()); + assert ((CounterColumn)reconciled).total() == 3L; + assert reconciled.timestamp() == 4L; - assert reconciled.name() == left.name(); - assert 9L == reconciled.value().getLong(reconciled.value().position()); - assert reconciled.timestamp() == 9L; + left = reconciled; + right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(2), 1L, 5L), 2L); - context = ((CounterColumn)reconciled).partitionedCounter(); - assert 3 * stepLength == context.length; + reconciled = left.reconcile(right); - assert 1 == FBUtilities.byteArrayToInt(context, 0*stepLength); - assert 3L == FBUtilities.byteArrayToLong(context, 0*stepLength + idLength); - assert 2L == FBUtilities.byteArrayToLong(context, 0*stepLength + idLength + clockLength); + assert reconciled.name().equals(left.name()); + assert ((CounterColumn)reconciled).total() == 8L; + assert reconciled.timestamp() == 4L; - assert 2 == FBUtilities.byteArrayToInt(context, 1*stepLength); - assert 2L == FBUtilities.byteArrayToLong(context, 1*stepLength + idLength); - assert 5L == FBUtilities.byteArrayToLong(context, 1*stepLength + idLength + clockLength); + left = reconciled; + right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(2), 2L, 2L), 6L); - assert 3 == FBUtilities.byteArrayToInt(context, 2*stepLength); - assert 1L == FBUtilities.byteArrayToLong(context, 2*stepLength + idLength); - assert 2L == FBUtilities.byteArrayToLong(context, 2*stepLength + idLength + clockLength); + reconciled = left.reconcile(right); + assert reconciled.name().equals(left.name()); + assert ((CounterColumn)reconciled).total() == 5L; + assert reconciled.timestamp() == 6L; + + ByteBuffer context = reconciled.value(); + assert 2 * stepLength == context.remaining(); + + assert 1 == context.getInt(0*stepLength); + assert 2L == context.getLong(0*stepLength + idLength); + assert 3L == context.getLong(0*stepLength + idLength + clockLength); + + assert 2 == context.getInt(1*stepLength); + assert 2L == context.getLong(1*stepLength + idLength); + assert 2L == context.getLong(1*stepLength + idLength + clockLength); - assert ((CounterColumn)reconciled).timestampOfLastDelete() == 4L; + assert ((CounterColumn)reconciled).timestampOfLastDelete() == Long.MIN_VALUE; } @Test public void testDiff() throws UnknownHostException { - byte[] left; - byte[] right; + ByteBuffer left; + ByteBuffer right; CounterColumn leftCol; CounterColumn rightCol; // timestamp - left = new byte[0]; - leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left); - - right = new byte[0]; - rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 2L, right); + leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 1L); + rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 2L); assert rightCol == leftCol.diff(rightCol); assert null == rightCol.diff(leftCol); // timestampOfLastDelete - left = new byte[0]; - leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left, 1L); - - right = new byte[0]; - rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right, 2L); + leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 1L, 1L); + rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 1L, 2L); assert rightCol == leftCol.diff(rightCol); assert null == rightCol.diff(leftCol); // equality: equal nodes, all counts same - left = Util.concatByteArrays( + left = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(3), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(6), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L) - ); - left = cc.update(left, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 0L); - right = ArrayUtils.clone(left); + )); + //left = cc.update(left, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 0L); + right = ByteBufferUtil.clone(left); - leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left); - rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right); + leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), left, 1L); + rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right, 1L); assert null == leftCol.diff(rightCol); // greater than: left has superset of nodes (counts equal) - left = Util.concatByteArrays( + left = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(3), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(6), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(12), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(0L) - ); - right = Util.concatByteArrays( + )); + right = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(3), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(6), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L) - ); + )); - leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left); - rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right); + leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), left, 1L); + rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right, 1L); assert null == leftCol.diff(rightCol); // less than: right has subset of nodes (counts equal) assert leftCol == rightCol.diff(leftCol); // disjoint: right and left have disjoint node sets - left = Util.concatByteArrays( + left = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(3), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(4), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L) - ); - right = Util.concatByteArrays( + )); + right = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(3), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(6), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L) - ); + )); - leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left); - rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right); + leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), left, 1L); + rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right, 1L); assert rightCol == leftCol.diff(rightCol); assert leftCol == rightCol.diff(leftCol); } @@ -310,17 +260,17 @@ public class CounterColumnTest @Test public void testCleanNodeCounts() throws UnknownHostException { - byte[] context = Util.concatByteArrays( + ByteBuffer context = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(1), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(2), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(4), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(8), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4L) - ); - CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 1L, context); + )); + CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), context, 1L); CounterColumn d = c.cleanNodeCounts(InetAddress.getByAddress(FBUtilities.toByteArray(4))); - assertEquals(7L, d.value().getLong(d.value().arrayOffset())); + assertEquals(7L, d.total()); } @Test @@ -328,13 +278,13 @@ public class CounterColumnTest { ColumnFamily cf; - byte[] context = Util.concatByteArrays( + ByteBuffer context = ByteBuffer.wrap(Util.concatByteArrays( FBUtilities.toByteArray(1), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(2), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(4), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(8), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4L) - ); - CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 1L, context); + )); + CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), context, 1L); DataOutputBuffer bufOut = new DataOutputBuffer(); Column.serializer().serialize(original, bufOut); Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java?rev=1063928&r1=1063927&r2=1063928&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java Thu Jan 27 00:29:39 2011 @@ -55,21 +55,21 @@ public class SuperColumnTest FBUtilities.toByteArray(4), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(9L), FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L) ); - sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(cc.total(context)), 3L, context, 0L)); + sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(context), 3L, 0L)); context = concatByteArrays( FBUtilities.toByteArray(2), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(8), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(0L), FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(5L) ); - sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(cc.total(context)), 10L, context, 0L)); + sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(context), 10L, 0L)); context = concatByteArrays( FBUtilities.toByteArray(2), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(3), FBUtilities.toByteArray(6L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(7), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L) ); - sc.addColumn(new CounterColumn(getBytes(2), ByteBuffer.wrap(cc.total(context)), 9L, context, 0L)); + sc.addColumn(new CounterColumn(getBytes(2), ByteBuffer.wrap(context), 9L, 0L)); assertNotNull(sc.getSubColumn(getBytes(1))); assertNull(sc.getSubColumn(getBytes(3))); @@ -82,10 +82,10 @@ public class SuperColumnTest FBUtilities.toByteArray(8), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(0L), FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(12L), FBUtilities.toByteArray(8L) ); - assert 0 == FBUtilities.compareByteSubArrays( - ((CounterColumn)sc.getSubColumn(getBytes(1))).partitionedCounter(), + assert 0 == ByteBufferUtil.compareSubArrays( + ((CounterColumn)sc.getSubColumn(getBytes(1))).value(), 0, - c1, + ByteBuffer.wrap(c1), 0, c1.length); @@ -95,10 +95,10 @@ public class SuperColumnTest FBUtilities.toByteArray(3), FBUtilities.toByteArray(6L), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(7), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L) ); - assert 0 == FBUtilities.compareByteSubArrays( - ((CounterColumn)sc.getSubColumn(getBytes(2))).partitionedCounter(), + assert 0 == ByteBufferUtil.compareSubArrays( + ((CounterColumn)sc.getSubColumn(getBytes(2))).value(), 0, - c2, + ByteBuffer.wrap(c2), 0, c2.length);