cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [4/4] git commit: Don't fail streams on failure detector downs
Date Tue, 03 Jun 2014 06:14:31 GMT
Don't fail streams on failure detector downs

Patch by JoshuaMcKenzie; reviewed by marcuse for CASSANDRA-3569


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0f2d7d0b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0f2d7d0b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0f2d7d0b

Branch: refs/heads/cassandra-2.1
Commit: 0f2d7d0b9540efa3ea3dfe4f8270c3635afdc63c
Parents: 878990c
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Tue Jun 3 08:11:56 2014 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue Jun 3 08:11:56 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 debian/cassandra-sysctl.conf                    |  1 +
 .../org/apache/cassandra/repair/RepairJob.java  | 15 ++++++--
 .../apache/cassandra/repair/RepairSession.java  | 37 ++++++++++++++++----
 .../cassandra/service/ActiveRepairService.java  |  1 -
 .../cassandra/streaming/ConnectionHandler.java  |  1 +
 .../cassandra/streaming/StreamSession.java      | 22 ++----------
 .../cassandra/streaming/StreamTransferTask.java | 12 +++++--
 8 files changed, 58 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 333606a..b5c2feb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.1.0
  * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
  * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
+ * Don't fail streams on failure detector downs (CASSANDRA-3569)
 Merged from 2.0:
  * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
  * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/debian/cassandra-sysctl.conf
----------------------------------------------------------------------
diff --git a/debian/cassandra-sysctl.conf b/debian/cassandra-sysctl.conf
index 2173765..443e83f 100644
--- a/debian/cassandra-sysctl.conf
+++ b/debian/cassandra-sysctl.conf
@@ -1 +1,2 @@
 vm.max_map_count = 1048575
+net.ipv4.tcp_keepalive_time=300

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index af00403..8057ed5 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -58,11 +58,21 @@ public class RepairJob
     /* Count down as sync completes */
     private AtomicInteger waitForSync;
 
+    private final IRepairJobEventListener listener;
+
     /**
      * Create repair job to run on specific columnfamily
      */
-    public RepairJob(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor)
+    public RepairJob(IRepairJobEventListener listener,
+                     UUID parentSessionId,
+                     UUID sessionId,
+                     String keyspace,
+                     String columnFamily,
+                     Range<Token> range,
+                     boolean isSequential,
+                     ListeningExecutorService taskExecutor)
     {
+        this.listener = listener;
         this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
         this.isSequential = isSequential;
         this.taskExecutor = taskExecutor;
@@ -114,7 +124,8 @@ public class RepairJob
                 public void onFailure(Throwable throwable)
                 {
                     // TODO need to propagate error to RepairSession
-                    logger.error("Error while snapshot", throwable);
+                    logger.error("Error occurred during snapshot phase", throwable);
+                    listener.failedSnapshot();
                     failed = true;
                 }
             }, taskExecutor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 507dafa..346f3f4 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -74,7 +74,9 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  * Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle
  * all of them in parallel otherwise.
  */
-public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber,
+                                                              IFailureDetectionEventListener,
+                                                              IRepairJobEventListener
 {
     private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
 
@@ -89,9 +91,11 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
 
     private volatile Exception exception;
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
+    private final AtomicBoolean fdUnregistered = new AtomicBoolean(false);
 
     // First, all RepairJobs are added to this queue,
     final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>();
+
     // and after receiving all validation, the job is moved to
     // this map, keyed by CF name.
     final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>();
@@ -169,23 +173,32 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         assert job.desc.equals(desc);
         if (job.addTree(endpoint, tree) == 0)
         {
-            logger.debug("All response received for {}/{}", getId(), desc.columnFamily);
+            logger.debug("All responses received for {}/{}", getId(), desc.columnFamily);
             if (!job.isFailed())
             {
                 syncingJobs.put(job.desc.columnFamily, job);
                 job.submitDifferencers();
             }
 
-            // This job is complete, switching to next in line (note that only
-            // one thread will can ever do this)
+            // This job is complete, switching to next in line (note that only one thread will ever do this)
             jobs.poll();
             RepairJob nextJob = jobs.peek();
             if (nextJob == null)
+            {
+                // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
+                // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
+                // See CASSANDRA-3569
+                if (fdUnregistered.compareAndSet(false, true))
+                    FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
                 // We are done with this repair session as far as differencing
                 // is considered. Just inform the session
                 differencingDone.signalAll();
+            }
             else
+            {
                 nextJob.sendTreeRequests(endpoints);
+            }
         }
     }
 
@@ -271,7 +284,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
+                RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
                 jobs.offer(job);
             }
             logger.debug("Sending tree requests to endpoints {}", endpoints);
@@ -299,7 +312,13 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         {
             // mark this session as terminated
             terminate();
+
             ActiveRepairService.instance.removeFromActiveSessions(this);
+
+            // If we've reached here in an exception state without completing Merkle Tree sync, we'll still be registered
+            // with the FailureDetector.
+            if (fdUnregistered.compareAndSet(false, true))
+                FailureDetector.instance.unregisterFailureDetectionEventListener(this);
         }
     }
 
@@ -320,11 +339,17 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         completed.signalAll();
     }
 
+    public void failedSnapshot()
+    {
+        exception = new IOException("Failed during snapshot creation.");
+        forceShutdown();
+    }
+
     void failedNode(InetAddress remote)
     {
         String errorMsg = String.format("Endpoint %s died", remote);
         exception = new IOException(errorMsg);
-        // If a node failed, we stop everything (though there could still be some activity in the background)
+        // If a node failed during Merkle creation, we stop everything (though there could still be some activity in the background)
         forceShutdown();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b300547..7f7325b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -135,7 +135,6 @@ public class ActiveRepairService
 
     public void removeFromActiveSessions(RepairSession session)
     {
-        FailureDetector.instance.unregisterFailureDetectionEventListener(session);
         Gossiper.instance.unregister(session);
         sessions.remove(session.getId());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 5484c83..5716ae9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -123,6 +123,7 @@ public class ConnectionHandler
             {
                 Socket socket = OutboundTcpConnectionPool.newSocket(peer);
                 socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
+                socket.setKeepAlive(true);
                 return socket;
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 411f969..1afc07e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -108,7 +108,7 @@ import org.apache.cassandra.utils.Pair;
  *       session is done is is closed (closeSession()). Otherwise, the node switch to the WAIT_COMPLETE state and
  *       send a CompleteMessage to the other side.
  */
-public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
     public final InetAddress peer;
@@ -181,10 +181,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
     public void init(StreamResultFuture streamResult)
     {
         this.streamResult = streamResult;
-
-        // register to gossiper/FD to fail on node failure
-        Gossiper.instance.register(this);
-        FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
     public void start()
@@ -358,8 +354,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             // incoming thread (so we would deadlock).
             handler.close();
 
-            Gossiper.instance.unregister(this);
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
             streamResult.handleSessionComplete(this);
         }
     }
@@ -613,23 +607,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
 
     public void onRemove(InetAddress endpoint)
     {
-        convict(endpoint, Double.MAX_VALUE);
+        closeSession(State.FAILED);
     }
 
     public void onRestart(InetAddress endpoint, EndpointState epState)
     {
-        convict(endpoint, Double.MAX_VALUE);
-    }
-
-    public void convict(InetAddress endpoint, double phi)
-    {
-        if (!endpoint.equals(peer))
-            return;
-
-        // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost.
-        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
-            return;
-
         closeSession(State.FAILED);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0f2d7d0b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 2fe75fa..48a7d89 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
@@ -33,6 +34,7 @@ public class StreamTransferTask extends StreamTask
     private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
 
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+    private AtomicBoolean aborted = new AtomicBoolean(false);
 
     private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
 
@@ -75,11 +77,15 @@ public class StreamTransferTask extends StreamTask
 
     public void abort()
     {
-        for (OutgoingFileMessage file : files.values())
+        // Prevent releasing reference multiple times
+        if (aborted.compareAndSet(false, true))
         {
-            file.sstable.releaseReference();
+            for (OutgoingFileMessage file : files.values())
+            {
+                file.sstable.releaseReference();
+            }
+            timeoutExecutor.shutdownNow();
         }
-        timeoutExecutor.shutdownNow();
     }
 
     public int getTotalNumberOfFiles()


Mime
View raw message