cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] git commit: Remove netty buffer ref-counting
Date Tue, 12 Aug 2014 00:40:23 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 5d9621cd9 -> d61443e98


Remove netty buffer ref-counting

Patch by tjake; reviewed by benedict for CASSANDRA-7735


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1348aa2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1348aa2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1348aa2

Branch: refs/heads/cassandra-2.1
Commit: a1348aa2986989eaaafdac3efa57fb15c9a54d7c
Parents: 59806a8
Author: Jake Luciani <jake@apache.org>
Authored: Mon Aug 11 20:36:59 2014 -0400
Committer: Jake Luciani <jake@apache.org>
Committed: Mon Aug 11 20:39:42 2014 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/BatchStatement.java         |  13 +-
 .../cql3/statements/ModificationStatement.java  |   8 +-
 .../apache/cassandra/db/CounterMutation.java    |  12 --
 src/java/org/apache/cassandra/db/IMutation.java |  11 --
 src/java/org/apache/cassandra/db/Mutation.java  |  31 ---
 .../apache/cassandra/net/MessagingService.java  |  14 +-
 .../cassandra/net/ResponseVerbHandler.java      |   6 -
 .../apache/cassandra/service/QueryState.java    |  12 --
 .../apache/cassandra/service/StorageProxy.java  | 190 +++++++------------
 .../org/apache/cassandra/transport/CBUtil.java  |  14 +-
 .../org/apache/cassandra/transport/Message.java |   1 -
 12 files changed, 80 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a426df4..a180df9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.0-rc6
+ * Remove netty buffer ref-counting (CASSANDRA-7735)
  * Pass mutated cf to index updater for use by PRSI (CASSANDRA-7742)
  * Include stress yaml example in release and deb (CASSANDRA-7717)
  * workaround for netty issue causing corrupted data off the wire (CASSANDRA-7695)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 0521485..88d23ca 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,7 +23,6 @@ import java.util.*;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.transport.Frame;
 import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -167,7 +166,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, Frame sourceFrame)
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
         Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
@@ -176,7 +175,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             ModificationStatement statement = statements.get(i);
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
-            addStatementMutations(statement, statementOptions, local, timestamp, mutations, sourceFrame);
+            addStatementMutations(statement, statementOptions, local, timestamp, mutations);
         }
         return unzipMutations(mutations);
     }
@@ -197,8 +196,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
                                        QueryOptions options,
                                        boolean local,
                                        long now,
-                                       Map<String, Map<ByteBuffer, IMutation>> mutations,
-                                       Frame sourceFrame)
+                                       Map<String, Map<ByteBuffer, IMutation>> mutations)
     throws RequestExecutionException, RequestValidationException
     {
         String ksName = statement.keyspace();
@@ -223,7 +221,6 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
             if (mutation == null)
             {
                 mut = new Mutation(ksName, key);
-                mut.setSourceFrame(sourceFrame);
                 mutation = statement.cfm.isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut;
                 ksMap.put(key, mutation);
             }
@@ -280,7 +277,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
         if (hasConditions)
             return executeWithConditions(options, now);
 
-        executeWithoutConditions(getMutations(options, local, now, queryState.getSourceFrame()), options.getConsistency());
+        executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
         return new ResultMessage.Void();
     }
 
@@ -356,7 +353,7 @@ public class BatchStatement implements CQLStatement, MeasurableForPreparedCache
     public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
     {
         assert !hasConditions;
-        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryState.getSourceFrame()))
+        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index f0ab603..478f596 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,7 +22,6 @@ import java.util.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import org.apache.cassandra.transport.Frame;
 import org.github.jamm.MemoryMeter;
 
 import org.apache.cassandra.auth.Permission;
@@ -498,7 +497,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryState.getSourceFrame());
+        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
         if (!mutations.isEmpty())
             StorageProxy.mutateWithTriggers(mutations, cl, false);
 
@@ -636,7 +635,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         if (hasConditions())
             throw new UnsupportedOperationException();
 
-        for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryState.getSourceFrame()))
+        for (IMutation mutation : getMutations(options, true, queryState.getTimestamp()))
         {
             // We don't use counters internally.
             assert mutation instanceof Mutation;
@@ -656,7 +655,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, Frame sourceFrame)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
@@ -671,7 +670,6 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
             Mutation mut = new Mutation(cfm.ksName, key, cf);
-            mut.setSourceFrame(sourceFrame);
 
             mutations.add(isCounter() ? new CounterMutation(mut, options.getConsistency()) : mut);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 2bfdd4e..58717b4 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -72,18 +72,6 @@ public class CounterMutation implements IMutation
         return mutation.getColumnFamilies();
     }
 
-    @Override
-    public void retain()
-    {
-        mutation.retain();
-    }
-
-    @Override
-    public void release()
-    {
-        mutation.release();
-    }
-
     public Mutation getMutation()
     {
         return mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index 3e037c3..44df104 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -30,15 +30,4 @@ public interface IMutation
     public String toString(boolean shallow);
     public void addAll(IMutation m);
     public Collection<ColumnFamily> getColumnFamilies();
-
-    /**
-     * Call to increment underlying network buffer refcount
-     * So we can avoid recycling too soon
-     */
-    public void retain();
-
-    /**
-     * Call to decrement underlying network buffer refcount
-     */
-    public void release();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6eb56b7..a6d23cb 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.transport.Frame;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -56,8 +55,6 @@ public class Mutation implements IMutation
     // map of column family id to mutations for that column family.
     private final Map<UUID, ColumnFamily> modifications;
 
-    private Frame sourceFrame;
-
     public Mutation(String keyspaceName, ByteBuffer key)
     {
         this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
@@ -88,8 +85,6 @@ public class Mutation implements IMutation
     public Mutation copy()
     {
         Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications));
-        copy.setSourceFrame(getSourceFrame());
-
         return copy;
     }
 
@@ -113,20 +108,6 @@ public class Mutation implements IMutation
         return modifications.values();
     }
 
-    @Override
-    public void retain()
-    {
-        if (sourceFrame != null)
-            sourceFrame.retain();
-    }
-
-    @Override
-    public void release()
-    {
-        if (sourceFrame != null)
-            sourceFrame.release();
-    }
-
     public ColumnFamily getColumnFamily(UUID cfId)
     {
         return modifications.get(cfId);
@@ -229,8 +210,6 @@ public class Mutation implements IMutation
      */
     public void apply()
     {
-        assert sourceFrame == null || sourceFrame.body.refCnt() > 0;
-
         Keyspace ks = Keyspace.open(keyspaceName);
         ks.apply(this, ks.metadata.durableWrites);
     }
@@ -290,16 +269,6 @@ public class Mutation implements IMutation
         return mutation;
     }
 
-    public Frame getSourceFrame()
-    {
-        return sourceFrame;
-    }
-
-    public void setSourceFrame(Frame sourceFrame)
-    {
-        this.sourceFrame = sourceFrame;
-    }
-
     public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 9da247d..10cee8d 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -353,15 +353,7 @@ public final class MessagingService implements MessagingServiceMBean
                 {
                     Mutation mutation = (Mutation) ((WriteCallbackInfo) expiredCallbackInfo).sentMessage.payload;
 
-                    try
-                    {
-                        return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
-                    }
-                    finally
-                    {
-                        //We serialized a hint so we don't need this mutation anymore
-                        mutation.release();
-                    }
+                    return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
                 }
 
                 return null;
@@ -580,10 +572,6 @@ public final class MessagingService implements MessagingServiceMBean
         assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
 
-        //keep the underlying buffer around till the request completes or times out and
-        //a hint is stored
-        message.payload.retain();
-
         CallbackInfo previous = callbacks.put(messageId,
                                               new WriteCallbackInfo(to,
                                                                     cb,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 1e1a278..0ec91c6 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -53,11 +53,5 @@ public class ResponseVerbHandler implements IVerbHandler
             MessagingService.instance().maybeAddLatency(cb, message.from, latency);
             cb.response(message);
         }
-
-        // We don't need to track the mutation anymore since write succeeded
-        if (callbackInfo instanceof WriteCallbackInfo)
-        {
-            ((IMutation)((WriteCallbackInfo) callbackInfo).sentMessage.payload).release();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index f2e0809..12fc392 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service;
 import java.util.UUID;
 
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.Frame;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -31,7 +30,6 @@ public class QueryState
     private final ClientState clientState;
     private volatile long clock;
     private volatile UUID preparedTracingSession;
-    private Frame sourceFrame;
 
     public QueryState(ClientState clientState)
     {
@@ -62,16 +60,6 @@ public class QueryState
         return clock;
     }
 
-    public Frame getSourceFrame()
-    {
-        return sourceFrame;
-    }
-
-    public void setSourceFrame(Frame sourceFrame)
-    {
-        this.sourceFrame = sourceFrame;
-    }
-
     public boolean traceNextQuery()
     {
         if (preparedTracingSession != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 65ce413..63dc391 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -457,8 +457,6 @@ public class StorageProxy implements StorageProxyMBean
         {
             for (IMutation mutation : mutations)
             {
-                mutation.retain();
-
                 if (mutation instanceof CounterMutation)
                 {
                     responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
@@ -524,13 +522,6 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            //Release the mutations we dispatched so far.
-            //An exception may be thrown at anytime.
-            //We can infer the mutations that were dispatched from this list
-            Iterator<? extends IMutation> it = mutations.iterator();
-            for (int i = 0; i < responseHandlers.size(); i++)
-                it.next().release();
-            
             writeMetrics.addNano(System.nanoTime() - startTime);
         }
     }
@@ -797,77 +788,69 @@ public class StorageProxy implements StorageProxyMBean
         boolean insertLocal = false;
 
 
-        mutation.retain();
-        try        
+        for (InetAddress destination : targets)
         {
-            for (InetAddress destination : targets)
+            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+            // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+            if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress
+                    && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
             {
-                // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
-                // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
-                // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
-                // a small number of nodes causing problems, so we should avoid shutting down writes completely to
-                // healthy nodes.  Any node with no hintsInProgress is considered healthy.
-                if (StorageMetrics.totalHintsInProgress.count() > maxHintsInProgress
-                        && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
-                {
-                    throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count());
-                }
+                throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.count());
+            }
 
-                if (FailureDetector.instance.isAlive(destination))
+            if (FailureDetector.instance.isAlive(destination))
+            {
+                if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                {
+                    insertLocal = true;
+                } else
                 {
-                    if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                    // belongs on a different server
+                    if (message == null)
+                        message = mutation.createMessage();
+                    String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+                    // direct writes to local DC or old Cassandra versions
+                    // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
+                    if (localDataCenter.equals(dc))
                     {
-                        insertLocal = true;
+                        MessagingService.instance().sendRR(message, destination, responseHandler, true);
                     } else
                     {
-                        // belongs on a different server
-                        if (message == null)
-                            message = mutation.createMessage();
-                        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-                        // direct writes to local DC or old Cassandra versions
-                        // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0)
-                        if (localDataCenter.equals(dc))
+                        Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
+                        if (messages == null)
                         {
-                            MessagingService.instance().sendRR(message, destination, responseHandler, true);
-                        } else
-                        {
-                            Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
-                            if (messages == null)
-                            {
-                                messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
-                                if (dcGroups == null)
-                                    dcGroups = new HashMap<String, Collection<InetAddress>>();
-                                dcGroups.put(dc, messages);
-                            }
-                            messages.add(destination);
+                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                            if (dcGroups == null)
+                                dcGroups = new HashMap<String, Collection<InetAddress>>();
+                            dcGroups.put(dc, messages);
                         }
+                        messages.add(destination);
                     }
-                } else
-                {
-                    if (!shouldHint(destination))
-                        continue;
-
-                    // Schedule a local hint
-                    submitHint(mutation, destination, responseHandler);
                 }
-            }
-
-            if (insertLocal)
-                insertLocal(mutation, responseHandler);
-
-            if (dcGroups != null)
+            } else
             {
-                // for each datacenter, send the message to one node to relay the write to other replicas
-                if (message == null)
-                    message = mutation.createMessage();
+                if (!shouldHint(destination))
+                    continue;
 
-                for (Collection<InetAddress> dcTargets : dcGroups.values())
-                    sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
+                // Schedule a local hint
+                submitHint(mutation, destination, responseHandler);
             }
         }
-        finally
+
+        if (insertLocal)
+            insertLocal(mutation, responseHandler);
+
+        if (dcGroups != null)
         {
-            mutation.release();
+            // for each datacenter, send the message to one node to relay the write to other replicas
+            if (message == null)
+                message = mutation.createMessage();
+
+            for (Collection<InetAddress> dcTargets : dcGroups.values())
+                sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
         }
     }
 
@@ -889,30 +872,22 @@ public class StorageProxy implements StorageProxyMBean
     {
         // local write that time out should be handled by LocalMutationRunnable
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
-        mutation.retain();
 
         HintRunnable runnable = new HintRunnable(target)
         {
             public void runMayThrow()
             {
-                try
+                int ttl = HintedHandOffManager.calculateHintTTL(mutation);
+                if (ttl > 0)
                 {
-                    int ttl = HintedHandOffManager.calculateHintTTL(mutation);
-                    if (ttl > 0)
-                    {
-                        logger.debug("Adding hint for {}", target);
-                        writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
-                        // Notify the handler only for CL == ANY
-                        if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
-                            responseHandler.response(null);
-                    } else
-                    {
-                        logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
-                    }
-                }
-                finally
+                    logger.debug("Adding hint for {}", target);
+                    writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target);
+                    // Notify the handler only for CL == ANY
+                    if (responseHandler != null && responseHandler.consistencyLevel == ConsistencyLevel.ANY)
+                        responseHandler.response(null);
+                } else
                 {
-                    mutation.release();
+                    logger.debug("Skipped writing hint for {} (ttl {})", target, ttl);
                 }
             }
         };
@@ -976,24 +951,16 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
     {
-        mutation.retain();
 
         StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
         {
             public void runMayThrow()
             {
-                try
-                {
-                    IMutation processed = SinkManager.processWriteRequest(mutation);
-                    if (processed != null)
-                    {
-                        ((Mutation) processed).apply();
-                        responseHandler.response(null);
-                    }
-                }
-                finally
+                IMutation processed = SinkManager.processWriteRequest(mutation);
+                if (processed != null)
                 {
-                    mutation.release();
+                    ((Mutation) processed).apply();
+                    responseHandler.response(null);
                 }
             }
         });
@@ -1099,8 +1066,6 @@ public class StorageProxy implements StorageProxyMBean
                                              final AbstractWriteResponseHandler responseHandler,
                                              final String localDataCenter)
     {
-        mutation.retain();
-
         return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
         {
             @Override
@@ -1121,12 +1086,6 @@ public class StorageProxy implements StorageProxyMBean
                 if (!remotes.isEmpty())
                     sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
             }
-
-            @Override
-            public void cleanup()
-            {
-                mutation.release();
-            }
         };
     }
 
@@ -2052,32 +2011,21 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            try
-            {
-                if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb))
-                {
-                    MessagingService.instance().incrementDroppedMessages(verb);
-                    return;
-                }
 
-                try
-                {
-                    runMayThrow();
-                } catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
+            if (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - constructionTime) > DatabaseDescriptor.getTimeout(verb))
+            {
+                MessagingService.instance().incrementDroppedMessages(verb);
+                return;
             }
-            finally
+            try
+            {
+                runMayThrow();
+            } catch (Exception e)
             {
-                cleanup();
+                throw new RuntimeException(e);
             }
         }
 
-        public void cleanup()
-        {
-        }
-
         abstract protected void runMayThrow() throws Exception;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/transport/CBUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java
index 6cc6d47..450dc17 100644
--- a/src/java/org/apache/cassandra/transport/CBUtil.java
+++ b/src/java/org/apache/cassandra/transport/CBUtil.java
@@ -297,11 +297,8 @@ public abstract class CBUtil
         if (length < 0)
             return null;
         ByteBuf slice = cb.readSlice(length);
-        if (slice.nioBufferCount() == 1)
-            return slice.nioBuffer();
-        else
-            return ByteBuffer.wrap(readRawBytes(slice));
 
+        return ByteBuffer.wrap(readRawBytes(slice));
     }
 
     public static void writeValue(byte[] bytes, ByteBuf cb)
@@ -417,18 +414,9 @@ public abstract class CBUtil
 
     /*
      * Reads *all* readable bytes from {@code cb} and return them.
-     * If {@code cb} is backed by an array, this will return the underlying array directly,
without copy.
      */
     public static byte[] readRawBytes(ByteBuf cb)
     {
-        if (cb.hasArray() && cb.readableBytes() == cb.array().length)
-        {
-            // Move the readerIndex just so we consistenly consume the input
-            cb.readerIndex(cb.writerIndex());
-            return cb.array();
-        }
-
-        // Otherwise, just read the bytes in a new array
         byte[] bytes = new byte[cb.readableBytes()];
         cb.readBytes(bytes);
         return bytes;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1348aa2/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index b02b176..9a89454 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -414,7 +414,6 @@ public abstract class Message
                 assert request.connection() instanceof ServerConnection;
                 connection = (ServerConnection)request.connection();
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
-                qstate.setSourceFrame(request.getSourceFrame());
 
                 logger.debug("Received: {}, v={}", request, connection.getVersion());
 


Mime
View raw message