cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [11/24] cassandra git commit: Assert the local node is never hinted and make PAXOS commit not hint (2.2 version)
Date Wed, 13 Jan 2016 10:38:12 GMT
Assert the local node is never hinted and make PAXOS commit not hint (2.2 version)

patch by aweisberg; reviewed by slebresne for CASSANDRA-10477


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f13a7dfb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f13a7dfb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f13a7dfb

Branch: refs/heads/cassandra-3.3
Commit: f13a7dfb149a8dd8ef9da050b73504a04c20fadc
Parents: 9562154
Author: Ariel Weisberg <ariel.weisberg@datastax.com>
Authored: Tue Nov 17 17:34:43 2015 -0500
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Jan 13 11:29:39 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/net/WriteCallbackInfo.java |  3 +
 .../cassandra/service/AbstractReadExecutor.java |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 91 +++++++++++++++-----
 4 files changed, 74 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f895139..21c5b27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 Merged from 2.1:
+ * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
  * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676)
  * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072)
  * Fix pending range calculation during moves (CASSANDRA-10887)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 582298c..c1fb98d 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class WriteCallbackInfo extends CallbackInfo
 {
@@ -44,6 +45,8 @@ public class WriteCallbackInfo extends CallbackInfo
         this.sentMessage = message;
         this.consistencyLevel = consistencyLevel;
         this.allowHints = allowHints;
+        //Local writes shouldn't go through messaging service (https://issues.apache.org/jira/browse/CASSANDRA-10477)
+        assert (!target.equals(FBUtilities.getBroadcastAddress()));
     }
 
     Mutation mutation()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3aab12f..2bfd059 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,7 +77,7 @@ public abstract class AbstractReadExecutor
 
     private static boolean isLocalRequest(InetAddress replica)
     {
-        return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
+        return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f13a7dfb/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 71a3d6b..88253e3 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.net.MessagingService.Verb;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
@@ -68,7 +69,6 @@ public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
-    static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
 
     public static final String UNREACHABLE = "UNREACHABLE";
 
@@ -497,12 +497,20 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
         for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
+
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
-                    MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                {
+                    if (destination.equals(FBUtilities.getBroadcastAddress()))
+                        commitPaxosLocal(message, responseHandler);
+                    else
+                        MessagingService.instance().sendRR(message, destination, responseHandler, shouldHint);
+                }
                 else
+                {
                     MessagingService.instance().sendOneWay(message, destination);
+                }
             }
             else if (shouldHint)
             {
@@ -515,6 +523,30 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
+     * Commit a PAXOS task locally, and if the task times out rather than submitting a real hint
+     * submit a fake one that executes immediately on the mutation stage, but generates the necessary backpressure
+     * signal for hints
+     */
+    private static void commitPaxosLocal(final MessageOut<Commit> message, final AbstractWriteResponseHandler responseHandler)
+    {
+        StageManager.getStage(MessagingService.verbStages.get(MessagingService.Verb.PAXOS_COMMIT)).maybeExecuteImmediately(new LocalMutationRunnable()
+        {
+            public void runMayThrow()
+            {
+                PaxosState.commit(message.payload);
+                if (responseHandler != null)
+                    responseHandler.response(null);
+            }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.PAXOS_COMMIT;
+            }
+        });
+    }
+
+    /**
      * Use this method to have these Mutations applied
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
@@ -718,7 +750,7 @@ public class StorageProxy implements StorageProxyMBean
         for (InetAddress target : endpoints)
         {
             int targetVersion = MessagingService.instance().getVersion(target);
-            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            if (target.equals(FBUtilities.getBroadcastAddress()))
             {
                 insertLocal(message.payload, handler);
             }
@@ -752,7 +784,7 @@ public class StorageProxy implements StorageProxyMBean
         MessageOut<Mutation> message = mutation.createMessage();
         for (InetAddress target : endpoints)
         {
-            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            if (target.equals(FBUtilities.getBroadcastAddress()))
                 insertLocal(message.payload, handler);
             else
                 MessagingService.instance().sendRR(message, target, handler, false);
@@ -875,7 +907,7 @@ public class StorageProxy implements StorageProxyMBean
      * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete.
      * }
      * </pre>
-     * 
+     *
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedEndpoints(final Mutation mutation,
@@ -894,20 +926,11 @@ public class StorageProxy implements StorageProxyMBean
 
         for (InetAddress destination : targets)
         {
-            // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
-            // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
-            // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
-            // a small number of nodes causing problems, so we should avoid shutting down writes completely to
-            // healthy nodes.  Any node with no hintsInProgress is considered healthy.
-            if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
-                    && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
-            {
-                throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount());
-            }
+            checkHintOverload(destination);
 
             if (FailureDetector.instance.isAlive(destination))
             {
-                if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                if (destination.equals(FBUtilities.getBroadcastAddress()))
                 {
                     insertLocal = true;
                 } else
@@ -958,6 +981,22 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    private static void checkHintOverload(InetAddress destination) throws OverloadedException
+    {
+        // avoid OOMing due to excess hints.  we need to do this check even for "live" nodes, since we can
+        // still generate hints for those if it's overloaded or simply dead but not yet known-to-be-dead.
+        // The idea is that if we have over maxHintsInProgress hints in flight, this is probably due to
+        // a small number of nodes causing problems, so we should avoid shutting down writes completely to
+        // healthy nodes.  Any node with no hintsInProgress is considered healthy.
+        if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
+                && (getHintsInProgressFor(destination).get() > 0 && shouldHint(destination)))
+        {
+            throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount() +
+                                          " destination: " + destination +
+                                          " destination hints: " + getHintsInProgressFor(destination).get());
+        }
+    }
+
     private static AtomicInteger getHintsInProgressFor(InetAddress destination)
     {
         try
@@ -1077,6 +1116,12 @@ public class StorageProxy implements StorageProxyMBean
                     responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
+
+            @Override
+            protected Verb verb()
+            {
+                return MessagingService.Verb.MUTATION;
+            }
         });
     }
 
@@ -1795,8 +1840,7 @@ public class StorageProxy implements StorageProxyMBean
                     handler.assureSufficientLiveNodes();
                     resolver.setSources(filteredEndpoints);
                     if (filteredEndpoints.size() == 1
-                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
-                        && OPTIMIZE_LOCAL_REQUESTS)
+                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
                     {
                         StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
                     }
@@ -2206,9 +2250,11 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            final MessagingService.Verb verb = verb();
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(verb))
             {
-                MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
+                if (MessagingService.DROPPABLE_VERBS.contains(verb()))
+                    MessagingService.instance().incrementDroppedMessages(verb);
                 HintRunnable runnable = new HintRunnable(FBUtilities.getBroadcastAddress())
                 {
                     protected void runMayThrow() throws Exception
@@ -2230,6 +2276,7 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        abstract protected MessagingService.Verb verb();
         abstract protected void runMayThrow() throws Exception;
     }
 
@@ -2324,11 +2371,11 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairAttempted() {
         return ReadRepairMetrics.attempted.getCount();
     }
-    
+
     public long getReadRepairRepairedBlocking() {
         return ReadRepairMetrics.repairedBlocking.getCount();
     }
-    
+
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }


Mime
View raw message