cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1095334 - in /cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra: db/CounterMutationVerbHandler.java db/RowMutation.java service/StorageProxy.java service/StorageProxyMBean.java thrift/CassandraServer.java
Date Wed, 20 Apr 2011 09:19:32 GMT
Author: slebresne
Date: Wed Apr 20 09:19:32 2011
New Revision: 1095334

URL: http://svn.apache.org/viewvc?rev=1095334&view=rev
Log:
Fix batch_mutate with mixed counter/standard mutations
patch by slebresne; reviewed by stuhood for CASSANDRA-2457

Modified:
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxyMBean.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1095334&r1=1095333&r2=1095334&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
Wed Apr 20 09:19:32 2011
@@ -25,12 +25,10 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.service.StorageProxy;
@@ -53,7 +51,8 @@ public class CounterMutationVerbHandler 
             if (logger.isDebugEnabled())
               logger.debug("Applying forwarded " + cm);
 
-            StorageProxy.applyCounterMutationOnLeader(cm);
+            String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+            StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter).get();
             WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             MessagingService.instance().sendReply(responseMessage, id, message.getFrom());

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java?rev=1095334&r1=1095333&r2=1095334&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/RowMutation.java Wed
Apr 20 09:19:32 2011
@@ -226,27 +226,6 @@ public class RowMutation implements IMut
         return new Message(FBUtilities.getLocalAddress(), verb, getSerializedBuffer(version),
version);
     }
 
-    public static RowMutation getRowMutationFromMutations(String keyspace, ByteBuffer key,
Map<String, List<Mutation>> cfmap)
-    {
-        RowMutation rm = new RowMutation(keyspace, key);
-        for (Map.Entry<String, List<Mutation>> entry : cfmap.entrySet())
-        {
-            String cfName = entry.getKey();
-            for (Mutation mutation : entry.getValue())
-            {
-                if (mutation.deletion != null)
-                {
-                    deleteColumnOrSuperColumnToRowMutation(rm, cfName, mutation.deletion);
-                }
-                if (mutation.column_or_supercolumn != null)
-                {
-                    addColumnOrSuperColumnToRowMutation(rm, cfName, mutation.column_or_supercolumn);
-                }
-            }
-        }
-        return rm;
-    }
-
     public synchronized byte[] getSerializedBuffer(int version) throws IOException
     {
         byte[] preserializedBuffer = preserializedBuffers.get(version);
@@ -288,47 +267,47 @@ public class RowMutation implements IMut
         return buff.append("])").toString();
     }
 
-    private static void addColumnOrSuperColumnToRowMutation(RowMutation rm, String cfName,
ColumnOrSuperColumn cosc)
+    public void addColumnOrSuperColumn(String cfName, ColumnOrSuperColumn cosc)
     {
         if (cosc.super_column != null)
         {
             for (org.apache.cassandra.thrift.Column column : cosc.super_column.columns)
             {
-                rm.add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value,
column.timestamp, column.ttl);
+                add(new QueryPath(cfName, cosc.super_column.name, column.name), column.value,
column.timestamp, column.ttl);
             }
         }
         else if (cosc.column != null)
         {
-            rm.add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp,
cosc.column.ttl);
+            add(new QueryPath(cfName, null, cosc.column.name), cosc.column.value, cosc.column.timestamp,
cosc.column.ttl);
         }
         else if (cosc.counter_super_column != null)
         {
             for (org.apache.cassandra.thrift.CounterColumn column : cosc.counter_super_column.columns)
             {
-                rm.addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name),
column.value);
+                addCounter(new QueryPath(cfName, cosc.counter_super_column.name, column.name),
column.value);
             }
         }
         else // cosc.counter_column != null
         {
-            rm.addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
+            addCounter(new QueryPath(cfName, null, cosc.counter_column.name), cosc.counter_column.value);
         }
     }
 
-    private static void deleteColumnOrSuperColumnToRowMutation(RowMutation rm, String cfName,
Deletion del)
+    public void deleteColumnOrSuperColumn(String cfName, Deletion del)
     {
         if (del.predicate != null && del.predicate.column_names != null)
         {
             for(ByteBuffer c : del.predicate.column_names)
             {
-                if (del.super_column == null && DatabaseDescriptor.getColumnFamilyType(rm.table_,
cfName) == ColumnFamilyType.Super)
-                    rm.delete(new QueryPath(cfName, c), del.timestamp);
+                if (del.super_column == null && DatabaseDescriptor.getColumnFamilyType(table_,
cfName) == ColumnFamilyType.Super)
+                    delete(new QueryPath(cfName, c), del.timestamp);
                 else
-                    rm.delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
+                    delete(new QueryPath(cfName, del.super_column, c), del.timestamp);
             }
         }
         else
         {
-            rm.delete(new QueryPath(cfName, del.super_column), del.timestamp);
+            delete(new QueryPath(cfName, del.super_column), del.timestamp);
         }
     }
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1095334&r1=1095333&r2=1095334&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Wed Apr 20 09:19:32 2011
@@ -67,9 +67,6 @@ public class StorageProxy implements Sto
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
     private static final LatencyTracker writeStats = new LatencyTracker();
-    // we keep counter latency appart from normal write because write with
-    // consistency > CL.ONE involves a read in the write path
-    private static final LatencyTracker counterWriteStats = new LatencyTracker();
     private static boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
     private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
     public static final String UNREACHABLE = "UNREACHABLE";
@@ -127,7 +124,7 @@ public class StorageProxy implements Sto
     }
 
     /**
-     * Use this method to have these RowMutations applied
+     * Use this method to have these Mutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
      * the data across to some other replica.
@@ -135,27 +132,7 @@ public class StorageProxy implements Sto
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
     */
-    public static void mutate(List<RowMutation> mutations, ConsistencyLevel consistency_level)
throws UnavailableException, TimeoutException
-    {
-        write(mutations, consistency_level, standardWritePerformer, true);
-    }
-
-    /**
-     * Perform the write of a batch of mutations given a WritePerformer.
-     * For each mutation, gather the list of write endpoints, apply locally and/or
-     * forward the mutation to said write endpoint (deletaged to the actual
-     * WritePerformer) and wait for the responses based on consistency level.
-     *
-     * @param mutations the mutations to be applied
-     * @param consistency_level the consistency level for the write operation
-     * @param performer the WritePerformer in charge of appliying the mutation
-     * given the list of write endpoints (either standardWritePerformer for
-     * standard writes or counterWritePerformer for counter writes).
-     * @param updateStats whether or not to update the writeStats. This must be
-     * true for standard writes but false for counter writes as the latency of
-     * the latter is tracked in mutateCounters() by counterWriteStats.
-     */
-    public static void write(List<? extends IMutation> mutations, ConsistencyLevel
consistency_level, WritePerformer performer, boolean updateStats) throws UnavailableException,
TimeoutException
+    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel
consistency_level) throws UnavailableException, TimeoutException
     {
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
 
@@ -168,19 +145,14 @@ public class StorageProxy implements Sto
             for (IMutation mutation : mutations)
             {
                 mostRecentMutation = mutation;
-                String table = mutation.getTable();
-                AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
-
-                Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
-                Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
-
-                final IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints,
hintedEndpoints, consistency_level);
-
-                // exit early if we can't fulfill the CL at this time
-                responseHandler.assureSufficientLiveNodes();
-
-                responseHandlers.add(responseHandler);
-                performer.apply(mutation, hintedEndpoints, responseHandler, localDataCenter,
consistency_level);
+                if (mutation instanceof CounterMutation)
+                {
+                    responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
+                }
+                else
+                {
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter,
standardWritePerformer));
+                }
             }
             // wait for writes.  throws timeoutexception if necessary
             for (IWriteResponseHandler responseHandler : responseHandlers)
@@ -195,11 +167,39 @@ public class StorageProxy implements Sto
         }
         finally
         {
-            if (updateStats)
-                writeStats.addNano(System.nanoTime() - startTime);
+            writeStats.addNano(System.nanoTime() - startTime);
         }
     }
 
+    /**
+     * Perform the write of a mutation given a WritePerformer.
+     * Gather the list of write endpoints, apply locally and/or forward the mutation to
+     * said write endpoint (delegated to the actual WritePerformer) and wait for the
+     * responses based on consistency level.
+     *
+     * @param mutation the mutation to be applied
+     * @param consistency_level the consistency level for the write operation
+     * @param performer the WritePerformer in charge of applying the mutation
+     * given the list of write endpoints (either standardWritePerformer for
+     * standard writes or counterWritePerformer for counter writes).
+     */
+    public static IWriteResponseHandler performWrite(IMutation mutation, ConsistencyLevel
consistency_level, String localDataCenter, WritePerformer performer) throws UnavailableException,
TimeoutException, IOException
+    {
+        String table = mutation.getTable();
+        AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+
+        Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
+        Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
+
+        IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints,
hintedEndpoints, consistency_level);
+
+        // exit early if we can't fulfill the CL at this time
+        responseHandler.assureSufficientLiveNodes();
+
+        performer.apply(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+        return responseHandler;
+    }
+
     private static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer
key)
     {
         StorageService ss = StorageService.instance;
@@ -351,13 +351,12 @@ public class StorageProxy implements Sto
     }
 
     /**
-     * The equivalent of mutate() for counters.
-     * (Note that each CounterMutation ship the consistency level)
+     * Handle counter mutation on the coordinator host.
      *
      * A counter mutation needs to first be applied to a replica (that we'll call the leader
for the mutation) before being
      * replicated to the other endpoint. To achieve so, there is two case:
      *   1) the coordinator host is a replica: we proceed to applying the update locally
and replicate throug
-     *   applyCounterMutationOnLeader
+     *   applyCounterMutationOnCoordinator
      *   2) the coordinator is not a replica: we forward the (counter)mutation to a chosen
replica (that will proceed through
      *   applyCounterMutationOnLeader upon receive) and wait for its acknowledgment.
      *
@@ -365,60 +364,31 @@ public class StorageProxy implements Sto
      * quicker response and because the WriteResponseHandlers don't make it easy to send
back an error. We also always gather
      * the write latencies at the coordinator node to make gathering point similar to the
case of standard writes.
      */
-    public static void mutateCounters(List<CounterMutation> mutations) throws UnavailableException,
TimeoutException
+    public static IWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter)
throws UnavailableException, TimeoutException, IOException
     {
-        long startTime = System.nanoTime();
-        ArrayList<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
-
-        CounterMutation mostRecentMutation = null;
-        StorageService ss = StorageService.instance;
-
-        try
-        {
-            for (CounterMutation cm : mutations)
-            {
-                mostRecentMutation = cm;
-                InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
-
-                if (endpoint.equals(FBUtilities.getLocalAddress()))
-                {
-                    applyCounterMutationOnCoordinator(cm);
-                }
-                else
-                {
-                    // Exit now if we can't fulfill the CL here instead of forwarding to
the leader replica
-                    String table = cm.getTable();
-                    AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
-                    Collection<InetAddress> writeEndpoints = getWriteEndpoints(table,
cm.key());
-                    Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
-                    rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, cm.consistency()).assureSufficientLiveNodes();
-
-                    // Forward the actual update to the chosen leader replica
-                    IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
-                    responseHandlers.add(responseHandler);
+        InetAddress endpoint = findSuitableEndpoint(cm.getTable(), cm.key());
 
-                    Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
-                    if (logger.isDebugEnabled())
-                        logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key())
+ " to " + endpoint);
-                    MessagingService.instance().sendRR(message, endpoint, responseHandler);
-                }
-            }
-            // wait for writes.  throws timeoutexception if necessary
-            for (IWriteResponseHandler responseHandler : responseHandlers)
-            {
-                responseHandler.get();
-            }
-        }
-        catch (IOException e)
+        if (endpoint.equals(FBUtilities.getLocalAddress()))
         {
-            if (mostRecentMutation == null)
-                throw new RuntimeException("no mutations were seen but found an error during
write anyway", e);
-            else
-                throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()),
e);
+            return applyCounterMutationOnCoordinator(cm, localDataCenter);
         }
-        finally
+        else
         {
-            counterWriteStats.addNano(System.nanoTime() - startTime);
+            // Exit now if we can't fulfill the CL here instead of forwarding to the leader
replica
+            String table = cm.getTable();
+            AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+            Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
+            Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
+            rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, cm.consistency()).assureSufficientLiveNodes();
+
+            // Forward the actual update to the chosen leader replica
+            IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+
+            Message message = cm.makeMutationMessage(Gossiper.instance.getVersion(endpoint));
+            if (logger.isDebugEnabled())
+                logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key())
+ " to " + endpoint);
+            MessagingService.instance().sendRR(message, endpoint, responseHandler);
+            return responseHandler;
         }
     }
 
@@ -433,16 +403,16 @@ public class StorageProxy implements Sto
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static void applyCounterMutationOnLeader(CounterMutation cm) throws UnavailableException,
TimeoutException, IOException
+    public static IWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm,
String localDataCenter) throws UnavailableException, TimeoutException, IOException
     {
-        write(Collections.singletonList(cm), cm.consistency(), counterWritePerformer, false);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the
MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
-    public static void applyCounterMutationOnCoordinator(CounterMutation cm) throws UnavailableException,
TimeoutException, IOException
+    public static IWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation
cm, String localDataCenter) throws UnavailableException, TimeoutException, IOException
     {
-        write(Collections.singletonList(cm), cm.consistency(), counterWriteOnCoordinatorPerformer,
false);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
     }
 
     private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress,
InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String
localDataCenter, final ConsistencyLevel consistency_level, boolean executeOnMutationStage)
@@ -948,31 +918,6 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyHistogramMicros();
     }
 
-    public long getCounterWriteOperations()
-    {
-        return counterWriteStats.getOpCount();
-    }
-
-    public long getTotalCounterWriteLatencyMicros()
-    {
-        return counterWriteStats.getTotalLatencyMicros();
-    }
-
-    public double getRecentCounterWriteLatencyMicros()
-    {
-        return counterWriteStats.getRecentLatencyMicros();
-    }
-
-    public long[] getTotalCounterWriteLatencyHistogramMicros()
-    {
-        return counterWriteStats.getTotalLatencyHistogramMicros();
-    }
-
-    public long[] getRecentCounterWriteLatencyHistogramMicros()
-    {
-        return counterWriteStats.getRecentLatencyHistogramMicros();
-    }
-
     public static List<Row> scan(final String keyspace, String column_family, IndexClause
index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws IOException, TimeoutException, UnavailableException
     {

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1095334&r1=1095333&r2=1095334&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxyMBean.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxyMBean.java
Wed Apr 20 09:19:32 2011
@@ -38,12 +38,6 @@ public interface StorageProxyMBean
     public long[] getTotalWriteLatencyHistogramMicros();
     public long[] getRecentWriteLatencyHistogramMicros();
 
-    public long getCounterWriteOperations();
-    public long getTotalCounterWriteLatencyMicros();
-    public double getRecentCounterWriteLatencyMicros();
-    public long[] getTotalCounterWriteLatencyHistogramMicros();
-    public long[] getRecentCounterWriteLatencyHistogramMicros();
-
     public boolean getHintedHandoffEnabled();
     public void setHintedHandoffEnabled(boolean b);
     public int getMaxHintWindow();

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1095334&r1=1095333&r2=1095334&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java
Wed Apr 20 09:19:32 2011
@@ -440,13 +440,18 @@ public class CassandraServer implements 
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         List<String> cfamsSeen = new ArrayList<String>();
-        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+        List<IMutation> rowMutations = new ArrayList<IMutation>();
         String keyspace = state().getKeyspace();
 
         for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry:
mutation_map.entrySet())
         {
             ByteBuffer key = mutationEntry.getKey();
 
+            // We need to separate row mutation for standard cf and counter cf (that will
be encapsulated in a
+            // CounterMutation) because they don't follow the same code path
+            RowMutation rmStandard = null;
+            RowMutation rmCounter = null;
+
             Map<String, List<Mutation>> columnFamilyToMutations = mutationEntry.getValue();
             for (Map.Entry<String, List<Mutation>> columnFamilyMutations : columnFamilyToMutations.entrySet())
             {
@@ -462,17 +467,37 @@ public class CassandraServer implements 
                 CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, cfName);
                 ThriftValidation.validateKey(metadata, key);
 
+                RowMutation rm;
                 if (metadata.getDefaultValidator().isCommutative())
+                {
                     ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
+                    rmCounter = rmCounter == null ? new RowMutation(keyspace, key) : rmCounter;
+                    rm = rmCounter;
+                }
+                else
+                {
+                    rmStandard = rmStandard == null ? new RowMutation(keyspace, key) : rmStandard;
+                    rm = rmStandard;
+                }
 
                 for (Mutation mutation : columnFamilyMutations.getValue())
                 {
                     ThriftValidation.validateMutation(metadata, mutation);
+
+                    if (mutation.deletion != null)
+                    {
+                        rm.deleteColumnOrSuperColumn(cfName, mutation.deletion);
+                    }
+                    if (mutation.column_or_supercolumn != null)
+                    {
+                        rm.addColumnOrSuperColumn(cfName, mutation.column_or_supercolumn);
+                    }
                 }
             }
-            RowMutation rm = RowMutation.getRowMutationFromMutations(keyspace, key, columnFamilyToMutations);
-            if (!rm.isEmpty())
-                rowMutations.add(rm);
+            if (rmStandard != null && !rmStandard.isEmpty())
+                rowMutations.add(rmStandard);
+            if (rmCounter != null && !rmCounter.isEmpty())
+                rowMutations.add(new org.apache.cassandra.db.CounterMutation(rmCounter, consistency_level));
         }
 
         doInsert(consistency_level, rowMutations);
@@ -500,7 +525,10 @@ public class CassandraServer implements 
         RowMutation rm = new RowMutation(state().getKeyspace(), key);
         rm.delete(new QueryPath(column_path), timestamp); 
 
-        doInsert(consistency_level, Arrays.asList(rm));
+        if (isCommutativeOp)
+            doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
+        else
+            doInsert(consistency_level, Arrays.asList(rm));
     }
 
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel
consistency_level)
@@ -511,7 +539,7 @@ public class CassandraServer implements 
         internal_remove(key, column_path, timestamp, consistency_level, false);
     }
 
-    private void doInsert(ConsistencyLevel consistency_level, List<RowMutation> mutations)
throws UnavailableException, TimedOutException
+    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation>
mutations) throws UnavailableException, TimedOutException
     {
         try
         {
@@ -520,22 +548,7 @@ public class CassandraServer implements 
             try
             {
                 if (!mutations.isEmpty())
-                {
-                    // FIXME: Mighty ugly but we've made sure above this will always work
-                    if (mutations.iterator().next().getColumnFamilies().iterator().next().metadata().getDefaultValidator().isCommutative())
-                    {
-                        List<org.apache.cassandra.db.CounterMutation> cmutations =
new ArrayList<org.apache.cassandra.db.CounterMutation>(mutations.size());
-                        for (RowMutation mutation : mutations)
-                        {
-                            cmutations.add(new org.apache.cassandra.db.CounterMutation(mutation,
consistency_level));
-                        }
-                        StorageProxy.mutateCounters(cmutations);
-                    }
-                    else
-                    {
-                        StorageProxy.mutate(mutations, consistency_level);
-                    }
-                }
+                    StorageProxy.mutate(mutations, consistency_level);
             }
             catch (TimeoutException e)
             {
@@ -1045,7 +1058,7 @@ public class CassandraServer implements 
         {
             throw new InvalidRequestException(e.getMessage());
         }
-        doInsert(consistency_level, Arrays.asList(rm));
+        doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
     }
 
     public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)



Mime
View raw message