cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/7] cassandra git commit: Add keep-alive to streaming
Date Wed, 31 Aug 2016 21:08:11 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 b39d984f7 -> 77924b37c
  refs/heads/cassandra-3.0 e4a53f4d3 -> b56def49f
  refs/heads/trunk 0cd48f76d -> 42bbe0031


Add keep-alive to streaming

patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11841


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8af61ac3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8af61ac3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8af61ac3

Branch: refs/heads/trunk
Commit: 8af61ac311b8b6e8bbff410f5f0e29bc73ca8daa
Parents: 0cd48f7
Author: Paulo Motta <pauloricardomg@gmail.com>
Authored: Wed Aug 31 16:03:00 2016 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Aug 31 16:03:00 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 conf/cassandra.yaml                             |  14 +--
 .../org/apache/cassandra/config/Config.java     |   6 ++
 .../cassandra/config/DatabaseDescriptor.java    |  10 ++
 src/java/org/apache/cassandra/gms/Gossiper.java |  13 +++
 .../net/IncomingStreamingConnection.java        |   5 +-
 .../apache/cassandra/net/MessagingService.java  |   4 +-
 .../cassandra/streaming/ConnectionHandler.java  |  59 ++++++++---
 .../cassandra/streaming/StreamReader.java       |   2 +-
 .../cassandra/streaming/StreamSession.java      | 103 +++++++++++++++++--
 .../streaming/messages/KeepAliveMessage.java    |  49 +++++++++
 .../streaming/messages/StreamMessage.java       |  15 ++-
 .../cassandra/utils/CassandraVersion.java       |   2 +-
 .../cassandra/utils/CassandraVersionTest.java   |  17 +++
 15 files changed, 263 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0edfc76..4919d19 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add keep-alive to streaming (CASSANDRA-11841)
  * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
  * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
  * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index bb21c3c..ddb1263 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -89,6 +89,9 @@ Upgrading
       snitch, encryption context. Client initialization just applies the configuration but
does not
       setup anything. Instead of using Config.setClientMode() or Config.isClientMode(), which
are
       deprecated now, use one of the appropiate new methods in DatabaseDescriptor.
+    - Application layer keep-alives were added to the streaming protocol to prevent idle
incoming connections from
+      timing out and failing the stream session (CASSANDRA-11839). This effectively deprecates
the streaming_socket_timeout_in_ms
+      property in favor of streaming_keep_alive_period_in_secs. See cassandra.yaml for more
details about this property.
 
 3.8
 ===

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5fb44cf..a25e084 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -870,13 +870,13 @@ slow_query_log_timeout_in_ms: 500
 # and the times are synchronized between the nodes.
 cross_node_timeout: false
 
-# Set socket timeout for streaming operation.
-# The stream session is failed if no data/ack is received by any of the participants
-# within that period, which means this should also be sufficient to stream a large
-# sstable or rebuild table indexes.
-# Default value is 86400000ms, which means stale streams timeout after 24 hours.
-# A value of zero means stream sockets should never time out.
-# streaming_socket_timeout_in_ms: 86400000
+# Set keep-alive period for streaming
+# This node will send a keep-alive message periodically with this period.
+# If the node does not receive a keep-alive message from the peer for
+# 2 keep-alive cycles, the stream session times out and fails.
+# Default value is 300s (5 minutes), which means a stalled stream
+# times out after 10 minutes by default.
+# streaming_keep_alive_period_in_secs: 300
 
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 87f664d..f2f21ad 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -96,8 +96,14 @@ public class Config
 
     public volatile Long truncate_request_timeout_in_ms = 60000L;
 
+    /**
+     * @deprecated use {@link #streaming_keep_alive_period_in_secs} instead
+     */
+    @Deprecated
     public Integer streaming_socket_timeout_in_ms = 86400000; //24 hours
 
+    public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes
+
     public boolean cross_node_timeout = false;
 
     public volatile long slow_query_log_timeout_in_ms = 500L;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4568649..dc4cc36 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2130,11 +2130,21 @@ public class DatabaseDescriptor
         conf.streaming_socket_timeout_in_ms = value;
     }
 
+    /**
+     * @deprecated use {@link #getStreamingKeepAlivePeriod()} instead
+     * @return streaming_socket_timeout_in_ms property
+     */
+    @Deprecated
     public static int getStreamingSocketTimeout()
     {
         return conf.streaming_socket_timeout_in_ms;
     }
 
+    public static int getStreamingKeepAlivePeriod()
+    {
+        return conf.streaming_keep_alive_period_in_secs;
+    }
+
     public static String getLocalDataCenter()
     {
         return localDC;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 18de598..20a9192 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -32,6 +32,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
 
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1600,4 +1601,16 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return System.currentTimeMillis() + Gossiper.aVeryLongTime;
     }
 
+    public CassandraVersion getReleaseVersion(InetAddress ep)
+    {
+        EndpointState state = getEndpointStateForEndpoint(ep);
+        if (state != null)
+        {
+            VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
+            if (applicationState != null)
+                return new CassandraVersion(applicationState.value);
+        }
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 4d23375..b97b836 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -75,10 +75,9 @@ public class IncomingStreamingConnection extends Thread implements Closeable
             // parallelize said streams and the socket is blocking, so we might deadlock.
             StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description,
init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental);
         }
-        catch (IOException e)
+        catch (Throwable t)
         {
-            logger.error(String.format("IOException while reading from socket from %s, closing:
%s",
-                                       socket.getRemoteSocketAddress(), e));
+            logger.error("Error while reading from socket from {}.", socket.getRemoteSocketAddress(),
t);
             close();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 7cbe315..e72a9a2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1233,9 +1233,9 @@ public final class MessagingService implements MessagingServiceMBean
                     logger.error("SSL handshake error for inbound connection from " + socket,
e);
                     FileUtils.closeQuietly(socket);
                 }
-                catch (IOException e)
+                catch (Throwable t)
                 {
-                    logger.trace("Error reading the socket " + socket, e);
+                    logger.trace("Error reading the socket {}", socket, t);
                     FileUtils.closeQuietly(socket);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 71ea7bd..f2e0f9c 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -61,14 +61,15 @@ public class ConnectionHandler
     private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
 
     private final StreamSession session;
+    private int incomingSocketTimeout;
 
     private IncomingMessageHandler incoming;
     private OutgoingMessageHandler outgoing;
 
-    ConnectionHandler(StreamSession session)
+    ConnectionHandler(StreamSession session, int incomingSocketTimeout)
     {
         this.session = session;
-        this.incoming = new IncomingMessageHandler(session);
+        this.incoming = new IncomingMessageHandler(session, incomingSocketTimeout);
         this.outgoing = new OutgoingMessageHandler(session);
     }
 
@@ -84,13 +85,13 @@ public class ConnectionHandler
     {
         logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId());
         Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION);
-        incoming.sendInitMessage(incomingSocket, true);
+        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
+        incomingSocket.shutdownOutput();
 
         logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId());
         Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION);
-        outgoing.sendInitMessage(outgoingSocket, false);
+        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
+        outgoingSocket.shutdownInput();
     }
 
     /**
@@ -103,9 +104,15 @@ public class ConnectionHandler
     public void initiateOnReceivingSide(IncomingStreamingConnection connection, boolean isForOutgoing,
int version) throws IOException
     {
         if (isForOutgoing)
+        {
             outgoing.start(connection, version);
+            outgoing.socket.shutdownInput();
+        }
         else
+        {
             incoming.start(connection, version);
+            incoming.socket.shutdownOutput();
+        }
     }
 
     public ListenableFuture<?> close()
@@ -160,14 +167,16 @@ public class ConnectionHandler
         protected final StreamSession session;
 
         protected int protocolVersion;
+        private final boolean isOutgoingHandler;
         protected Socket socket;
 
         private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
         private IncomingStreamingConnection incomingConnection;
 
-        protected MessageHandler(StreamSession session)
+        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
         {
             this.session = session;
+            this.isOutgoingHandler = isOutgoingHandler;
         }
 
         protected abstract String name();
@@ -189,14 +198,14 @@ public class ConnectionHandler
         }
 
         @SuppressWarnings("resource")
-        public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException
+        private void sendInitMessage() throws IOException
         {
             StreamInitMessage message = new StreamInitMessage(
                     FBUtilities.getBroadcastAddress(),
                     session.sessionIndex(),
                     session.planId(),
                     session.description(),
-                    isForOutgoing,
+                    !isOutgoingHandler,
                     session.keepSSTableLevel(),
                     session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
@@ -205,16 +214,18 @@ public class ConnectionHandler
             out.flush();
         }
 
-        public void start(IncomingStreamingConnection connection, int protocolVersion)
+        public void start(IncomingStreamingConnection connection, int protocolVersion) throws
IOException
         {
             this.incomingConnection = connection;
-            start(connection.socket, protocolVersion);
+            start(connection.socket, protocolVersion, false);
         }
 
-        public void start(Socket socket, int protocolVersion)
+        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
         {
             this.socket = socket;
             this.protocolVersion = protocolVersion;
+            if (initiator)
+                sendInitMessage();
 
             new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
         }
@@ -270,9 +281,26 @@ public class ConnectionHandler
      */
     static class IncomingMessageHandler extends MessageHandler
     {
-        IncomingMessageHandler(StreamSession session)
+        private final int socketTimeout;
+
+        IncomingMessageHandler(StreamSession session, int socketTimeout)
+        {
+            super(session, false);
+            this.socketTimeout = socketTimeout;
+        }
+
+        @Override
+        public void start(Socket socket, int version, boolean initiator) throws IOException
         {
-            super(session);
+            try
+            {
+                socket.setSoTimeout(socketTimeout);
+            }
+            catch (SocketException e)
+            {
+                logger.warn("Could not set incoming socket timeout to {}", socketTimeout,
e);
+            }
+            super.start(socket, version, initiator);
         }
 
         protected String name()
@@ -332,7 +360,7 @@ public class ConnectionHandler
 
         OutgoingMessageHandler(StreamSession session)
         {
-            super(session);
+            super(session, true);
         }
 
         protected String name()
@@ -388,6 +416,7 @@ public class ConnectionHandler
             {
                 StreamMessage.serialize(message, out, protocolVersion, session);
                 out.flush();
+                message.sent();
             }
             catch (SocketException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 816f028..6465bf7 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -128,7 +128,7 @@ public class StreamReader
         {
             if (deserializer != null)
                 logger.warn("[Stream {}] Error while reading partition {} from stream on
ks='{}' and table='{}'.",
-                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(),
cfs.getTableName());
+                            session.planId(), deserializer.partitionKey(), cfs.keyspace.getName(),
cfs.getTableName(), e);
             if (writer != null)
             {
                 writer.abort(e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 3af31f8..736d30f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -46,6 +47,7 @@ import org.apache.cassandra.gms.*;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.messages.*;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
@@ -116,7 +118,17 @@ import org.apache.cassandra.utils.concurrent.Refs;
  */
 public class StreamSession implements IEndpointStateChangeSubscriber
 {
+
+    /**
+     * Version where keep-alive support was added
+     */
+    private static final CassandraVersion STREAM_KEEP_ALIVE = new CassandraVersion("3.10");
     private static final Logger logger = LoggerFactory.getLogger(StreamSession.class);
+    private static final DebuggableScheduledThreadPoolExecutor keepAliveExecutor = new DebuggableScheduledThreadPoolExecutor("StreamKeepAliveExecutor");
+    static {
+        // Immediately remove keep-alive task when cancelled.
+        keepAliveExecutor.setRemoveOnCancelPolicy(true);
+    }
 
     /**
      * Streaming endpoint.
@@ -149,6 +161,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     private AtomicBoolean isAborted = new AtomicBoolean(false);
     private final boolean keepSSTableLevel;
     private final boolean isIncremental;
+    private ScheduledFuture<?> keepAliveFuture = null;
 
     public static enum State
     {
@@ -176,7 +189,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.connecting = connecting;
         this.index = index;
         this.factory = factory;
-        this.handler = new ConnectionHandler(this);
+        this.handler = new ConnectionHandler(this, isKeepAliveSupported()?
+                                                   (int)TimeUnit.SECONDS.toMillis(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod())
:
+                                                   DatabaseDescriptor.getStreamingSocketTimeout());
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
         this.isIncremental = isIncremental;
@@ -214,6 +229,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return receivers.get(cfId).getTransaction();
     }
 
+    private boolean isKeepAliveSupported()
+    {
+        CassandraVersion peerVersion = Gossiper.instance.getReleaseVersion(peer);
+        return STREAM_KEEP_ALIVE.isSupportedBy(peerVersion);
+    }
+
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.
@@ -224,6 +245,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     {
         this.streamResult = streamResult;
         StreamHook.instance.reportStreamFuture(this, streamResult);
+
+        if (isKeepAliveSupported())
+            scheduleKeepAliveTask();
+        else
+            logger.debug("Peer {} does not support keep-alive.", peer);
     }
 
     public void start()
@@ -441,6 +467,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                     task.abort();
             }
 
+            if (keepAliveFuture != null)
+            {
+                logger.debug("[Stream #{}] Finishing keep-alive task.", planId());
+                keepAliveFuture.cancel(false);
+                keepAliveFuture = null;
+            }
+
             // Note that we shouldn't block on this close because this method is called on
the handler
             // incoming thread (so we would deadlock).
             handler.close();
@@ -530,12 +563,30 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public void onError(Throwable e)
     {
+        logError(e);
+        // send session failure message
+        if (handler.isOutgoingConnected())
+            handler.sendMessage(new SessionFailedMessage());
+        // fail session
+        closeSession(State.FAILED);
+    }
+
+    private void logError(Throwable e)
+    {
         if (e instanceof SocketTimeoutException)
         {
-            logger.error("[Stream #{}] Streaming socket timed out. This means the session
peer stopped responding or " +
-                         "is still processing received data. If there is no sign of failure
in the other end or a very " +
-                         "dense table is being transferred you may want to increase streaming_socket_timeout_in_ms
" +
-                         "property. Current value is {}ms.", planId(), DatabaseDescriptor.getStreamingSocketTimeout(),
e);
+            if (isKeepAliveSupported())
+                logger.error("[Stream #{}] Did not receive response from peer {}{} for {}
secs. Is peer down? " +
+                             "If not, maybe try increasing streaming_keep_alive_period_in_secs.",
planId(),
+                             peer.getHostAddress(),
+                             peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
+                             2 * DatabaseDescriptor.getStreamingKeepAlivePeriod(),
+                             e);
+            else
+                logger.error("[Stream #{}] Streaming socket timed out. This means the session
peer stopped responding or " +
+                             "is still processing received data. If there is no sign of failure
in the other end or a very " +
+                             "dense table is being transferred you may want to increase streaming_socket_timeout_in_ms
" +
+                             "property. Current value is {}ms.", planId(), DatabaseDescriptor.getStreamingSocketTimeout(),
e);
         }
         else
         {
@@ -544,11 +595,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                                                                                         
   peer.equals(connecting) ? "" : " through " + connecting.getHostAddress(),
                                                                                         
   e);
         }
-        // send session failure message
-        if (handler.isOutgoingConnected())
-            handler.sendMessage(new SessionFailedMessage());
-        // fail session
-        closeSession(State.FAILED);
     }
 
     /**
@@ -642,6 +688,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         }
     }
 
+    private synchronized void scheduleKeepAliveTask()
+    {
+        if (keepAliveFuture == null)
+        {
+            int keepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod();
+            logger.debug("[Stream #{}] Scheduling keep-alive task with {}s period.", planId(),
keepAlivePeriod);
+            keepAliveFuture = keepAliveExecutor.scheduleAtFixedRate(new KeepAliveTask(),
0, keepAlivePeriod, TimeUnit.SECONDS);
+        }
+    }
+
     /**
      * Call back on receiving {@code StreamMessage.Type.SESSION_FAILED} message.
      */
@@ -754,4 +810,31 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 taskCompleted(task); // there is no file to send
         }
     }
+
+    class KeepAliveTask implements Runnable
+    {
+        private KeepAliveMessage last = null;
+
+        public void run()
+        {
+            //to avoid jamming the message queue, we only send if the last one was sent
+            if (last == null || last.wasSent())
+            {
+                logger.trace("[Stream #{}] Sending keep-alive to {}.", planId(), peer);
+                last = new KeepAliveMessage();
+                try
+                {
+                    handler.sendMessage(last);
+                }
+                catch (RuntimeException e) //connection handler is closed
+                {
+                    logger.debug("[Stream #{}] Could not send keep-alive message (perhaps
stream session is finished?).", planId(), e);
+                }
+            }
+            else
+            {
+                logger.trace("[Stream #{}] Skip sending keep-alive to {} (previous was not
yet sent).", planId(), peer);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
new file mode 100644
index 0000000..bfdc72e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/messages/KeepAliveMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.messages;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.streaming.StreamSession;
+
+public class KeepAliveMessage extends StreamMessage
+{
+    public static Serializer<KeepAliveMessage> serializer = new Serializer<KeepAliveMessage>()
+    {
+        public KeepAliveMessage deserialize(ReadableByteChannel in, int version, StreamSession
session) throws IOException
+        {
+            return new KeepAliveMessage();
+        }
+
+        public void serialize(KeepAliveMessage message, DataOutputStreamPlus out, int version,
StreamSession session)
+        {}
+    };
+
+    public KeepAliveMessage()
+    {
+        super(Type.KEEP_ALIVE);
+    }
+
+    public String toString()
+    {
+        return "keep-alive";
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index eb7086f..7487aaf 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -38,6 +38,8 @@ public abstract class StreamMessage
     public static final int VERSION_30 = 4;
     public static final int CURRENT_VERSION = VERSION_30;
 
+    private transient volatile boolean sent = false;
+
     public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version,
StreamSession session) throws IOException
     {
         ByteBuffer buff = ByteBuffer.allocate(1);
@@ -70,6 +72,16 @@ public abstract class StreamMessage
         }
     }
 
+    public void sent()
+    {
+        sent = true;
+    }
+
+    public boolean wasSent()
+    {
+        return sent;
+    }
+
     /** StreamMessage serializer */
     public static interface Serializer<V extends StreamMessage>
     {
@@ -85,7 +97,8 @@ public abstract class StreamMessage
         RECEIVED(3, 4, ReceivedMessage.serializer),
         RETRY(4, 4, RetryMessage.serializer),
         COMPLETE(5, 1, CompleteMessage.serializer),
-        SESSION_FAILED(6, 5, SessionFailedMessage.serializer);
+        SESSION_FAILED(6, 5, SessionFailedMessage.serializer),
+        KEEP_ALIVE(7, 5, KeepAliveMessage.serializer);
 
         public static Type get(byte type)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/src/java/org/apache/cassandra/utils/CassandraVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index 759ca97..aed0fe7 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -142,7 +142,7 @@ public class CassandraVersion implements Comparable<CassandraVersion>
 
     public boolean isSupportedBy(CassandraVersion version)
     {
-        return major == version.major && this.compareTo(version) <= 0;
+        return version != null && major == version.major && this.compareTo(version)
<= 0;
     }
 
     private static int compareIdentifiers(String[] ids1, String[] ids2, int defaultPred)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8af61ac3/test/unit/org/apache/cassandra/utils/CassandraVersionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CassandraVersionTest.java b/test/unit/org/apache/cassandra/utils/CassandraVersionTest.java
index 73562b7..d8d0a8a 100644
--- a/test/unit/org/apache/cassandra/utils/CassandraVersionTest.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraVersionTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -112,6 +113,22 @@ public class CassandraVersionTest
         v2 = new CassandraVersion("3.0.1");
         assertTrue(!v1.isSupportedBy(v2));
         assertTrue(v2.isSupportedBy(v1));
+
+        v1 = new CassandraVersion("3.7");
+        v2 = new CassandraVersion("3.8");
+        assertTrue(v1.isSupportedBy(v2));
+        assertTrue(!v2.isSupportedBy(v1));
+
+        v1 = new CassandraVersion("3.0.8");
+        v2 = new CassandraVersion("3.8");
+        assertTrue(v1.isSupportedBy(v2));
+        assertTrue(!v2.isSupportedBy(v1));
+        assertTrue(v2.isSupportedBy(v2));
+
+        v1 = new CassandraVersion("3.8");
+        v2 = new CassandraVersion("3.8-SNAPSHOT");
+        assertTrue(v1.isSupportedBy(v2));
+        assertTrue(v2.isSupportedBy(v1));
     }
 
     @Test


Mime
View raw message