cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [01/17] git commit: Backport CASSANDRA-6747
Date Wed, 30 Jul 2014 17:00:18 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 7e1adb497 -> 440d23603
  refs/heads/cassandra-2.1 307f91617 -> 6554bcc0f
  refs/heads/cassandra-2.1.0 4669ab969 -> cc3e0dbda
  refs/heads/trunk 57cecd96a -> 4626fc439


Backport CASSANDRA-6747

patch by yukim; reviewed by krummas for CASSANDRA-7560


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e1adb49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e1adb49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e1adb49

Branch: refs/heads/cassandra-2.1
Commit: 7e1adb4976470b48a361ec6dcca7cbbcdb86d85f
Parents: b44bbb8
Author: Yuki Morishita <yukim@apache.org>
Authored: Tue Jul 29 16:02:38 2014 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Tue Jul 29 16:02:38 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/net/CallbackInfo.java  | 15 ++++++++-
 .../net/IAsyncCallbackWithFailure.java          | 28 ++++++++++++++++
 .../cassandra/net/MessageDeliveryTask.java      | 16 ++++++++-
 .../org/apache/cassandra/net/MessageIn.java     | 10 ++++++
 .../apache/cassandra/net/MessagingService.java  | 34 +++++++++++++++-----
 .../cassandra/net/ResponseVerbHandler.java      | 12 +++++--
 .../apache/cassandra/repair/SnapshotTask.java   | 21 +++++++-----
 8 files changed, 117 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 26d94f7..7f7d2bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596)
  * Add option to disable STCS in L0 (CASSANDRA-6621)
  * Fix error when doing reversed queries with static columns (CASSANDRA-7490)
+ * Backport CASSANDRA-6747 (CASSANDRA-7560)
 Merged from 1.2:
  * Set correct stream ID on responses when non-Exception Throwables
    are thrown while handling native protocol messages (CASSANDRA-7470)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index 3e584b4..b61210c 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -31,6 +31,12 @@ public class CallbackInfo
     protected final InetAddress target;
     protected final IAsyncCallback callback;
     protected final IVersionedSerializer<?> serializer;
+    private final boolean failureCallback;
+
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    {
+        this(target, callback, serializer, false);
+    }
 
     /**
      * Create CallbackInfo without sent message
@@ -39,11 +45,12 @@ public class CallbackInfo
      * @param callback
      * @param serializer serializer to deserialize response message
      */
-    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
+    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
     {
         this.target = target;
         this.callback = callback;
         this.serializer = serializer;
+        this.failureCallback = failureCallback;
     }
 
     public boolean shouldHint()
@@ -51,12 +58,18 @@ public class CallbackInfo
         return false;
     }
 
+    public boolean isFailureCallback()
+    {
+        return failureCallback;
+    }
+
     public String toString()
     {
         return "CallbackInfo(" +
                "target=" + target +
                ", callback=" + callback +
                ", serializer=" + serializer +
+               ", failureCallback=" + failureCallback +
                ')';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
new file mode 100644
index 0000000..1f95579
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/IAsyncCallbackWithFailure.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+public interface IAsyncCallbackWithFailure<T> extends IAsyncCallback<T>
+{
+    /**
+     * Called when there is an exception on the remote node or timeout happens
+     */
+    public void onFailure(InetAddress from);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index e49b93c..982f17e 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -57,7 +57,21 @@ public class MessageDeliveryTask implements Runnable
             return;
         }
 
-        verbHandler.doVerb(message, id);
+        try
+        {
+            verbHandler.doVerb(message, id);
+        }
+        catch (Throwable t)
+        {
+            if (message.doCallbackOnFailure())
+            {
+                MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+                                                    .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+                MessagingService.instance().sendReply(response, id, message.from);
+            }
+
+            throw t;
+        }
         if (GOSSIP_VERBS.contains(message.verb))
             Gossiper.instance.setLastProcessedMessageAt(constructionTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index e0efefe..10260c2 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -105,6 +105,16 @@ public class MessageIn<T>
         return MessagingService.verbStages.get(verb);
     }
 
+    public boolean doCallbackOnFailure()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_CALLBACK_PARAM);
+    }
+
+    public boolean isFailureResponse()
+    {
+        return parameters.containsKey(MessagingService.FAILURE_RESPONSE_PARAM);
+    }
+
     public long getTimeout()
     {
         return DatabaseDescriptor.getTimeout(verb);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4a5df29..0bb1b17 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -75,6 +75,10 @@ public final class MessagingService implements MessagingServiceMBean
 
     public boolean allNodesAtLeast20 = true;
 
+    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
+    public static final byte[] ONE_BYTE = new byte[1];
+    public static final String FAILURE_RESPONSE_PARAM = "FAIL";
+
     /**
      * we preface every message with this number so the recipient can validate the sender is sane
      */
@@ -166,7 +170,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
-        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
         put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.ECHO, Stage.GOSSIP);
@@ -329,10 +332,19 @@ public final class MessagingService implements MessagingServiceMBean
         {
             public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
             {
-                CallbackInfo expiredCallbackInfo = pair.right.value;
+                final CallbackInfo expiredCallbackInfo = pair.right.value;
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
                 ConnectionMetrics.totalTimeouts.mark();
                 getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
+                if (expiredCallbackInfo.isFailureCallback())
+                {
+                    StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target);
+                        }
+                    });
+                }
 
                 if (expiredCallbackInfo.shouldHint())
                 {
@@ -537,11 +549,11 @@ public final class MessagingService implements MessagingServiceMBean
         return verbHandlers.get(type);
     }
 
-    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout)
+    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
     {
         assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
         int messageId = nextId();
-        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb)), timeout);
+        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
         assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
         return messageId;
     }
@@ -576,7 +588,12 @@ public final class MessagingService implements MessagingServiceMBean
 
     public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
     {
-        return sendRR(message, to, cb, message.getTimeout());
+        return sendRR(message, to, cb, message.getTimeout(), false);
+    }
+
+    public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
+    {
+        return sendRR(message, to, cb, message.getTimeout(), true);
     }
 
     /**
@@ -588,12 +605,13 @@ public final class MessagingService implements MessagingServiceMBean
      * @param cb      callback interface which is used to pass the responses or
      *                suggest that a timeout occurred to the invoker of the send().
      * @param timeout the timeout used for expiration
+     * @param failureCallback true if given cb has failure callback
      * @return an reference to message id used to match with the result
      */
-    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout)
+    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback)
     {
-        int id = addCallback(cb, message, to, timeout);
-        sendOneWay(message, id, to);
+        int id = addCallback(cb, message, to, timeout, failureCallback);
+        sendOneWay(failureCallback ? message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to);
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 132e574..1d9aa98 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,7 +42,15 @@ public class ResponseVerbHandler implements IVerbHandler
 
         Tracing.trace("Processing response from {}", message.from);
         IAsyncCallback cb = callbackInfo.callback;
-        MessagingService.instance().maybeAddLatency(cb, message.from, latency);
-        cb.response(message);
+        if (message.isFailureResponse())
+        {
+            ((IAsyncCallbackWithFailure) cb).onFailure(message.from);
+        }
+        else
+        {
+            //TODO: Should we add latency only in success cases?
+            MessagingService.instance().maybeAddLatency(cb, message.from, latency);
+            cb.response(message);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e1adb49/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index 1a9d324..09e8104 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -24,7 +24,7 @@ import java.util.concurrent.RunnableFuture;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.db.SnapshotCommand;
-import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 
@@ -44,18 +44,18 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
 
     public void run()
     {
-        MessagingService.instance().sendRR(new SnapshotCommand(desc.keyspace,
-                                                               desc.columnFamily,
-                                                               desc.sessionId.toString(),
-                                                               false).createMessage(),
-                                           endpoint,
-                                           new SnapshotCallback(this));
+        MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
+                                                                          desc.columnFamily,
+                                                                          desc.sessionId.toString(),
+                                                                          false).createMessage(),
+                                                      endpoint,
+                                                      new SnapshotCallback(this));
     }
 
     /**
      * Callback for snapshot request. Run on INTERNAL_RESPONSE stage.
      */
-    static class SnapshotCallback implements IAsyncCallback
+    static class SnapshotCallback implements IAsyncCallbackWithFailure
     {
         final SnapshotTask task;
 
@@ -75,5 +75,10 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
         }
 
         public boolean isLatencyForSnitch() { return false; }
+
+        public void onFailure(InetAddress from)
+        {
+            task.setException(new RuntimeException("Could not create snapshot at " + from));
+        }
     }
 }


Mime
View raw message