cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1060432 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/
Date Tue, 18 Jan 2011 16:07:33 GMT
Author: jbellis
Date: Tue Jan 18 16:07:32 2011
New Revision: 1060432

URL: http://svn.apache.org/viewvc?rev=1060432&view=rev
Log:
support CL.QUORUM, ALL for counters
patch by slebresne; reviewed by Kelvin Kakugawa and jbellis for CASSANDRA-1944

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/db/ReplicateOnWriteTask.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReplicateOnWriteVerbHandler.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jan 18 16:07:32 2011
@@ -1,6 +1,7 @@
 0.8-dev
  * avoid double RowMutation serialization on write path (CASSANDRA-1800)
- * adds support for columns that act as incr/decr counters (CASSANDRA-1072)
+ * adds support for columns that act as incr/decr counters 
+   (CASSANDRA-1072, 1944)
  * make NetworkTopologyStrategy the default (CASSANDRA-1960)
 
 

Added: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1060432&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Tue Jan 18 16:07:32 2011
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractCommutativeType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+
+public class CounterMutation implements IMutation
+{
+    private static final Logger logger = LoggerFactory.getLogger(CounterMutation.class);
+    private static final CounterMutationSerializer serializer = new CounterMutationSerializer();
+
+    private final RowMutation rowMutation;
+    private final ConsistencyLevel consistency;
+
+    public CounterMutation(RowMutation rowMutation, ConsistencyLevel consistency)
+    {
+        this.rowMutation = rowMutation;
+        this.consistency = consistency;
+    }
+
+    public String getTable()
+    {
+        return rowMutation.getTable();
+    }
+
+    public ByteBuffer key()
+    {
+        return rowMutation.key();
+    }
+
+    public RowMutation rowMutation()
+    {
+        return rowMutation;
+    }
+
+    public ConsistencyLevel consistency()
+    {
+        return consistency;
+    }
+
+    public static CounterMutationSerializer serializer()
+    {
+        return serializer;
+    }
+
+    public RowMutation makeReplicationMutation() throws IOException
+    {
+        List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
+        for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+        {
+            if (!columnFamily.metadata().getReplicateOnWrite())
+                continue;
+
+            // CF type: regular
+            if (!columnFamily.isSuper())
+            {
+                QueryPath queryPath = new QueryPath(columnFamily.metadata().cfName);
+                ReadCommand readCommand = new SliceByNamesReadCommand(rowMutation.getTable(), rowMutation.key(), queryPath, columnFamily.getColumnNames());
+                readCommands.add(readCommand);
+                continue;
+            }
+
+            // CF type: super
+            for (IColumn superColumn : columnFamily.getSortedColumns())
+            {
+                QueryPath queryPath = new QueryPath(columnFamily.metadata().cfName, superColumn.name());
+
+                // construct set of sub-column names
+                Collection<IColumn> subColumns = superColumn.getSubColumns();
+                Collection<ByteBuffer> subColNames = new HashSet<ByteBuffer>(subColumns.size());
+                for (IColumn subCol : subColumns)
+                {
+                    subColNames.add(subCol.name());
+                }
+
+                ReadCommand readCommand = new SliceByNamesReadCommand(rowMutation.getTable(), rowMutation.key(), queryPath, subColNames);
+                readCommands.add(readCommand);
+            }
+        }
+
+        // replicate to non-local replicas
+        List<InetAddress> foreignReplicas = StorageService.instance.getLiveNaturalEndpoints(rowMutation.getTable(), rowMutation.key());
+        foreignReplicas.remove(FBUtilities.getLocalAddress()); // remove local replica
+
+        // create a replication RowMutation
+        RowMutation replicationMutation = new RowMutation(rowMutation.getTable(), rowMutation.key());
+        for (ReadCommand readCommand : readCommands)
+        {
+            Table table = Table.open(readCommand.table);
+            Row row = readCommand.getRow(table);
+            AbstractType defaultValidator = row.cf.metadata().getDefaultValidator();
+            if (defaultValidator.isCommutative())
+            {
+                /**
+                 * Clean out contexts for all nodes we're sending the repair to, otherwise,
+                 * we could send a context which is local to one of the foreign replicas,
+                 * which would then incorrectly add that to its own count, because
+                 * local resolution aggregates.
+                 */
+                // note: the following logic could be optimized
+                for (InetAddress foreignNode : foreignReplicas)
+                {
+                    ((AbstractCommutativeType)defaultValidator).cleanContext(row.cf, foreignNode);
+                }
+            }
+            replicationMutation.add(row.cf);
+        }
+        return replicationMutation;
+    }
+
+    public Message makeMutationMessage() throws IOException
+    {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        serializer().serialize(this, dos);
+        return new Message(FBUtilities.getLocalAddress(), StorageService.Verb.COUNTER_MUTATION, bos.toByteArray());
+    }
+
+    public boolean shouldReplicateOnWrite()
+    {
+        for (ColumnFamily cf : rowMutation.getColumnFamilies())
+            if (cf.metadata().getReplicateOnWrite())
+                return true;
+        return false;
+    }
+
+    public void apply() throws IOException
+    {
+        rowMutation.updateCommutativeTypes(FBUtilities.getLocalAddress());
+
+        rowMutation.deepCopy().apply();
+    }
+
+    @Override
+    public String toString()
+    {
+        return toString(false);
+    }
+
+    public String toString(boolean shallow)
+    {
+        StringBuilder buff = new StringBuilder("CounterMutation(");
+        buff.append(rowMutation.toString(shallow));
+        buff.append(", ").append(consistency.toString());
+        return buff.append(")").toString();
+    }
+}
+
+class CounterMutationSerializer implements ICompactSerializer<CounterMutation>
+{
+    public void serialize(CounterMutation cm, DataOutputStream dos) throws IOException
+    {
+        RowMutation.serializer().serialize(cm.rowMutation(), dos);
+        dos.writeUTF(cm.consistency().name());
+    }
+
+    public CounterMutation deserialize(DataInputStream dis) throws IOException
+    {
+        RowMutation rm = RowMutation.serializer().deserialize(dis);
+        ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, dis.readUTF());
+        return new CounterMutation(rm, consistency);
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java?rev=1060432&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java Tue Jan 18 16:07:32 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.util.concurrent.TimeoutException;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+public class CounterMutationVerbHandler implements IVerbHandler
+{
+    private static Logger logger = LoggerFactory.getLogger(CounterMutationVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        byte[] bytes = message.getMessageBody();
+        ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
+
+        try
+        {
+            DataInputStream is = new DataInputStream(buffer);
+            CounterMutation cm = CounterMutation.serializer().deserialize(is);
+            if (logger.isDebugEnabled())
+              logger.debug("Applying forwarded " + cm);
+
+            StorageProxy.applyCounterMutationOnLeader(cm);
+            WriteResponse response = new WriteResponse(cm.getTable(), cm.key(), true);
+            Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
+            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+        }
+        catch (UnavailableException e)
+        {
+            // We check for UnavailableException in the coordinator node. It is
+            // hence reasonable to let the coordinator time out in the very
+            // unlikely case we arrive here.
+        }
+        catch (TimeoutException e)
+        {
+            // The coordinator node will have timed out itself, so we let that go
+        }
+        catch (IOException e)
+        {
+            logger.error("Error in counter mutation", e);
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java?rev=1060432&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IMutation.java Tue Jan 18 16:07:32 2011
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+import org.apache.cassandra.net.Message;
+
+public interface IMutation
+{
+    public String getTable();
+    public ByteBuffer key();
+    public String toString(boolean shallow);
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Jan 18 16:07:32 2011
@@ -40,7 +40,7 @@ import org.apache.cassandra.thrift.Mutat
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class RowMutation
+public class RowMutation implements IMutation
 {
     private static RowMutationSerializer serializer_;
     public static final String HINT = "HINT";

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Jan 18 16:07:32 2011
@@ -79,10 +79,6 @@ public class RowMutationVerbHandler impl
             if (logger_.isDebugEnabled())
               logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
-
-            // repair-on-write (remote message)
-            ReplicateOnWriteTask replicateOnWriteTask = new ReplicateOnWriteTask(rm);
-            StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(replicateOnWriteTask);
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Jan 18 16:07:32 2011
@@ -41,7 +41,6 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IPartitioner;
@@ -52,8 +51,12 @@ import org.apache.cassandra.locator.Toke
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.UnavailableException;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -68,9 +71,15 @@ public class StorageProxy implements Sto
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
     private static final LatencyTracker writeStats = new LatencyTracker();
+    // we keep counter latency apart from normal writes because a counter write with
+    // consistency > CL.ONE involves a read in the write path
+    private static final LatencyTracker counterWriteStats = new LatencyTracker();
     private static boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
     private static final String UNREACHABLE = "UNREACHABLE";
 
+    private static final WritePerformer standardWritePerformer;
+    private static final WritePerformer counterWritePerformer;
+
     private StorageProxy() {}
     static
     {
@@ -83,6 +92,23 @@ public class StorageProxy implements Sto
         {
             throw new RuntimeException(e);
         }
+
+        standardWritePerformer = new WritePerformer()
+        {
+            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException
+            {
+                assert mutation instanceof RowMutation;
+                sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, responseHandler, localDataCenter, true);
+            }
+        };
+
+        counterWritePerformer = new WritePerformer()
+        {
+            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException
+            {
+                applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter);
+            }
+        };
     }
 
     /**
@@ -96,157 +122,147 @@ public class StorageProxy implements Sto
     */
     public static void mutate(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
     {
+        write(mutations, consistency_level, standardWritePerformer, true);
+    }
+
+    /**
+     * Perform the write of a batch of mutations given a WritePerformer.
+     * For each mutation, gather the list of write endpoints, apply locally and/or
+     * forward the mutation to said write endpoints (delegated to the actual
+     * WritePerformer) and wait for the responses based on consistency level.
+     *
+     * @param mutations the mutations to be applied
+     * @param consistency_level the consistency level for the write operation
+     * @param performer the WritePerformer in charge of applying the mutation
+     * given the list of write endpoints (either standardWritePerformer for
+     * standard writes or counterWritePerformer for counter writes).
+     * @param updateStats whether or not to update the writeStats. This must be
+     * true for standard writes but false for counter writes as the latency of
+     * the latter is tracked in mutateCounters() by counterWriteStats.
+     */
+    public static void write(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, WritePerformer performer, boolean updateStats) throws UnavailableException, TimeoutException
+    {
+        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
+
         long startTime = System.nanoTime();
         List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
 
-        RowMutation mostRecentRowMutation = null;
-        StorageService ss = StorageService.instance;
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getLocalAddress());
-        
+        IMutation mostRecentMutation = null;
         try
         {
-            for (RowMutation rm : mutations)
+            for (IMutation mutation : mutations)
             {
-                mostRecentRowMutation = rm;
-                String table = rm.getTable();
+                mostRecentMutation = mutation;
+                String table = mutation.getTable();
                 AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
 
-                List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key());
-                Collection<InetAddress> writeEndpoints = ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, naturalEndpoints);
+                Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
                 Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
-                
+
                 final IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
-                
+
                 // exit early if we can't fulfill the CL at this time
                 responseHandler.assureSufficientLiveNodes();
-                
-                responseHandlers.add(responseHandler);
-                
-                // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-                Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
-                Message unhintedMessage = null;
-
-                //XXX: if commutative value, only allow CL.ONE write
-                updateDestinationForCommutativeTypes(consistency_level, rm, hintedEndpoints);
-
-                for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
-                {
-                    InetAddress destination = entry.getKey();
-                    Collection<InetAddress> targets = entry.getValue();
-
-                    String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-
-                    if (targets.size() == 1 && targets.iterator().next().equals(destination))
-                    {
-                        // only non-hinted writes are supported
-                        rm.updateCommutativeTypes(destination);
-
-                        // unhinted writes
-                        if (destination.equals(FBUtilities.getLocalAddress()))
-                        {
-                            insertLocalMessage(rm, responseHandler);
-                        }
-                        else
-                        {
-                            // belongs on a different server
-                            if (unhintedMessage == null)
-                            {
-                                unhintedMessage = rm.makeRowMutationMessage();
-                                MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
-                            }
-                            if (logger.isDebugEnabled())
-                                logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
-                            
-                            
-                            Multimap<Message, InetAddress> messages = dcMessages.get(dc);
-                            if (messages == null)
-                            {
-                               messages = HashMultimap.create();
-                               dcMessages.put(dc, messages);
-                            }
-                            
-                            messages.put(unhintedMessage, destination);
-                        }
-                    }
-                    else
-                    {
-                        // hinted
-                        Message hintedMessage = rm.makeRowMutationMessage();
-                        for (InetAddress target : targets)
-                        {
-                            if (!target.equals(destination))
-                            {
-                                addHintHeader(hintedMessage, target);
-                                if (logger.isDebugEnabled())
-                                    logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
-                            }
-                        }
-                        responseHandler.addHintCallback(hintedMessage, destination);
-                        
-                        Multimap<Message, InetAddress> messages = dcMessages.get(dc);
-                        
-                        if (messages == null)
-                        {
-                           messages = HashMultimap.create();
-                           dcMessages.put(dc, messages);
-                        }
-                        
-                        messages.put(hintedMessage, destination);
-                    }
-                }
 
-                sendMessages(localDataCenter, dcMessages);
+                responseHandlers.add(responseHandler);
+                performer.apply(mutation, hintedEndpoints, responseHandler, localDataCenter);
             }
-                        
             // wait for writes.  throws timeoutexception if necessary
             for (IWriteResponseHandler responseHandler : responseHandlers)
+            {
                 responseHandler.get();
+            }
         }
         catch (IOException e)
         {
-            if (mostRecentRowMutation == null)
-                throw new RuntimeException("no mutations were seen but found an error during write anyway", e);
-            else
-                throw new RuntimeException("error writing key " + FBUtilities.bytesToHex(mostRecentRowMutation.key()), e);
+            assert mostRecentMutation != null;
+            throw new RuntimeException("error writing key " + FBUtilities.bytesToHex(mostRecentMutation.key()), e);
         }
         finally
         {
-            writeStats.addNano(System.nanoTime() - startTime);
+            if (updateStats)
+                writeStats.addNano(System.nanoTime() - startTime);
         }
     }
 
-    /**
-     * Update destination endpoints depending on the clock type.
-     */
-    private static void updateDestinationForCommutativeTypes(ConsistencyLevel consistency_level, RowMutation rm,
-            Multimap<InetAddress, InetAddress> destinationEndpoints)
+    private static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
     {
-        AbstractType defaultValidator = rm.getColumnFamilies().iterator().next().metadata().getDefaultValidator();
-        if (!defaultValidator.isCommutative())
-            return;
-
-        InetAddress randomDestination = pickRandomDestination(destinationEndpoints);
-        destinationEndpoints.clear();
-        destinationEndpoints.put(randomDestination, randomDestination);
+        StorageService ss = StorageService.instance;
+        List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, key);
+        return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
     }
 
-    /**
-     * @param endpoints potential destinations.
-     * @return one destination randomly chosen from the endpoints unless localhost is in the map, then that is returned.
-     */
-    private static InetAddress pickRandomDestination(Multimap<InetAddress, InetAddress> endpoints)
+    private static void sendToHintedEndpoints(RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages)
+    throws IOException
     {
-        Set<InetAddress> destinationSet = endpoints.keySet();
+        // Multimap that holds onto all the messages and addresses meant for a specific datacenter
+        Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+        Message unhintedMessage = null;
 
-        if (destinationSet.contains(FBUtilities.getLocalAddress()))
+        for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
         {
-            return FBUtilities.getLocalAddress();
-        }
-        else
-        {
-            InetAddress[] destinations = destinationSet.toArray(new InetAddress[0]);
-            return destinations[random.nextInt(destinations.length)];
+            InetAddress destination = entry.getKey();
+            Collection<InetAddress> targets = entry.getValue();
+
+            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+
+            if (targets.size() == 1 && targets.iterator().next().equals(destination))
+            {
+                // unhinted writes
+                if (destination.equals(FBUtilities.getLocalAddress()))
+                {
+                    if (insertLocalMessages)
+                        insertLocalMessage(rm, responseHandler);
+                }
+                else
+                {
+                    // belongs on a different server
+                    if (unhintedMessage == null)
+                    {
+                        unhintedMessage = rm.makeRowMutationMessage();
+                        MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
+                    }
+                    if (logger.isDebugEnabled())
+                        logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
+
+                    Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+                    if (messages == null)
+                    {
+                        messages = HashMultimap.create();
+                        dcMessages.put(dc, messages);
+                    }
+
+                    messages.put(unhintedMessage, destination);
+                }
+            }
+            else
+            {
+                // hinted
+                Message hintedMessage = rm.makeRowMutationMessage();
+                for (InetAddress target : targets)
+                {
+                    if (!target.equals(destination))
+                    {
+                        addHintHeader(hintedMessage, target);
+                        if (logger.isDebugEnabled())
+                            logger.debug("insert writing key " + FBUtilities.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
+                    }
+                }
+                responseHandler.addHintCallback(hintedMessage, destination);
+
+                Multimap<Message, InetAddress> messages = dcMessages.get(dc);
+
+                if (messages == null)
+                {
+                    messages = HashMultimap.create();
+                    dcMessages.put(dc, messages);
+                }
+
+                messages.put(hintedMessage, destination);
+            }
         }
+
+        sendMessages(localDataCenter, dcMessages);
     }
 
     /**
@@ -327,10 +343,122 @@ public class StorageProxy implements Sto
             {
                 rm.deepCopy().apply();
                 responseHandler.response(null);
+            }
+        };
+        StageManager.getStage(Stage.MUTATION).execute(runnable);
+    }
+
+    /**
+     * The equivalent of mutate() for counters.
+     * (Note that each CounterMutation ships its own consistency level)
+     *
+     * A counter mutation needs to first be applied to a replica (that we'll call the leader for the mutation) before being
+     * replicated to the other endpoints. To achieve this, there are two cases:
+     *   1) the coordinator host is a replica: we proceed to applying the update locally and replicate through
+     *   applyCounterMutationOnLeader
+     *   2) the coordinator is not a replica: we forward the (counter)mutation to a chosen replica (that will proceed through
+     *   applyCounterMutationOnLeader upon receipt) and wait for its acknowledgment.
+     *
+     * Implementation note: We check if we can fulfill the CL on the coordinator host even if it is not a replica to allow
+     * a quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
+     * the write latencies at the coordinator node to make the gathering point similar to the case of standard writes.
+     */
+    public static void mutateCounters(List<CounterMutation> mutations) throws UnavailableException, TimeoutException
+    {
+        long startTime = System.nanoTime();
+        ArrayList<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
+
+        CounterMutation mostRecentMutation = null;
+        StorageService ss = StorageService.instance;
+
+        try
+        {
+            for (CounterMutation cm : mutations)
+            {
+                mostRecentMutation = cm;
+                InetAddress endpoint = ss.findSuitableEndpoint(cm.getTable(), cm.key());
+
+                if (endpoint.equals(FBUtilities.getLocalAddress()))
+                {
+                    applyCounterMutationOnLeader(cm);
+                }
+                else
+                {
+                    // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica 
+                    String table = cm.getTable();
+                    AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
+                    Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
+                    Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
+                    rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, cm.consistency()).assureSufficientLiveNodes();
+
+                    // Forward the actual update to the chosen leader replica
+                    IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+                    responseHandlers.add(responseHandler);
+
+                    Message msg = cm.makeMutationMessage();
+                    MessagingService.instance().addCallback(responseHandler, msg.getMessageId());
+                    if (logger.isDebugEnabled())
+                        logger.debug("forwarding counter update of key " + FBUtilities.bytesToHex(cm.key()) + " to " + msg.getMessageId() + "@" + endpoint);
+                    MessagingService.instance().sendOneWay(msg, endpoint);
+                }
+            }
+            // wait for writes.  throws timeoutexception if necessary
+            for (IWriteResponseHandler responseHandler : responseHandlers)
+            {
+                responseHandler.get();
+            }
+        }
+        catch (IOException e)
+        {
+            if (mostRecentMutation == null)
+                throw new RuntimeException("no mutations were seen but found an error during write anyway", e);
+            else
+                throw new RuntimeException("error writing key " + FBUtilities.bytesToHex(mostRecentMutation.key()), e);
+        }
+        finally
+        {
+            counterWriteStats.addNano(System.nanoTime() - startTime);
+        }
+    }
+
+    // Must be called on a replica of the mutation. This replica becomes the
+    // leader of this mutation.
+    public static void applyCounterMutationOnLeader(CounterMutation cm) throws UnavailableException, TimeoutException, IOException
+    {
+        write(Collections.singletonList(cm), cm.consistency(), counterWritePerformer, false);
+    }
+
+    private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter)
+    {
+        // we apply locally first, then send it to the other replicas
+        if (logger.isDebugEnabled())
+            logger.debug("insert writing local & replicate " + mutation.toString(true));
+
+        Runnable runnable = new WrappedRunnable()
+        {
+            public void runMayThrow() throws IOException
+            {
+                assert mutation instanceof CounterMutation;
+                final CounterMutation cm = (CounterMutation) mutation;
+
+                // apply mutation
+                cm.apply();
+
+                responseHandler.response(null);
 
-                // repair-on-write (local message)
-                ReplicateOnWriteTask replicateOnWriteTask = new ReplicateOnWriteTask(rm);
-                StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(replicateOnWriteTask);
+                if (cm.shouldReplicateOnWrite())
+                {
+                    // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
+                    // and we want to avoid blocking the MUTATION stage for too long
+                    StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
+                            {
+                                public void runMayThrow() throws IOException
+                    {
+                        // send mutation to other replica
+                        sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, responseHandler, localDataCenter, false);
+                    }
+                    });
+                }
             }
         };
         StageManager.getStage(Stage.MUTATION).execute(runnable);
@@ -771,6 +899,31 @@ public class StorageProxy implements Sto
         return writeStats.getRecentLatencyHistogramMicros();
     }
 
+    public long getCounterWriteOperations()
+    {
+        return counterWriteStats.getOpCount();
+    }
+
+    public long getTotalCounterWriteLatencyMicros()
+    {
+        return counterWriteStats.getTotalLatencyMicros();
+    }
+
+    public double getRecentCounterWriteLatencyMicros()
+    {
+        return counterWriteStats.getRecentLatencyMicros();
+    }
+
+    public long[] getTotalCounterWriteLatencyHistogramMicros()
+    {
+        return counterWriteStats.getTotalLatencyHistogramMicros();
+    }
+
+    public long[] getRecentCounterWriteLatencyHistogramMicros()
+    {
+        return counterWriteStats.getRecentLatencyHistogramMicros();
+    }
+
     public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws IOException, TimeoutException, UnavailableException
     {
@@ -923,4 +1076,9 @@ public class StorageProxy implements Sto
             }
         }
     }
+
+    private interface WritePerformer
+    {
+        public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java Tue Jan 18 16:07:32 2011
@@ -38,6 +38,12 @@ public interface StorageProxyMBean
     public long[] getTotalWriteLatencyHistogramMicros();
     public long[] getRecentWriteLatencyHistogramMicros();
 
+    public long getCounterWriteOperations();
+    public long getTotalCounterWriteLatencyMicros();
+    public double getRecentCounterWriteLatencyMicros();
+    public long[] getTotalCounterWriteLatencyHistogramMicros();
+    public long[] getRecentCounterWriteLatencyHistogramMicros();
+
     public boolean getHintedHandoffEnabled();
     public void setHintedHandoffEnabled(boolean b);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jan 18 16:07:32 2011
@@ -107,7 +107,7 @@ public class StorageService implements I
         INDEX_SCAN,
         REPLICATION_FINISHED,
         INTERNAL_RESPONSE, // responses to internal calls
-        REPLICATE_ON_WRITE,
+        COUNTER_MUTATION,
         ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
@@ -136,7 +136,7 @@ public class StorageService implements I
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
-        put(Verb.REPLICATE_ON_WRITE, Stage.REPLICATE_ON_WRITE);
+        put(Verb.COUNTER_MUTATION, Stage.MUTATION);
     }};
 
 
@@ -224,7 +224,7 @@ public class StorageService implements I
         MessagingService.instance().registerVerbHandlers(Verb.READ, new ReadVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.INDEX_SCAN, new IndexScanVerbHandler());
-        MessagingService.instance().registerVerbHandlers(Verb.REPLICATE_ON_WRITE, new ReplicateOnWriteVerbHandler());
+        MessagingService.instance().registerVerbHandlers(Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance().registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler());

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Jan 18 16:07:32 2011
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.thrift;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -397,7 +396,9 @@ public class CassandraServer implements 
                     ThriftValidation.validateMutation(state().getKeyspace(), cfName, mutation);
                 }
             }
-            rowMutations.add(RowMutation.getRowMutationFromMutations(state().getKeyspace(), key, columnFamilyToMutations));
+            RowMutation rm = RowMutation.getRowMutationFromMutations(state().getKeyspace(), key, columnFamilyToMutations);
+            if (!rm.isEmpty())
+                rowMutations.add(rm);
         }
 
         doInsert(consistency_level, rowMutations);
@@ -441,7 +442,23 @@ public class CassandraServer implements 
 
             try
             {
-                StorageProxy.mutate(mutations, consistency_level);
+                if (!mutations.isEmpty())
+                {
+                    // FIXME: Mighty ugly but we've made sure above this will always work
+                    if (mutations.iterator().next().getColumnFamilies().iterator().next().metadata().getDefaultValidator().isCommutative())
+                    {
+                        List<org.apache.cassandra.db.CounterMutation> cmutations = new ArrayList<org.apache.cassandra.db.CounterMutation>(mutations.size());
+                        for (RowMutation mutation : mutations)
+                        {
+                            cmutations.add(new org.apache.cassandra.db.CounterMutation(mutation, consistency_level));
+                        }
+                        StorageProxy.mutateCounters(cmutations);
+                    }
+                    else
+                    {
+                        StorageProxy.mutate(mutations, consistency_level);
+                    }
+                }
             }
             catch (TimeoutException e)
             {
@@ -961,13 +978,8 @@ public class CassandraServer implements 
     {
         logger.debug("add");
 
-        if (ConsistencyLevel.ONE != consistency_level)
-        {
-            throw new InvalidRequestException("Commutative CFs only support ConsistencyLevel.ONE");
-        }
-
         String keyspace = state().getKeyspace();
-        ThriftValidation.validateCommutative(keyspace, column_parent.column_family);
+        ThriftValidation.validateCommutativeForWrite(keyspace, column_parent.column_family, consistency_level);
 
         internal_insert(key, column_parent, getCounterColumn(column), consistency_level);
     }
@@ -1017,11 +1029,6 @@ public class CassandraServer implements 
     {
         logger.debug("batch_add");
 
-        if (ConsistencyLevel.ONE != consistency_level)
-        {
-            throw new InvalidRequestException("Commutative CFs only support ConsistencyLevel.ONE");
-        }
-
         String keyspace = state().getKeyspace();
         
         Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map = new HashMap<ByteBuffer,Map<String,List<Mutation>>>();
@@ -1032,7 +1039,7 @@ public class CassandraServer implements 
             
             for (Entry<String, List<CounterMutation>> innerEntry : entry.getValue().entrySet())
             {
-                ThriftValidation.validateCommutative(keyspace, innerEntry.getKey());
+                ThriftValidation.validateCommutativeForWrite(keyspace, innerEntry.getKey(), consistency_level);
                 
                 List<Mutation> mutations = new ArrayList<Mutation>(innerEntry.getValue().size());
                 for (CounterMutation cm : innerEntry.getValue())

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1060432&r1=1060431&r2=1060432&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Tue Jan 18 16:07:32 2011
@@ -445,7 +445,7 @@ public class ThriftValidation
         return metadata;
     }
 
-    static void validateCommutative(String tablename, String cfName) throws InvalidRequestException
+    public static CFMetaData validateCommutative(String tablename, String cfName) throws InvalidRequestException
     {
         validateTable(tablename);
         CFMetaData metadata = validateCFMetaData(tablename, cfName);
@@ -453,5 +453,15 @@ public class ThriftValidation
         {
             throw new InvalidRequestException("not commutative columnfamily " + cfName);
         }
+        return metadata;
+    }
+
+    public static void validateCommutativeForWrite(String tablename, String cfName, ConsistencyLevel consistency) throws InvalidRequestException
+    {
+        CFMetaData metadata = validateCommutative(tablename, cfName);
+        if (!metadata.getReplicateOnWrite() && consistency != ConsistencyLevel.ONE)
+        {
+            throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + cfName);
+        }
     }
 }



Mime
View raw message