Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3EF751891E for ; Thu, 10 Mar 2016 19:04:34 +0000 (UTC) Received: (qmail 86832 invoked by uid 500); 10 Mar 2016 19:04:34 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 86792 invoked by uid 500); 10 Mar 2016 19:04:34 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 86770 invoked by uid 99); 10 Mar 2016 19:04:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Mar 2016 19:04:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DB0FADFBDE; Thu, 10 Mar 2016 19:04:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Thu, 10 Mar 2016 19:04:33 -0000 Message-Id: <9cf199f5a9a94fbb864ec3e8299f712c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/15] cassandra git commit: Fix streaming_socket_timeout_in_ms not enforced Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 e94a2a034 -> 561000aa3 refs/heads/cassandra-2.2 7e220bc98 -> b9ff7fe13 refs/heads/cassandra-3.0 232e12b94 -> c9d2b7add refs/heads/cassandra-3.5 f27ab2908 -> 10d61d46d refs/heads/trunk bdd9e3b16 -> 9d527ed25 Fix streaming_socket_timeout_in_ms not enforced Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11286 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/561000aa Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/561000aa Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/561000aa Branch: refs/heads/cassandra-2.1 Commit: 561000aa3094699bab29766d9644ff50f6cb74f3 Parents: e94a2a0 Author: Paulo Motta Authored: Fri Feb 12 12:17:01 2016 -0300 Committer: Yuki Morishita Committed: Thu Mar 10 12:54:24 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 7 +++---- .../net/IncomingStreamingConnection.java | 7 ++++++- .../cassandra/streaming/ConnectionHandler.java | 21 +++++++++++++------- .../cassandra/streaming/StreamSession.java | 2 ++ 5 files changed, 26 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e7c997a..4b505f8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.14 + * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286) * Avoid dropping message too quickly due to missing unit conversion (CASSANDRA-11302) * COPY FROM on large datasets: fix progress report and debug performance (CASSANDRA-11053) * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176) http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 1fa04e6..0da4800 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -630,10 +630,9 @@ request_timeout_in_ms: 10000 # and the times are synchronized between the nodes. cross_node_timeout: false -# Enable socket timeout for streaming operation. -# When a timeout occurs during streaming, streaming is retried from the start -# of the current file. This _can_ involve re-streaming an important amount of -# data, so you should avoid setting the value too low. +# Set socket timeout for streaming operation. +# The stream session is failed if no data is received by any of the +# participants within that period. # Default value is 3600000, which means streams timeout after an hour. # streaming_socket_timeout_in_ms: 3600000 http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 1f98bc4..5ced786 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -27,6 +27,7 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; @@ -62,6 +63,10 @@ public class IncomingStreamingConnection extends Thread implements Closeable DataInput input = new DataInputStream(socket.getInputStream()); StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version); + //Set SO_TIMEOUT on follower side + if (!init.isForOutgoing) + socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); + // The initiator makes two connections, one for incoming and one for outgoing. // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to @@ -74,7 +79,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable close(); } } - + @Override public void close() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index ac267f9..52268b2 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -105,12 +105,22 @@ public class ConnectionHandler { logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer); - ListenableFuture inClosed = incoming == null ? Futures.immediateFuture(null) : incoming.close(); - ListenableFuture outClosed = outgoing == null ? Futures.immediateFuture(null) : outgoing.close(); + ListenableFuture inClosed = closeIncoming(); + ListenableFuture outClosed = closeOutgoing(); return Futures.allAsList(inClosed, outClosed); } + public ListenableFuture closeOutgoing() + { + return outgoing == null ? Futures.immediateFuture(null) : outgoing.close(); + } + + public ListenableFuture closeIncoming() + { + return incoming == null ? Futures.immediateFuture(null) : incoming.close(); + } + /** * Enqueue messages to be sent. * @@ -165,11 +175,8 @@ public class ConnectionHandler protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException { - ReadableByteChannel in = socket.getChannel(); - // socket channel is null when encrypted(SSL) - return in == null - ? Channels.newChannel(socket.getInputStream()) - : in; + //we do this instead of socket.getChannel() so socketSoTimeout is respected + return Channels.newChannel(socket.getInputStream()); } public void sendInitMessage(Socket socket, boolean isForOutgoing) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/561000aa/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 98a6f1f..642e837 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -609,6 +609,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber else { state(State.WAIT_COMPLETE); + handler.closeIncoming(); } } @@ -696,6 +697,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber handler.sendMessage(new CompleteMessage()); completeSent = true; state(State.WAIT_COMPLETE); + handler.closeOutgoing(); } } return completed;