Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A754E200B3C for ; Tue, 28 Jun 2016 10:27:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A60C2160A73; Tue, 28 Jun 2016 08:27:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A04E8160A6F for ; Tue, 28 Jun 2016 10:27:58 +0200 (CEST) Received: (qmail 89026 invoked by uid 500); 28 Jun 2016 08:27:56 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 88499 invoked by uid 99); 28 Jun 2016 08:27:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2016 08:27:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4269CED22A; Tue, 28 Jun 2016 08:27:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Tue, 28 Jun 2016 08:28:06 -0000 Message-Id: In-Reply-To: <61e5c437549f4222a87f5aa3ccb9f03a@git.apache.org> References: <61e5c437549f4222a87f5aa3ccb9f03a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/21] cassandra git commit: Remove finished incoming streaming connections from MessagingService archived-at: Tue, 28 Jun 2016 08:27:59 -0000 Remove finished incoming streaming connections from MessagingService patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11854 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2811f15b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2811f15b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2811f15b Branch: refs/heads/trunk Commit: 2811f15bc9117fed4fb38de490d25d68df4e85b7 Parents: 341b3fb Author: Paulo Motta Authored: Mon Jun 27 12:17:33 2016 -0300 Committer: Marcus Eriksson Committed: Tue Jun 28 10:13:24 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../net/IncomingStreamingConnection.java | 4 +- .../apache/cassandra/net/MessagingService.java | 12 +++++- .../cassandra/streaming/ConnectionHandler.java | 39 +++++++++++++++----- .../cassandra/streaming/StreamResultFuture.java | 27 +++++++------- .../streaming/StreamingTransferTest.java | 23 ++++++++++++ 6 files changed, 79 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 620568d..5741241 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.15 + * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854) * Don't try to get sstables for non-repairing column families (CASSANDRA-12077) * Prevent select statements with clustering key > 64k (CASSANDRA-11882) * Avoid marking too many sstables as repaired (CASSANDRA-11696) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 5ced786..bfe92f9 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -40,7 +40,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class); private final int version; - private final Socket socket; + public final Socket socket; private final Set group; public IncomingStreamingConnection(int version, Socket socket, Set group) @@ -71,7 +71,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, socket, init.isForOutgoing, version); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 82320b1..ac8ad79 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -955,10 +955,12 @@ public final class MessagingService implements MessagingServiceMBean return ret; } - private static class SocketThread extends Thread + @VisibleForTesting + public static class SocketThread extends Thread { private final ServerSocket server; - private final Set connections = Sets.newConcurrentHashSet(); + @VisibleForTesting + public final Set connections = Sets.newConcurrentHashSet(); SocketThread(ServerSocket server, String name) { @@ -1145,4 +1147,10 @@ public final class MessagingService implements MessagingServiceMBean } return result; } + + @VisibleForTesting + public List getSocketThreads() + { + return socketThreads; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 52268b2..60ce11e 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.io.util.DataOutputStreamAndChannel; +import org.apache.cassandra.net.IncomingStreamingConnection; import org.apache.cassandra.streaming.messages.StreamInitMessage; import org.apache.cassandra.streaming.messages.StreamMessage; import org.apache.cassandra.utils.FBUtilities; @@ -89,16 +90,16 @@ public class ConnectionHandler /** * Set up outgoing message handler on receiving side. * - * @param socket socket to use for {@link org.apache.cassandra.streaming.ConnectionHandler.OutgoingMessageHandler}. + * @param connection Incoming connection to use for {@link OutgoingMessageHandler}. * @param version Streaming message version * @throws IOException */ - public void initiateOnReceivingSide(Socket socket, boolean isForOutgoing, int version) throws IOException + public void initiateOnReceivingSide(IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException { if (isForOutgoing) - outgoing.start(socket, version); + outgoing.start(connection, version); else - incoming.start(socket, version); + incoming.start(connection, version); } public ListenableFuture close() @@ -156,6 +157,7 @@ public class ConnectionHandler protected Socket socket; private final AtomicReference> closeFuture = new AtomicReference<>(); + private IncomingStreamingConnection incomingConnection; protected MessageHandler(StreamSession session) { @@ -191,6 +193,12 @@ public class ConnectionHandler getWriteChannel(socket).write(messageBuf); } + public void start(IncomingStreamingConnection connection, int protocolVersion) + { + this.incomingConnection = connection; + start(connection.socket, protocolVersion); + } + public void start(Socket socket, int protocolVersion) { this.socket = socket; @@ -218,15 +226,26 @@ public class ConnectionHandler closeFuture.get().set(null); // We can now close the socket - try + if (incomingConnection != null) { - socket.close(); + //this will close the underlying socket and remove it + //from active MessagingService connections (CASSANDRA-11854) + incomingConnection.close(); } - catch (IOException e) + else { - // Erroring out while closing shouldn't happen but is not really a big deal, so just log - // it at DEBUG and ignore otherwise. - logger.debug("Unexpected error while closing streaming connection", e); + //this is an outgoing connection not registered in the MessagingService + //so we can close the socket directly + try + { + socket.close(); + } + catch (IOException e) + { + // Erroring out while closing shouldn't happen but is not really a big deal, so just log + // it at DEBUG and ignore otherwise. + logger.debug("Unexpected error while closing streaming connection", e); + } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 6a6f2b9..5c9c6de 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -19,7 +19,6 @@ package org.apache.cassandra.streaming; import java.io.IOException; import java.net.InetAddress; -import java.net.Socket; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; @@ -28,6 +27,8 @@ import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.net.IncomingStreamingConnection; + /** * A future on the result ({@link StreamState}) of a streaming plan. * @@ -83,7 +84,7 @@ public final class StreamResultFuture extends AbstractFuture future.addEventListener(listener); } - logger.info("[Stream #{}] Executing streaming plan for {}", planId, description); + logger.info("[Stream #{}] Executing streaming plan for {}", planId, description); // Initialize and start all sessions for (final StreamSession session : coordinator.getAllStreamSessions()) @@ -99,7 +100,7 @@ public final class StreamResultFuture extends AbstractFuture UUID planId, String description, InetAddress from, - Socket socket, + IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException { @@ -112,7 +113,7 @@ public final class StreamResultFuture extends AbstractFuture future = new StreamResultFuture(planId, description); StreamManager.instance.registerReceiving(future); } - future.attachSocket(from, sessionIndex, socket, isForOutgoing, version); + future.attachConnection(from, sessionIndex, connection, isForOutgoing, version); logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description); return future; } @@ -124,11 +125,11 @@ public final class StreamResultFuture extends AbstractFuture return future; } - private void attachSocket(InetAddress from, int sessionIndex, Socket socket, boolean isForOutgoing, int version) throws IOException + private void attachConnection(InetAddress from, int sessionIndex, IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException { - StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, socket.getInetAddress()); + StreamSession session = coordinator.getOrCreateSessionById(from, sessionIndex, connection.socket.getInetAddress()); session.init(this); - session.handler.initiateOnReceivingSide(socket, isForOutgoing, version); + session.handler.initiateOnReceivingSide(connection, isForOutgoing, version); } public void addEventListener(StreamEventHandler listener) @@ -164,12 +165,12 @@ public final class StreamResultFuture extends AbstractFuture { SessionInfo sessionInfo = session.getSessionInfo(); logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {} files({} bytes), sending {} files({} bytes)", - session.planId(), - session.sessionIndex(), - sessionInfo.getTotalFilesToReceive(), - sessionInfo.getTotalSizeToReceive(), - sessionInfo.getTotalFilesToSend(), - sessionInfo.getTotalSizeToSend()); + session.planId(), + session.sessionIndex(), + sessionInfo.getTotalFilesToReceive(), + sessionInfo.getTotalSizeToReceive(), + sessionInfo.getTotalFilesToSend(), + sessionInfo.getTotalSizeToSend()); StreamEvent.SessionPreparedEvent event = new StreamEvent.SessionPreparedEvent(planId, sessionInfo); coordinator.addSessionInfo(sessionInfo); fireStreamEvent(event); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2811f15b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 31dc492..abff812 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -52,6 +52,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableUtils; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CounterId; @@ -206,11 +207,33 @@ public class StreamingTransferTest extends SchemaLoader // wrapped range ranges.add(new Range(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getColumnFamilyName()).execute().get(); + verifyConnectionsAreClosed(); } private void transfer(SSTableReader sstable, List> ranges) throws Exception { new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))).execute().get(); + verifyConnectionsAreClosed(); + } + + /** + * Test that finished incoming connections are removed from MessagingService (CASSANDRA-11854) + */ + private void verifyConnectionsAreClosed() throws InterruptedException + { + //after stream session is finished, message handlers may take several milliseconds to be closed + outer: + for (int i = 0; i <= 10; i++) + { + for (MessagingService.SocketThread socketThread : MessagingService.instance().getSocketThreads()) + if (!socketThread.connections.isEmpty()) + { + Thread.sleep(100); + continue outer; + } + return; + } + fail("Streaming connections remain registered in MessagingService"); } private Collection makeStreamingDetails(List> ranges, Refs sstables)