cassandra-commits mailing list archives

From jo...@apache.org
Subject svn commit: r1051679 [5/6] - in /cassandra/trunk: ./ conf/ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/config/ src/java/or...
Date Tue, 21 Dec 2010 22:17:12 GMT
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Tue Dec 21 22:17:09 2010
@@ -19,6 +19,8 @@
 
 package org.apache.cassandra.io.sstable;
 
+import java.io.*;
+import java.net.InetAddress;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -31,6 +33,9 @@ import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AbstractCommutativeType;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -43,6 +48,7 @@ import org.apache.cassandra.io.util.File
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.FBUtilities;
@@ -221,7 +227,7 @@ public class SSTableWriter extends SSTab
         return dataFile.getFilePointer();
     }
     
-    public static Builder createBuilder(Descriptor desc)
+    public static Builder createBuilder(Descriptor desc, OperationType type)
     {
         if (!desc.isLatestVersion)
             // TODO: streaming between different versions will fail: need support for
@@ -229,7 +235,7 @@ public class SSTableWriter extends SSTab
             throw new RuntimeException(String.format("Cannot recover SSTable with version %s (current version %s).",
                                                      desc.version, Descriptor.CURRENT_VERSION));
 
-        return new Builder(desc);
+        return new Builder(desc, type);
     }
 
     /**
@@ -240,16 +246,18 @@ public class SSTableWriter extends SSTab
     {
         private final Descriptor desc;
         public final ColumnFamilyStore cfs;
-        private BufferedRandomAccessFile dfile;
+        private final RowIndexer indexer;
 
-        public Builder(Descriptor desc)
+        public Builder(Descriptor desc, OperationType type)
         {
-
             this.desc = desc;
             cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
             try
             {
-                dfile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", 8 * 1024 * 1024);
+                if (OperationType.AES == type && cfs.metadata.getDefaultValidator().isCommutative())
+                    indexer = new AESCommutativeRowIndexer(desc, cfs.metadata);
+                else
+                    indexer = new RowIndexer(desc, cfs.metadata);
             }
             catch (IOException e)
             {
@@ -266,47 +274,79 @@ public class SSTableWriter extends SSTab
             assert !ifile.exists();
             assert !ffile.exists();
 
-            EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
-            EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
+            long estimatedRows = indexer.prepareIndexing();
+
+            // build the index and filter
+            long rows = indexer.index();
+
+            logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
+        }
+
+        public long getTotalBytes()
+        {
+            try
+            {
+                return indexer.dfile.length();
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+        public long getBytesRead()
+        {
+            return indexer.dfile.getFilePointer();
+        }
+
+        public String getTaskType()
+        {
+            return "SSTable rebuild";
+        }
+    }
+
+    static class RowIndexer
+    {
+        protected final Descriptor desc;
+        public final BufferedRandomAccessFile dfile;
+
+        protected IndexWriter iwriter;
+        protected CFMetaData metadata;
+
+        RowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
+        {
+            this(desc, new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "r", 8 * 1024 * 1024), metadata);
+        }
+
+        protected RowIndexer(Descriptor desc, BufferedRandomAccessFile dfile, CFMetaData metadata) throws IOException
+        {
+            this.desc = desc;
+            this.dfile = dfile;
+            this.metadata = metadata;
+        }
 
-            IndexWriter iwriter;
+        long prepareIndexing() throws IOException
+        {
             long estimatedRows;
             try
             {
                 estimatedRows = SSTable.estimateRowsFromData(desc, dfile);
                 iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
+                return estimatedRows;
             }
             catch(IOException e)
             {
                 dfile.close();
                 throw e;
             }
+        }
 
-            // build the index and filter
-            long rows = 0;
+        long index() throws IOException
+        {
             try
             {
-                DecoratedKey key;
-                long rowPosition = 0;
-                while (rowPosition < dfile.length())
-                {
-                    key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, FBUtilities.readShortByteArray(dfile));
-                    iwriter.afterAppend(key, rowPosition);
-
-                    long dataSize = SSTableReader.readRowSize(dfile, desc);
-                    rowPosition = dfile.getFilePointer() + dataSize; // next row
-
-                    IndexHelper.skipBloomFilter(dfile);
-                    IndexHelper.skipIndex(dfile);
-                    ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(cfs.metadata), dfile);
-                    rowSizes.add(dataSize);
-                    columnCounts.add(dfile.readInt());
-
-                    dfile.seek(rowPosition);
-                    rows++;
-                }
-
-                writeStatistics(desc, rowSizes, columnCounts);
+                return doIndexing();
             }
             finally
             {
@@ -320,31 +360,113 @@ public class SSTableWriter extends SSTab
                     throw new IOError(e);
                 }
             }
-
-            logger.debug("estimated row count was %s of real count", ((double)estimatedRows) / rows);
-            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
         }
 
-        public long getTotalBytes()
+        protected long doIndexing() throws IOException
         {
-            try
-            {
-                return dfile.length();
-            }
-            catch (IOException e)
+            EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
+            EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
+            long rows = 0;
+            DecoratedKey key;
+            long rowPosition = 0;
+            while (rowPosition < dfile.length())
             {
-                throw new IOError(e);
+                // read key
+                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, FBUtilities.readShortByteArray(dfile));
+                iwriter.afterAppend(key, rowPosition);
+
+                // seek to next key
+                long dataSize = SSTableReader.readRowSize(dfile, desc);
+                rowPosition = dfile.getFilePointer() + dataSize;
+                
+                IndexHelper.skipBloomFilter(dfile);
+                IndexHelper.skipIndex(dfile);
+                ColumnFamily.serializer().deserializeFromSSTableNoColumns(ColumnFamily.create(metadata), dfile);
+                rowSizes.add(dataSize);
+                columnCounts.add(dfile.readInt());
+                
+                dfile.seek(rowPosition);
+
+                rows++;
             }
+            return rows;
         }
+    }
 
-        public long getBytesRead()
+    static class AESCommutativeRowIndexer extends RowIndexer
+    {
+        AESCommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws IOException
         {
-            return dfile.getFilePointer();
+            super(desc, new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA), "rw", 8 * 1024 * 1024), metadata);
         }
 
-        public String getTaskType()
+        @Override
+        protected long doIndexing() throws IOException
         {
-            return "SSTable rebuild";
+            EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
+            EstimatedHistogram columnCounts = SSTable.defaultColumnHistogram();
+            long rows = 0L;
+            ByteBuffer diskKey;
+            DecoratedKey key;
+
+            long readRowPosition  = 0L;
+            long writeRowPosition = 0L;
+            while (readRowPosition < dfile.length())
+            {
+                // read key
+                dfile.seek(readRowPosition);
+                diskKey = FBUtilities.readShortByteArray(dfile);
+
+                // skip data size, bloom filter, column index
+                long dataSize = SSTableReader.readRowSize(dfile, desc);
+                dfile.skipBytes(dfile.readInt());
+                dfile.skipBytes(dfile.readInt());
+
+                // deserialize CF
+                ColumnFamily cf = ColumnFamily.create(desc.ksname, desc.cfname);
+                ColumnFamily.serializer().deserializeFromSSTableNoColumns(cf, dfile);
+                ColumnFamily.serializer().deserializeColumns(dfile, cf);
+                rowSizes.add(dataSize);
+                columnCounts.add(cf.getEstimatedColumnCount());
+
+                // remove source node from CF's commutative columns
+                ((AbstractCommutativeType)cf.metadata().getDefaultValidator()).cleanContext(cf, FBUtilities.getLocalAddress());
+
+                readRowPosition = dfile.getFilePointer();
+
+
+                // update index writer
+                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, diskKey);
+                iwriter.afterAppend(key, writeRowPosition);
+
+
+                // write key
+                dfile.seek(writeRowPosition);
+                FBUtilities.writeShortByteArray(diskKey, dfile);
+
+                // write data size; serialize CF w/ bloom filter, column index
+                long writeSizePosition = dfile.getFilePointer();
+                dfile.writeLong(-1L);
+                ColumnFamily.serializer().serializeWithIndexes(cf, dfile);
+                long writeEndPosition = dfile.getFilePointer();
+                dfile.seek(writeSizePosition);
+                dfile.writeLong(writeEndPosition - (writeSizePosition + 8L));
+
+                writeRowPosition = writeEndPosition;
+
+                rows++;
+
+                dfile.sync();
+            }
+
+            if (writeRowPosition != readRowPosition)
+            {
+                // truncate file to new, reduced length
+                dfile.setLength(writeRowPosition);
+                dfile.sync();
+            }
+
+            return rows;
         }
     }
 

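The commutative indexer above rewrites each row in place, and a row's serialized size is only known after the column family has been written back, hence the -1 placeholder that is later overwritten. A minimal sketch of that placeholder-and-backfill pattern, using a plain java.io.RandomAccessFile in place of Cassandra's BufferedRandomAccessFile (class and method names are illustrative):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class LengthBackfill
    {
        // Sketch of AESCommutativeRowIndexer.doIndexing()'s size handling:
        // reserve 8 bytes, write the body, then seek back and fill in the
        // real length, i.e. size = end - (sizePosition + 8).
        public static void writeRecord(RandomAccessFile file, byte[] payload) throws IOException
        {
            long sizePosition = file.getFilePointer();
            file.writeLong(-1L);                               // placeholder size
            file.write(payload);                               // stands in for serializeWithIndexes()
            long endPosition = file.getFilePointer();
            file.seek(sizePosition);
            file.writeLong(endPosition - (sizePosition + 8L)); // backfill the actual size
            file.seek(endPosition);                            // resume after the record
        }
    }
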
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Tue Dec 21 22:17:09 2010
@@ -42,6 +42,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.streaming.StreamIn;
 import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
@@ -502,9 +503,9 @@ public class AntiEntropyService
                 Callback callback = new Callback();
                 // send ranges to the remote node
                 StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
-                StreamOut.transferSSTables(outsession, sstables, ranges);
+                StreamOut.transferSSTables(outsession, sstables, ranges, OperationType.AES);
                 // request ranges from the remote node
-                StreamIn.requestRanges(request.endpoint, request.cf.left, ranges, callback);
+                StreamIn.requestRanges(request.endpoint, request.cf.left, ranges, callback, OperationType.AES);
             }
             catch(Exception e)
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Dec 21 22:17:09 2010
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.util.*;
 
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AbstractCommutativeType;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 
@@ -52,6 +54,13 @@ public class ReadResponseResolver implem
         this.table = table;
         this.key = StorageService.getPartitioner().decorateKey(key);
     }
+
+    private void checkDigest(DecoratedKey key, ByteBuffer digest, ByteBuffer resultDigest) throws DigestMismatchException
+    {
+        if (resultDigest.equals(digest))
+            return;
+        throw new DigestMismatchException(key, digest, resultDigest);
+    }
     
     /*
       * This method for resolving read data should look at the timestamps of each
@@ -82,21 +91,25 @@ public class ReadResponseResolver implem
             Message message = entry.getKey();
             if (result.isDigestQuery())
             {
-                if (digest == null)
-                {
-                    digest = result.digest();
-                }
-                else
-                {
-                    ByteBuffer digest2 = result.digest();
-                    if (!digest.equals(digest2))
-                        throw new DigestMismatchException(key, digest, digest2);
-                }
+                if (digest != null)
+                    checkDigest(key, digest, result.digest());
+                digest = result.digest();
             }
             else
             {
-                versions.add(result.row().cf);
-                endpoints.add(message.getFrom());
+                ColumnFamily cf = result.row().cf;
+                InetAddress from = message.getFrom();
+                
+                if(cf != null) {
+                    AbstractType defaultValidator = cf.metadata().getDefaultValidator();
+                    if (!FBUtilities.getLocalAddress().equals(from) && cf != null && defaultValidator.isCommutative())
+                    {
+                        cf = cf.cloneMe();
+                        ((AbstractCommutativeType)defaultValidator).cleanContext(cf, FBUtilities.getLocalAddress());
+                    }                   
+                }
+                versions.add(cf);
+                endpoints.add(from);
             }
         }
 
@@ -107,9 +120,7 @@ public class ReadResponseResolver implem
             
             for (ColumnFamily cf : versions)
             {
-                ByteBuffer digest2 = ColumnFamily.digest(cf);
-                if (!digest.equals(digest2))
-                    throw new DigestMismatchException(key, digest, digest2);
+                checkDigest(key, digest, ColumnFamily.digest(cf));
             }
             if (logger_.isDebugEnabled())
                 logger_.debug("digests verified");
@@ -147,6 +158,14 @@ public class ReadResponseResolver implem
 
             // create and send the row mutation message based on the diff
             RowMutation rowMutation = new RowMutation(table, key.key);
+
+            AbstractType defaultValidator = diffCf.metadata().getDefaultValidator();
+            if (defaultValidator.isCommutative())
+                ((AbstractCommutativeType)defaultValidator).cleanContext(diffCf, endpoints.get(i));
+
+            if (diffCf.getColumnsMap().isEmpty() && !diffCf.isMarkedForDelete())
+                continue;
+
             rowMutation.add(diffCf);
             RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
             Message repairMessage;
@@ -170,7 +189,7 @@ public class ReadResponseResolver implem
         {
             if (cf != null)
             {
-                resolved = cf.cloneMe();
+                resolved = cf.cloneMeShallow();
                 break;
             }
         }

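Untangled from the loop above (which also carries a redundant null check), the rule the resolver now applies to each response is: local responses and non-commutative column families pass through unchanged; a remote response of a commutative (counter) column family is cloned, because cleanContext() mutates in place, and the local node's contribution is stripped before reconciliation. A hedged sketch using the patch's own types; the standalone helper framing is illustrative, not the patch's structure:

    import java.net.InetAddress;

    import org.apache.cassandra.db.ColumnFamily;
    import org.apache.cassandra.db.marshal.AbstractCommutativeType;
    import org.apache.cassandra.db.marshal.AbstractType;

    public class CounterResponseRule
    {
        static ColumnFamily cleanIfRemoteCounter(ColumnFamily cf, InetAddress from, InetAddress local)
        {
            if (cf == null || local.equals(from))
                return cf;
            AbstractType validator = cf.metadata().getDefaultValidator();
            if (!validator.isCommutative())
                return cf;
            ColumnFamily copy = cf.cloneMe(); // cleanContext() mutates its argument
            ((AbstractCommutativeType) validator).cleanContext(copy, local);
            return copy;
        }
    }
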
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Dec 21 22:17:09 2010
@@ -41,6 +41,8 @@ import org.apache.cassandra.concurrent.S
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AbstractCommutativeType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -117,6 +119,10 @@ public class StorageProxy implements Sto
 
                 responseHandlers.add(responseHandler);
                 Message unhintedMessage = null;
+
+                //XXX: if commutative value, only allow CL.ONE write
+                updateDestinationForCommutativeTypes(consistency_level, rm, hintedEndpoints);
+
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
                 {
                     InetAddress destination = entry.getKey();
@@ -124,6 +130,9 @@ public class StorageProxy implements Sto
 
                     if (targets.size() == 1 && targets.iterator().next().equals(destination))
                     {
+                        // only non-hinted writes are supported
+                        rm.updateCommutativeTypes(destination);
+
                         // unhinted writes
                         if (destination.equals(FBUtilities.getLocalAddress()))
                         {
@@ -180,6 +189,40 @@ public class StorageProxy implements Sto
 
     }
 
+    /**
+     * Update destination endpoints depending on the clock type.
+     */
+    private static void updateDestinationForCommutativeTypes(ConsistencyLevel consistency_level, RowMutation rm,
+            Multimap<InetAddress, InetAddress> destinationEndpoints)
+    {
+        AbstractType defaultValidator = rm.getColumnFamilies().iterator().next().metadata().getDefaultValidator();
+        if (!defaultValidator.isCommutative())
+            return;
+
+        InetAddress randomDestination = pickRandomDestination(destinationEndpoints);
+        destinationEndpoints.clear();
+        destinationEndpoints.put(randomDestination, randomDestination);
+    }
+
+    /**
+     * @param endpoints potential destinations.
+     * @return one destination chosen at random from the endpoints; if localhost is among them, it is returned instead.
+     */
+    private static InetAddress pickRandomDestination(Multimap<InetAddress, InetAddress> endpoints)
+    {
+        Set<InetAddress> destinationSet = endpoints.keySet();
+
+        if (destinationSet.contains(FBUtilities.getLocalAddress()))
+        {
+            return FBUtilities.getLocalAddress();
+        }
+        else
+        {
+            InetAddress[] destinations = destinationSet.toArray(new InetAddress[0]);
+            return destinations[random.nextInt(destinations.length)];
+        }
+    }
+
     private static void addHintHeader(Message message, InetAddress target) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -203,6 +246,10 @@ public class StorageProxy implements Sto
             {
                 rm.deepCopy().apply();
                 responseHandler.response(null);
+
+                // repair-on-write (local message)
+                ReplicateOnWriteTask replicateOnWriteTask = new ReplicateOnWriteTask(rm);
+                StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(replicateOnWriteTask);
             }
         };
         StageManager.getStage(Stage.MUTATION).execute(runnable);

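Because commutative values cannot safely be applied to several replicas from the coordinator (note the CL.ONE restriction flagged in the XXX comment above), updateDestinationForCommutativeTypes() collapses the hinted-endpoint map to a single destination, preferring the local node; propagation to the remaining replicas is apparently left to the new replicate-on-write path added in this patch. The selection rule in isolation, as a self-contained sketch with illustrative names:

    import java.net.InetAddress;
    import java.util.Random;
    import java.util.Set;

    public class DestinationPicker
    {
        // Mirrors pickRandomDestination() above: prefer the local node when
        // it is a candidate, otherwise choose uniformly at random.
        static InetAddress pickOne(Set<InetAddress> candidates, InetAddress local, Random random)
        {
            if (candidates.contains(local))
                return local;
            InetAddress[] all = candidates.toArray(new InetAddress[0]);
            return all[random.nextInt(all.length)];
        }
    }
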
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Dec 21 22:17:09 2010
@@ -69,6 +69,7 @@ import org.apache.cassandra.db.HintedHan
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadRepairVerbHandler;
 import org.apache.cassandra.db.ReadVerbHandler;
+import org.apache.cassandra.db.ReplicateOnWriteVerbHandler;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutationVerbHandler;
 import org.apache.cassandra.db.SchemaCheckVerbHandler;
@@ -101,6 +102,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
+import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.streaming.ReplicationFinishedVerbHandler;
 import org.apache.cassandra.streaming.StreamIn;
 import org.apache.cassandra.streaming.StreamOut;
@@ -165,6 +167,7 @@ public class StorageService implements I
         INDEX_SCAN,
         REPLICATION_FINISHED,
         INTERNAL_RESPONSE, // responses to internal calls
+        REPLICATE_ON_WRITE,
         ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
@@ -193,6 +196,7 @@ public class StorageService implements I
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
+        put(Verb.REPLICATE_ON_WRITE, Stage.REPLICATE_ON_WRITE);
     }};
 
 
@@ -278,6 +282,7 @@ public class StorageService implements I
         MessagingService.instance.registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.READ, new ReadVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.REPLICATE_ON_WRITE, new ReplicateOnWriteVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.INDEX_SCAN, new IndexScanVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
@@ -1004,7 +1009,7 @@ public class StorageService implements I
                 };
                 if (logger_.isDebugEnabled())
                     logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", "));
-                StreamIn.requestRanges(source, table, ranges, callback);
+                StreamIn.requestRanges(source, table, ranges, callback, OperationType.RESTORE_REPLICA_COUNT);
             }
         }
     }
@@ -1608,7 +1613,7 @@ public class StorageService implements I
                     public void run()
                     {
                         // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
-                        StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback);
+                        StreamOut.transferRanges(newEndpoint, table, Arrays.asList(range), callback, OperationType.UNBOOTSTRAP);
                     }
                 });
             }
@@ -1979,6 +1984,7 @@ public class StorageService implements I
                 rcf.comment = cfm.getComment();
                 rcf.keys_cached = cfm.getKeyCacheSize();
                 rcf.read_repair_chance = cfm.getReadRepairChance();
+                rcf.replicate_on_write = cfm.getReplicateOnWrite();
                 rcf.gc_grace_seconds = cfm.getGcGraceSeconds();
                 rcf.rows_cached = cfm.getRowCacheSize();
                 rcf.column_metadata = new RawColumnDefinition[cfm.getColumn_metadata().size()];

Added: cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java?rev=1051679&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/OperationType.java Tue Dec 21 22:17:09 2010
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+/**
+ * Streaming operation type.
+ */
+public enum OperationType
+{
+    AES,
+    BOOTSTRAP,
+    UNBOOTSTRAP,
+    RESTORE_REPLICA_COUNT;
+}
+

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Tue Dec 21 22:17:09 2010
@@ -52,20 +52,23 @@ public class PendingFile
     public final Descriptor desc;
     public final String component;
     public final List<Pair<Long,Long>> sections;
+    public final OperationType type;
     public final long size;
     public long progress;
 
     public PendingFile(Descriptor desc, PendingFile pf)
     {
-        this(null, desc, pf.component, pf.sections);
+        this(null, desc, pf.component, pf.sections, pf.type);
     }
 
-    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections)
+    public PendingFile(SSTable sstable, Descriptor desc, String component, List<Pair<Long,Long>> sections, OperationType type)
     {
         this.sstable = sstable;
         this.desc = desc;
         this.component = component;
         this.sections = sections;
+        this.type = type;
+
         long tempSize = 0;
         for(Pair<Long,Long> section : sections)
         {
@@ -115,6 +118,7 @@ public class PendingFile
             {
                 dos.writeLong(section.left); dos.writeLong(section.right);
             }
+            dos.writeUTF(sc.type.name());
         }
 
         public PendingFile deserialize(DataInputStream dis) throws IOException
@@ -129,7 +133,8 @@ public class PendingFile
             List<Pair<Long,Long>> sections = new ArrayList<Pair<Long,Long>>(count);
             for (int i = 0; i < count; i++)
                 sections.add(new Pair<Long,Long>(Long.valueOf(dis.readLong()), Long.valueOf(dis.readLong())));
-            return new PendingFile(null, desc, component, sections);
+            OperationType type = OperationType.valueOf(dis.readUTF());
+            return new PendingFile(null, desc, component, sections, type);
         }
     }
 }

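PendingFile now carries the stream's OperationType on the wire as the UTF-encoded name of the enum constant, restored with valueOf() on the receiving node. A self-contained round-trip of that encoding (the nested enum here mirrors the new OperationType); valueOf() throws IllegalArgumentException for an unknown name, so constants must keep their names once mixed-version nodes can stream to each other:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class EnumWireRoundTrip
    {
        enum OperationType { AES, BOOTSTRAP, UNBOOTSTRAP, RESTORE_REPLICA_COUNT }

        public static void main(String[] args) throws IOException
        {
            // write side, as in PendingFile's serializer above
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(OperationType.AES.name());
            out.flush();

            // read side, as in PendingFile's deserializer above
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            OperationType type = OperationType.valueOf(in.readUTF());
            assert type == OperationType.AES;
        }
    }
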
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Tue Dec 21 22:17:09 2010
@@ -49,19 +49,19 @@ public class StreamIn
     /**
      * Request ranges to be transferred from source to local node
      */
-    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges)
+    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, OperationType type)
     {
-        requestRanges(source, tableName, ranges, null);
+        requestRanges(source, tableName, ranges, null, type);
     }
 
-    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback)
+    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
     {
         assert ranges.size() > 0;
 
         if (logger.isDebugEnabled())
             logger.debug("Requesting from {} ranges {}", source, StringUtils.join(ranges, ", "));
         StreamInSession session = StreamInSession.create(source, callback);
-        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, session.getSessionId()).makeMessage();
+        Message message = new StreamRequestMessage(FBUtilities.getLocalAddress(), ranges, tableName, session.getSessionId(), type).makeMessage();
         MessagingService.instance.sendOneWay(message, source);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Dec 21 22:17:09 2010
@@ -107,7 +107,7 @@ public class StreamInSession
         if (logger.isDebugEnabled())
             logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
 
-        Future future = CompactionManager.instance.submitSSTableBuild(localFile.desc);
+        Future future = CompactionManager.instance.submitSSTableBuild(localFile.desc, remoteFile.type);
         buildFutures.add(future);
 
         files.remove(remoteFile);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Tue Dec 21 22:17:09 2010
@@ -65,7 +65,7 @@ public class StreamOut
     /**
      * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
-    public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback)
+    public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback, OperationType type)
     {
         assert ranges.size() > 0;
         
@@ -79,7 +79,7 @@ public class StreamOut
         {
             Table table = flushSSTable(tableName);
             // send the matching portion of every sstable in the keyspace
-            transferSSTables(session, table.getAllSSTables(), ranges);
+            transferSSTables(session, table.getAllSSTables(), ranges, type);
         }
         catch (IOException e)
         {
@@ -117,7 +117,7 @@ public class StreamOut
     /**
      * Split out files for all tables on disk locally for each range and then stream them to the target endpoint.
     */
-    public static void transferRangesForRequest(StreamOutSession session, Collection<Range> ranges)
+    public static void transferRangesForRequest(StreamOutSession session, Collection<Range> ranges, OperationType type)
     {
         assert ranges.size() > 0;
 
@@ -128,7 +128,7 @@ public class StreamOut
         {
             Table table = flushSSTable(session.table);
             // send the matching portion of every sstable in the keyspace
-            List<PendingFile> pending = createPendingFiles(table.getAllSSTables(), ranges);
+            List<PendingFile> pending = createPendingFiles(table.getAllSSTables(), ranges, type);
             session.addFilesToStream(pending);
             session.begin();
         }
@@ -141,9 +141,9 @@ public class StreamOut
     /**
      * Transfers matching portions of a group of sstables from a single table to the target endpoint.
      */
-    public static void transferSSTables(StreamOutSession session, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
+    public static void transferSSTables(StreamOutSession session, Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type) throws IOException
     {
-        List<PendingFile> pending = createPendingFiles(sstables, ranges);
+        List<PendingFile> pending = createPendingFiles(sstables, ranges, type);
 
         if (pending.size() > 0)
         {
@@ -157,7 +157,7 @@ public class StreamOut
     }
 
     // called prior to sending anything.
-    private static List<PendingFile> createPendingFiles(Collection<SSTableReader> sstables, Collection<Range> ranges)
+    private static List<PendingFile> createPendingFiles(Collection<SSTableReader> sstables, Collection<Range> ranges, OperationType type)
     {
         List<PendingFile> pending = new ArrayList<PendingFile>();
         for (SSTableReader sstable : sstables)
@@ -166,7 +166,7 @@ public class StreamOut
             List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
             if (sections.isEmpty())
                 continue;
-            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections));
+            pending.add(new PendingFile(sstable, desc, SSTable.COMPONENT_DATA, sections, type));
         }
         logger.info("Stream context metadata {}, {} sstables.", pending, sstables.size());
         return pending;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java Tue Dec 21 22:17:09 2010
@@ -66,13 +66,15 @@ class StreamRequestMessage
     // if these are specified, file should not be.
     protected final Collection<Range> ranges;
     protected final String table;
+    protected final OperationType type;
 
-    StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, long sessionId)
+    StreamRequestMessage(InetAddress target, Collection<Range> ranges, String table, long sessionId, OperationType type)
     {
         this.target = target;
         this.ranges = ranges;
         this.table = table;
         this.sessionId = sessionId;
+        this.type = type;
         file = null;
     }
 
@@ -81,6 +83,7 @@ class StreamRequestMessage
         this.target = target;
         this.file = file;
         this.sessionId = sessionId;
+        this.type = file.type;
         ranges = null;
         table = null;
     }
@@ -114,6 +117,7 @@ class StreamRequestMessage
                 sb.append(range);
                 sb.append(" ");
             }
+            sb.append(type);
         }
         else
         {
@@ -142,6 +146,7 @@ class StreamRequestMessage
                 {
                     AbstractBounds.serializer().serialize(range, dos);
                 }
+                dos.writeUTF(srm.type.name());
             }
         }
 
@@ -164,7 +169,8 @@ class StreamRequestMessage
                 {
                     ranges.add((Range) AbstractBounds.serializer().deserialize(dis));
                 }
-                return new StreamRequestMessage(target, ranges, table, sessionId);
+                OperationType type = OperationType.valueOf(dis.readUTF());
+                return new StreamRequestMessage(target, ranges, table, sessionId, type);
             }
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Tue Dec 21 22:17:09 2010
@@ -52,7 +52,7 @@ public class StreamRequestVerbHandler im
                 logger.debug(srm.toString());
 
             StreamOutSession session = StreamOutSession.create(srm.table, message.getFrom(), srm.sessionId);
-            StreamOut.transferRangesForRequest(session, srm.ranges);
+            StreamOut.transferRangesForRequest(session, srm.ranges, srm.type);
         }
         catch (IOException ex)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Dec 21 22:17:09 2010
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -72,6 +73,7 @@ import org.apache.cassandra.scheduler.IR
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,7 +271,6 @@ public class CassandraServer implements 
         logger.debug("multiget_slice");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
-
         return multigetSliceInternal(state().getKeyspace(), keys, column_parent, predicate, consistency_level);
     }
 
@@ -301,11 +302,9 @@ public class CassandraServer implements 
         return getSlice(commands, consistency_level);
     }
 
-    public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
+    private ColumnOrSuperColumn internal_get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
     {
-        logger.debug("get");
-        
         state().hasColumnFamilyAccess(column_path.column_family, Permission.READ);
         String keyspace = state().getKeyspace();
 
@@ -329,6 +328,14 @@ public class CassandraServer implements 
         return tcolumns.get(0);
     }
 
+    public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
+    {
+        logger.debug("get");
+
+        return internal_get(key, column_path, consistency_level);
+    }
+
     public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
@@ -356,11 +363,9 @@ public class CassandraServer implements 
         return counts;
     }
 
-    public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
+    private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("insert");
-
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
 
         ThriftValidation.validateKey(key);
@@ -379,11 +384,17 @@ public class CassandraServer implements 
         doInsert(consistency_level, Arrays.asList(rm));
     }
 
-    public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
+    public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        logger.debug("insert");
+
+        internal_insert(key, column_parent, column, consistency_level);
+    }
+
+    private void internal_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("batch_mutate");
-        
         List<String> cfamsSeen = new ArrayList<String>();
 
         List<RowMutation> rowMutations = new ArrayList<RowMutation>();
@@ -415,11 +426,17 @@ public class CassandraServer implements 
         doInsert(consistency_level, rowMutations);
     }
 
-    public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
+    public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
-        logger.debug("remove");
+        logger.debug("batch_mutate");
 
+        internal_batch_mutate(mutation_map, consistency_level);
+    }
+
+    private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
         state().hasColumnFamilyAccess(column_path.column_family, Permission.WRITE);
 
         ThriftValidation.validateKey(key);
@@ -431,6 +448,14 @@ public class CassandraServer implements 
         doInsert(consistency_level, Arrays.asList(rm));
     }
 
+    public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        logger.debug("remove");
+
+        internal_remove(key, column_path, timestamp, consistency_level);
+    }
+
     private void doInsert(ConsistencyLevel consistency_level, List<RowMutation> mutations) throws UnavailableException, TimedOutException
     {
         try
@@ -898,6 +923,7 @@ public class CassandraServer implements 
                               cf_def.row_cache_size,
                               cf_def.key_cache_size,
                               cf_def.read_repair_chance,
+                              cf_def.replicate_on_write,
                               cf_def.isSetGc_grace_seconds() ? cf_def.gc_grace_seconds : CFMetaData.DEFAULT_GC_GRACE_SECONDS,
                               DatabaseDescriptor.getComparator(cf_def.default_validation_class),
                               cf_def.isSetMin_compaction_threshold() ? cf_def.min_compaction_threshold : CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD,
@@ -946,5 +972,190 @@ public class CassandraServer implements 
         return StorageProxy.describeSchemaVersions();
     }
 
+    // counter methods
+
+    private Column getCounterColumn(CounterColumn column)
+    {
+        return new Column(column.name, FBUtilities.toByteBuffer(column.value), System.currentTimeMillis());
+    }
+
+    public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)
+            throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        logger.debug("add");
+
+        if (ConsistencyLevel.ONE != consistency_level)
+        {
+            throw new InvalidRequestException("Commutative CFs only support ConsistencyLevel.ONE");
+        }
+
+        String keyspace = state().getKeyspace();
+        ThriftValidation.validateCommutative(keyspace, column_parent.column_family);
+
+        internal_insert(key, column_parent, getCounterColumn(column), consistency_level);
+    }
+
+    private Mutation getMutation(CounterMutation counterMutation)
+    {
+        Mutation mutation = new Mutation();
+
+        if (counterMutation.isSetCounter())
+        {
+            Counter counter = counterMutation.counter;
+            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+            if (counter.isSetColumn())
+            {
+                Column c = new Column(counter.column.name, FBUtilities.toByteBuffer(counter.column.value), System.currentTimeMillis());
+                cosc.setColumn(c);
+            }
+
+            if (counter.isSetSuper_column())
+            {
+                List<Column> subcolumns = new ArrayList<Column>(counter.super_column.columns.size());
+                for (CounterColumn subcol : counter.super_column.columns)
+                {
+                    subcolumns.add(new Column(subcol.name, FBUtilities.toByteBuffer(subcol.value), System.currentTimeMillis()));
+                }
+                SuperColumn sc = new SuperColumn(counter.super_column.name, subcolumns);
+                cosc.setSuper_column(sc);
+            }
+            mutation.setColumn_or_supercolumn(cosc);
+        }
+
+        if (counterMutation.isSetDeletion())
+        {
+            Deletion deletion = new Deletion(System.currentTimeMillis());
+            if (counterMutation.deletion.isSetSuper_column())
+                deletion.setSuper_column(counterMutation.deletion.super_column);
+            if (counterMutation.deletion.isSetPredicate())
+                deletion.setPredicate(counterMutation.deletion.predicate);
+            mutation.setDeletion(deletion);
+        }
+
+        return mutation;
+    }
+
+    public void batch_add(Map<ByteBuffer, Map<String, List<CounterMutation>>> updateMap, ConsistencyLevel consistency_level)
+            throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        logger.debug("batch_add");
+
+        if (ConsistencyLevel.ONE != consistency_level)
+        {
+            throw new InvalidRequestException("Commutative CFs only support ConsistencyLevel.ONE");
+        }
+
+        String keyspace = state().getKeyspace();
+        
+        Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map = new HashMap<ByteBuffer,Map<String,List<Mutation>>>();
+        
+        for (Entry<ByteBuffer, Map<String, List<CounterMutation>>> entry : updateMap.entrySet())
+        {
+            Map<String, List<Mutation>> valueMap = new HashMap<String, List<Mutation>>(entry.getValue().size());
+            
+            for (Entry<String, List<CounterMutation>> innerEntry : entry.getValue().entrySet())
+            {
+                ThriftValidation.validateCommutative(keyspace, innerEntry.getKey());
+                
+                List<Mutation> mutations = new ArrayList<Mutation>(innerEntry.getValue().size());
+                for (CounterMutation cm : innerEntry.getValue())
+                {
+                    mutations.add(getMutation(cm));
+                }
+                valueMap.put(innerEntry.getKey(), mutations);
+            }
+            
+            mutation_map.put(entry.getKey(), valueMap);
+        }
+        
+        internal_batch_mutate(mutation_map, consistency_level);
+    }
+
+    private Counter getCounter(ColumnOrSuperColumn cosc)
+    {
+        if (cosc.isSetColumn()) {
+            return new Counter().setColumn(new CounterColumn(cosc.column.name, cosc.column.value.getLong(cosc.column.value.arrayOffset())));
+        } else if(cosc.isSetSuper_column()) {
+            List<CounterColumn> cc = new ArrayList<CounterColumn>(cosc.super_column.columns.size());
+            for (Column col : cosc.super_column.columns)
+            {
+                cc.add(new CounterColumn(col.name, col.value.getLong(col.value.arrayOffset())));
+            }
+            return new Counter().setSuper_column(new CounterSuperColumn(cosc.super_column.name, cc));
+        }
+        return null;
+    }
+
+    public Counter get_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
+            throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException
+    {
+        logger.debug("get_counter");
+
+        String keyspace = state().getKeyspace();
+        ThriftValidation.validateCommutative(keyspace, path.column_family);
+
+        return getCounter(internal_get(key, path, consistency_level));
+    }
+    
+    private List<Counter> getCounters(List<ColumnOrSuperColumn> cosc)
+    {
+        List<Counter> rv = new ArrayList<Counter>(cosc.size());
+        for (ColumnOrSuperColumn columnOrSuperColumn : cosc)
+        {
+            Counter c = getCounter(columnOrSuperColumn);
+            if (c != null) {
+                rv.add(c);                
+            }
+        }
+        return rv;
+    }
+
+    public List<Counter> get_counter_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate,
+            ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException,
+            TException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("get_counter_slice");
+
+        String keyspace = state().getKeyspace();
+        ThriftValidation.validateCommutative(keyspace, column_parent.column_family);
+
+        state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+        List<ColumnOrSuperColumn> cosc = multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+        return getCounters(cosc);
+    }
+
+    public Map<ByteBuffer, List<Counter>> multiget_counter_slice(List<ByteBuffer> keys, ColumnParent column_parent,
+            SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException,
+            UnavailableException, TimedOutException, TException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("multiget_counter_slice");
+
+        String keyspace = state().getKeyspace();
+        ThriftValidation.validateCommutative(keyspace, column_parent.column_family);
+
+        state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+        Map<ByteBuffer, List<ColumnOrSuperColumn>> slices = multigetSliceInternal(state().getKeyspace(), keys, column_parent, predicate, consistency_level);
+        Map<ByteBuffer, List<Counter>> rv = new HashMap<ByteBuffer, List<Counter>>(slices.size());
+        for (Entry<ByteBuffer, List<ColumnOrSuperColumn>> entry : slices.entrySet())
+        {
+            rv.put(entry.getKey(), getCounters(entry.getValue()));
+        }
+        return rv;
+    }
+
+    public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
+            throws InvalidRequestException, UnavailableException, TimedOutException, TException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("remove_counter");
+
+        String keyspace = state().getKeyspace();
+        ThriftValidation.validateCommutative(keyspace, path.column_family);
+
+        internal_remove(key, path, System.currentTimeMillis(), consistency_level);
+    }
+
     // main method moved to CassandraDaemon
 }

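For orientation, a hypothetical client-side view of the new counter RPCs. This assumes the Thrift-generated Cassandra.Client bindings for this revision, a keyspace "Keyspace1" with a commutative column family "Counter1", and the usual generated accessor conventions; none of this client code is part of the patch:

    import java.nio.ByteBuffer;

    import org.apache.cassandra.thrift.*;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;

    public class CounterClientSketch
    {
        public static void main(String[] args) throws Exception
        {
            TFramedTransport transport = new TFramedTransport(new TSocket("localhost", 9160));
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            transport.open();
            client.set_keyspace("Keyspace1");

            ByteBuffer key = ByteBuffer.wrap("row1".getBytes("UTF-8"));
            ByteBuffer name = ByteBuffer.wrap("hits".getBytes("UTF-8"));

            // add() rejects anything but ConsistencyLevel.ONE (see above)
            client.add(key, new ColumnParent("Counter1"), new CounterColumn(name, 1L), ConsistencyLevel.ONE);

            // read the value back through get_counter()
            ColumnPath path = new ColumnPath("Counter1");
            path.setColumn(name);
            Counter counter = client.get_counter(key, path, ConsistencyLevel.ONE);
            System.out.println(counter.getColumn().getValue());

            transport.close();
        }
    }
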
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Tue Dec 21 22:17:09 2010
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Set;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -432,4 +433,28 @@ public class ThriftValidation
             throw new InvalidRequestException(e.getMessage());
         }
     }
+
+    public static CFMetaData validateCFMetaData(String tablename, String cfName) throws InvalidRequestException
+    {
+        if (cfName.isEmpty())
+        {
+            throw new InvalidRequestException("non-empty columnfamily is required");
+        }
+        CFMetaData metadata = DatabaseDescriptor.getCFMetaData(tablename, cfName);
+        if (metadata == null)
+        {
+            throw new InvalidRequestException("unconfigured columnfamily " + cfName);
+        }
+        return metadata;
+    }
+
+    static void validateCommutative(String tablename, String cfName) throws InvalidRequestException
+    {
+        validateTable(tablename);
+        CFMetaData metadata = validateCFMetaData(tablename, cfName);
+        if (!metadata.getDefaultValidator().isCommutative())
+        {
+            throw new InvalidRequestException("not commutative columnfamily " + cfName);
+        }
+    }
 }
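
To make the new check concrete, here is a hedged sketch of what validateCommutative enforces, using the column families from the test configuration in this commit (illustration only; the method is package-private, so real callers sit in this package as above):

    // Counter1 declares CounterColumnType as its default validator, so this passes:
    ThriftValidation.validateCommutative("Keyspace1", "Counter1");

    // Standard1's default validator is not commutative, so this throws
    // InvalidRequestException("not commutative columnfamily Standard1"):
    ThriftValidation.validateCommutative("Keyspace1", "Standard1");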

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Dec 21 22:17:09 2010
@@ -152,6 +152,70 @@ public class FBUtilities
         return new Pair(midpoint, remainder);
     }
 
+    /**
+     * Copy the big-endian bytes of an int into bytes, starting at offset.
+     * @param bytes Target array
+     * @param offset Offset into the array
+     * @param i Value to write
+     */
+    public static void copyIntoBytes(byte[] bytes, int offset, int i)
+    {
+        bytes[offset]   = (byte)( ( i >>> 24 ) & 0xFF );
+        bytes[offset+1] = (byte)( ( i >>> 16 ) & 0xFF );
+        bytes[offset+2] = (byte)( ( i >>> 8  ) & 0xFF );
+        bytes[offset+3] = (byte)(   i          & 0xFF );
+    }
+
+    /**
+     * @param i Write this int to an array
+     * @return 4-byte array containing the int
+     */
+    public static byte[] toByteArray(int i)
+    {
+        byte[] bytes = new byte[4];
+        copyIntoBytes(bytes, 0, i);
+        return bytes;
+    }
+
+    /**
+     * @param bytes A byte array containing a serialized integer.
+     * @param offset Start position of the integer in the array.
+     * @return The integer value contained in the byte array.
+     */
+    public static int byteArrayToInt(byte[] bytes, int offset)
+    {
+        if (bytes.length - offset < 4)
+        {
+            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+        }
+        int n = 0;
+        for ( int i = 0; i < 4; ++i )
+        {
+            n <<= 8;
+            n |= bytes[offset + i] & 0xFF;
+        }
+        return n;
+    }
+
+    /**
+     * Convert a byte buffer to an integer.
+     * Does not change the byte buffer position.
+     */
+    public static int byteBufferToInt(ByteBuffer bytes)
+    {
+        if (bytes.remaining() < 4)
+        {
+            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+        }
+        int n = 0;
+        for (int i = 0; i < 4; ++i)
+        {
+            n <<= 8;
+            n |= bytes.array()[bytes.position() + bytes.arrayOffset() + i] & 0xFF;
+        }
+        return n;
+    }
+
     public static ByteBuffer toByteBuffer(int i)
     {
         byte[] bytes = new byte[4];
@@ -162,21 +226,71 @@ public class FBUtilities
         return ByteBuffer.wrap(bytes);
     }
 
-    public static int byteBufferToInt(ByteBuffer bytes)
+    /**
+     * Copy the big-endian bytes of a long into bytes, starting at offset.
+     * @param bytes Target array
+     * @param offset Offset into the array
+     * @param l Value to write
+     */
+    public static void copyIntoBytes(byte[] bytes, int offset, long l)
+    {
+        bytes[offset]   = (byte)( ( l >>> 56 ) & 0xFF );
+        bytes[offset+1] = (byte)( ( l >>> 48 ) & 0xFF );
+        bytes[offset+2] = (byte)( ( l >>> 40 ) & 0xFF );
+        bytes[offset+3] = (byte)( ( l >>> 32 ) & 0xFF );
+        bytes[offset+4] = (byte)( ( l >>> 24 ) & 0xFF );
+        bytes[offset+5] = (byte)( ( l >>> 16 ) & 0xFF );
+        bytes[offset+6] = (byte)( ( l >>> 8  ) & 0xFF );
+        bytes[offset+7] = (byte)(   l          & 0xFF );
+    }
+
+    /**
+     * @param l Write this long to an array
+     * @return 8-byte array containing the long
+     */
+    public static byte[] toByteArray(long l)
+    {
+        byte[] bytes = new byte[8];
+        copyIntoBytes(bytes, 0, l);
+        return bytes;
+    }
+    
+    /**
+     * @param bytes A byte array containing a serialized long.
+     * @return The long value contained in the byte array.
+     */
+    public static long byteArrayToLong(byte[] bytes)
     {
-        if (bytes.remaining() < 4 )
+        return byteArrayToLong(bytes, 0);
+    }
+
+    /**
+     * @param bytes A byte array containing a serialized long.
+     * @param offset Start position of the long in the array.
+     * @return The long value contained in the byte array.
+     */
+    public static long byteArrayToLong(byte[] bytes, int offset)
+    {
+        if (bytes.length - offset < 8)
         {
-            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
+            throw new IllegalArgumentException("A long must be 8 bytes in size.");
         }
-        int n = 0;
-        for ( int i = 0; i < 4; ++i )
+        long n = 0;
+        for ( int i = 0; i < 8; ++i )
         {
             n <<= 8;
-            n |= bytes.array()[bytes.position() + bytes.arrayOffset() + i] & 0xFF;
+            n |= bytes[offset + i] & 0xFF;
         }
         return n;
     }
 
+    public static ByteBuffer toByteBuffer(long n)
+    {
+        byte[] bytes = new byte[8];
+        ByteBuffer bb = ByteBuffer.wrap(bytes).putLong(0, n);
+        return bb;
+    }
+
     public static int compareUnsigned(byte[] bytes1, byte[] bytes2, int offset1, int offset2, int len1, int len2)
     {
         if (bytes1 == null)
@@ -196,6 +310,38 @@ public class FBUtilities
         if ((len1 - offset1) == (len2 - offset2)) return 0;
         else return ((len1 - offset1) < (len2 - offset2)) ? -1 : 1;
     }
+  
+    /**
+     * Compare two byte[] at the specified offsets for the given length, treating unequal bytes as unsigned.
+     * @param bytes1 First array to compare.
+     * @param offset1 Position to start the comparison at in the first array.
+     * @param bytes2 Second array to compare.
+     * @param offset2 Position to start the comparison at in the second array.
+     * @param length Number of bytes to compare.
+     * @return -1 if bytes1 is less than bytes2, 1 if bytes2 is less than bytes1, or 0 if equal.
+     */
+    public static int compareByteSubArrays(byte[] bytes1, int offset1, byte[] bytes2, int offset2, int length)
+    {
+        if ( null == bytes1 )
+        {
+            if ( null == bytes2) return 0;
+            else return -1;
+        }
+        if (null == bytes2 ) return 1;
+
+        assert bytes1.length >= (offset1 + length) : "The first byte array isn't long enough for the specified offset and length.";
+        assert bytes2.length >= (offset2 + length) : "The second byte array isn't long enough for the specified offset and length.";
+        for ( int i = 0; i < length; i++ )
+        {
+            byte byte1 = bytes1[offset1+i];
+            byte byte2 = bytes2[offset2+i];
+            if ( byte1 == byte2 )
+                continue;
+            // compare non-equal bytes as unsigned
+            return (byte1 & 0xFF) < (byte2 & 0xFF) ? -1 : 1;
+        }
+        return 0;
+    }
 
     /**
      * @return The bitwise XOR of the inputs. The output will be the same length as the
@@ -496,12 +642,41 @@ public class FBUtilities
         return decoded;
     }
 
-    public static ByteBuffer toByteBuffer(long n)
+    /** 
+     * Thin wrapper around byte[] to provide meaningful equals() and hashCode() operations
+     * Caveat: assumes the wrapped byte[] is not modified after wrapping.
+     */
+    public static final class ByteArrayWrapper
     {
-        byte[] bytes = new byte[8];
-        ByteBuffer bb = ByteBuffer.wrap(bytes).putLong(n);
-        bb.rewind();
-        return bb;
+        public final byte[] data;
+
+        public ByteArrayWrapper(byte[] data)
+        {
+            if ( null == data )
+            {
+                throw new NullPointerException();
+            }
+            this.data = data;
+        }
+
+        public boolean equals(Object other)
+        {
+            if ( !( other instanceof ByteArrayWrapper ) )
+            {
+                return false;
+            }
+            return Arrays.equals(data, ((ByteArrayWrapper)other).data);
+        }
+
+        public int hashCode()
+        {
+            return Arrays.hashCode(data);
+        }
+
+        public String toString()
+        {
+            return ArrayUtils.toString(data);
+        }
     }
 
     public static String resourceToFile(String filename) throws ConfigurationException
@@ -519,6 +694,10 @@ public class FBUtilities
         try
         {
             InputStream in = FBUtilities.class.getClassLoader().getResourceAsStream("org/apache/cassandra/config/version.properties");
+            if (in == null)
+            {
+                return "Unknown";
+            }
             Properties props = new Properties();
             props.load(in);
             return props.getProperty("CassandraVersion");
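
A minimal round-trip sketch of the serialization helpers added above; it uses only methods shown in this hunk plus java.util.HashMap, and the values are arbitrary:

    long l = 1234567890123L;
    byte[] b = FBUtilities.toByteArray(l);     // 8-byte big-endian encoding
    assert FBUtilities.byteArrayToLong(b) == l;

    byte[] buf = new byte[8];
    FBUtilities.copyIntoBytes(buf, 4, 42);     // int overload writes bytes 4..7
    assert FBUtilities.byteArrayToInt(buf, 4) == 42;

    // ByteArrayWrapper adds value-based equality, so byte[] contents can key a HashMap
    Map<FBUtilities.ByteArrayWrapper, Long> counts = new HashMap<FBUtilities.ByteArrayWrapper, Long>();
    counts.put(new FBUtilities.ByteArrayWrapper(b), l);
    assert counts.containsKey(new FBUtilities.ByteArrayWrapper(FBUtilities.toByteArray(l)));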

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Tue Dec 21 22:17:09 2010
@@ -70,6 +70,14 @@ keyspaces:
           column_type: Super
           compare_subcolumns_with: UTF8Type
 
+        - name: Counter1
+          column_type: Standard
+          default_validation_class: CounterColumnType
+
+        - name: SuperCounter1
+          column_type: Super
+          default_validation_class: CounterColumnType
+
         - name: Indexed1
           column_metadata:
             - name: birthdate
@@ -135,3 +143,7 @@ keyspaces:
       replication_factor: 2
       column_families:
         - name: Standard1
+
+        - name: Counter1
+          column_type: Standard
+          default_validation_class: CounterColumnType
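
The CounterColumnType default validator is what the server-side validateCommutative check keys off. A hedged one-liner for the invariant this test configuration establishes (names taken from the yaml above; illustration only):

    CFMetaData md = DatabaseDescriptor.getCFMetaData("Keyspace1", "Counter1");
    assert md != null && md.getDefaultValidator().isCommutative();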

Modified: cassandra/trunk/test/system/__init__.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/__init__.py?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/system/__init__.py (original)
+++ cassandra/trunk/test/system/__init__.py Tue Dec 21 22:17:09 2010
@@ -170,6 +170,8 @@ class ThriftTester(BaseTester):
             Cassandra.CfDef('Keyspace1', 'Super2', column_type='Super', subcomparator_type='LongType'), 
             Cassandra.CfDef('Keyspace1', 'Super3', column_type='Super', subcomparator_type='LongType'), 
             Cassandra.CfDef('Keyspace1', 'Super4', column_type='Super', subcomparator_type='UTF8Type'),
+            Cassandra.CfDef('Keyspace1', 'Counter1', default_validation_class='CounterColumnType'),
+            Cassandra.CfDef('Keyspace1', 'SuperCounter1', column_type='Super', default_validation_class='CounterColumnType'),
             Cassandra.CfDef('Keyspace1', 'Indexed1', column_metadata=[Cassandra.ColumnDef('birthdate', 'LongType', Cassandra.IndexType.KEYS, 'birthdate')]),
         ])
 

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Tue Dec 21 22:17:09 2010
@@ -1467,6 +1467,255 @@ class TestMutations(ThriftTester):
             client.describe_ring('system')
         _expect_exception(req, InvalidRequestException)
 
+    def test_incr_decr_standard_add(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 12
+        d2 = -21
+        d3 = 35
+        # insert positive and negative values and check the counts
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1
+
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d2), ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv2 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == (d1+d2)
+
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d3), ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv3 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv3.column.value == (d1+d2+d3)
+
+    def test_incr_decr_super_add(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = -234
+        d2 = 52345
+        d3 = 3123
+
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'),  CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'),  CounterColumn('c2', d2), ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1'), ConsistencyLevel.ONE)
+        assert rv1.super_column.columns[0].value == d1
+        assert rv1.super_column.columns[1].value == d2
+
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'),  CounterColumn('c1', d2), ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv2 = client.get_counter('key1', ColumnPath('SuperCounter1', 'sc1', 'c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == (d1+d2)
+
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'),  CounterColumn('c1', d3), ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv3 = client.get_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        assert rv3.column.value == (d1+d2+d3)
+
+    def test_incr_standard_remove(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 124
+
+        # insert value and check it exists
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1
+
+        # remove the previous column and check that it is gone
+        client.remove_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='Counter1', column='c1'))
+
+        # insert again and this time delete the whole row, check that it is gone
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv2 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == d1
+        client.remove_counter('key1', ColumnPath(column_family='Counter1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='Counter1', column='c1'))
+
+    def test_incr_super_remove(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 52345
+
+        # insert value and check it exists
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1
+
+        # remove the previous column and check that it is gone
+        client.remove_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'))
+
+        # insert again and this time delete the whole row, check that it is gone
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv2 = client.get_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == d1
+        client.remove_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'))
+
+    def test_incr_decr_standard_remove(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 124
+
+        # insert value and check it exists
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1
+
+        # remove the previous column and check that it is gone
+        client.remove_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='Counter1', column='c1'))
+
+        # insert again and this time delete the whole row, check that it is gone
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv2 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == d1
+        client.remove_counter('key1', ColumnPath(column_family='Counter1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='Counter1', column='c1'))
+
+    def test_incr_decr_super_remove(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 52345
+
+        # insert value and check it exists
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1
+
+        # remove the previous column and check that it is gone
+        client.remove_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'))
+
+        # insert again and this time delete the whole row, check that it is gone
+        client.add('key1', ColumnParent(column_family='SuperCounter1', super_column='sc1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv2 = client.get_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == d1
+        client.remove_counter('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1'), ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='SuperCounter1', super_column='sc1', column='c1'))
+        
+    def test_incr_decr_standard_batch_add(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 12
+        d2 = -21
+        update_map = {'key1': {'Counter1': [
+            CounterMutation(counter=Counter(column=CounterColumn('c1', d1))),
+            CounterMutation(counter=Counter(column=CounterColumn('c1', d2))),
+            ]}}
+        
+        # insert positive and negative values and check the counts
+        client.batch_add(update_map, ConsistencyLevel.ONE)
+        time.sleep(0.1)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1+d2
+
+    def test_incr_decr_standard_batch_remove(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 12
+        d2 = -21
+
+        # insert positive and negative values and check the counts
+        update_map = {'key1': {'Counter1': [
+            CounterMutation(counter=Counter(column=CounterColumn('c1', d1))),
+            CounterMutation(counter=Counter(column=CounterColumn('c1', d2))),
+            ]}}
+        client.batch_add(update_map, ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv1 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv1.column.value == d1+d2
+
+        # remove the previous column and check that it is gone
+        update_map = {'key1': {'Counter1': [
+            CounterMutation(deletion=CounterDeletion(predicate=SlicePredicate(column_names=['c1']))),
+            ]}}
+        client.batch_add(update_map, ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='Counter1', column='c1'))
+
+        # insert again and this time delete the whole row, check that it is gone
+        update_map = {'key1': {'Counter1': [
+            CounterMutation(counter=Counter(column=CounterColumn('c1', d1))),
+            CounterMutation(counter=Counter(column=CounterColumn('c1', d2))),
+            ]}}
+        client.batch_add(update_map, ConsistencyLevel.ONE)
+        time.sleep(5)
+        rv2 = client.get_counter('key1', ColumnPath(column_family='Counter1', column='c1'), ConsistencyLevel.ONE)
+        assert rv2.column.value == d1+d2
+
+        update_map = {'key1': {'Counter1': [
+            CounterMutation(deletion=CounterDeletion()),
+            ]}}
+        client.batch_add(update_map, ConsistencyLevel.ONE)
+        time.sleep(5)
+        _assert_no_columnpath('key1', ColumnPath(column_family='Counter1', column='c1'))
+        
+    def test_incr_decr_standard_slice(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 12
+        d2 = -21
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c1', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c2', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c3', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c3', d2), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c4', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c5', d1), ConsistencyLevel.ONE)
+
+        time.sleep(0.1)
+        # read back a slice of the counters and check the summed counts
+        counters = client.get_counter_slice('key1', ColumnParent('Counter1'), SlicePredicate(['c3', 'c4']), ConsistencyLevel.ONE)
+        
+        assert counters[0].column.value == d1+d2
+        assert counters[1].column.value == d1
+         
+    def test_incr_decr_standard_multiget_slice(self):
+        _set_keyspace('Keyspace1')
+
+        d1 = 12
+        d2 = -21
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c2', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c3', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c3', d2), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c4', d1), ConsistencyLevel.ONE)
+        client.add('key1', ColumnParent(column_family='Counter1'), CounterColumn('c5', d1), ConsistencyLevel.ONE)
+
+        client.add('key2', ColumnParent(column_family='Counter1'), CounterColumn('c2', d1), ConsistencyLevel.ONE)
+        client.add('key2', ColumnParent(column_family='Counter1'), CounterColumn('c3', d1), ConsistencyLevel.ONE)
+        client.add('key2', ColumnParent(column_family='Counter1'), CounterColumn('c3', d2), ConsistencyLevel.ONE)
+        client.add('key2', ColumnParent(column_family='Counter1'), CounterColumn('c4', d1), ConsistencyLevel.ONE)
+        client.add('key2', ColumnParent(column_family='Counter1'), CounterColumn('c5', d1), ConsistencyLevel.ONE)
+
+
+        time.sleep(0.1)
+        # read back slices from both rows and check the summed counts
+        counters = client.multiget_counter_slice(['key1', 'key2'], ColumnParent('Counter1'), SlicePredicate(['c3', 'c4']), ConsistencyLevel.ONE)
+        
+        assert counters['key1'][0].column.value == d1+d2
+        assert counters['key1'][1].column.value == d1   
+        assert counters['key2'][0].column.value == d1+d2
+        assert counters['key2'][1].column.value == d1
+
     def test_index_scan(self):
         _set_keyspace('Keyspace1')
         client.insert('key1', ColumnParent('Indexed1'), Column('birthdate', _i64(1), 0), ConsistencyLevel.ONE)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=1051679&r1=1051678&r2=1051679&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/Util.java Tue Dec 21 22:17:09 2010
@@ -128,6 +128,27 @@ public class Util
         return cfStore.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
     }
 
+    public static byte[] concatByteArrays(byte[] first, byte[]... remaining)
+    {
+        int length = first.length;
+        for (byte[] array : remaining)
+        {
+            length += array.length;
+        }
+
+        byte[] result = new byte[length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        int offset = first.length;
+
+        for (byte[] array : remaining)
+        {
+            System.arraycopy(array, 0, result, offset, array.length);
+            offset += array.length;
+        }
+
+        return result;
+    }
+
     public static ColumnFamily cloneAndRemoveDeleted(ColumnFamily cf, int gcBefore)
     {
         return ColumnFamilyStore.removeDeleted(cf.cloneMe(), gcBefore);
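
A small illustration of the concatByteArrays helper above, combined with the FBUtilities converters added earlier in this commit (values are arbitrary):

    byte[] joined = Util.concatByteArrays(FBUtilities.toByteArray(7),   // 4-byte int
                                          FBUtilities.toByteArray(7L)); // 8-byte long
    assert joined.length == 12;
    assert FBUtilities.byteArrayToInt(joined, 0) == 7;
    assert FBUtilities.byteArrayToLong(joined, 4) == 7L;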


