Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 829E310DD0 for ; Mon, 24 Mar 2014 12:56:02 +0000 (UTC) Received: (qmail 95560 invoked by uid 500); 24 Mar 2014 12:56:00 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 95497 invoked by uid 500); 24 Mar 2014 12:55:59 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 95471 invoked by uid 99); 24 Mar 2014 12:55:58 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Mar 2014 12:55:58 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5D7BA988C9D; Mon, 24 Mar 2014 12:55:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Mon, 24 Mar 2014 12:56:00 -0000 Message-Id: In-Reply-To: <9c15e5f909b3460580315f717902e946@git.apache.org> References: <9c15e5f909b3460580315f717902e946@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/9] git commit: Fix SSTable not released if stream fails before it starts Fix SSTable not released if stream fails before it starts patch by yukim; reviewed by Richard Low for CASSANDRA-6818 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35d4b5de Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35d4b5de Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35d4b5de Branch: refs/heads/trunk Commit: 35d4b5de8f3ee18ec98b01f3aa0951df0e11e8d2 Parents: b7bb2fb Author: Yuki Morishita Authored: Mon Mar 24 07:44:19 2014 -0500 Committer: Yuki Morishita Committed: Mon Mar 24 07:44:19 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/streaming/AbstractStreamSession.java | 2 -- src/java/org/apache/cassandra/streaming/StreamInSession.java | 5 +++++ src/java/org/apache/cassandra/streaming/StreamOutSession.java | 5 +++++ 4 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 960b0e9..fa46c2e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -23,6 +23,7 @@ * Avoid NPEs when receiving table changes for an unknown keyspace (CASSANDRA-5631) * Fix bootstrapping when there is no schema (CASSANDRA-6685) * Fix truncating compression metadata (CASSANDRA-6791) + * Fix SSTable not released if stream session fails before starts (CASSANDRA-6818) 1.2.15 http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java index 89fbf5f..f8de827 100644 --- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java +++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java @@ -44,8 +44,6 @@ public abstract class AbstractStreamSession implements IEndpointStateChangeSubsc this.sessionId = sessionId; this.table = table; this.callback = callback; - Gossiper.instance.register(this); - FailureDetector.instance.registerFailureDetectionEventListener(this); } public UUID getSessionId() http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index e83a5b6..f9cdc31 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -24,6 +24,7 @@ import java.net.Socket; import java.util.*; import java.util.concurrent.ConcurrentMap; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.SSTableWriter; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.cliffc.high_scale_lib.NonBlockingHashSet; @@ -61,6 +62,8 @@ public class StreamInSession extends AbstractStreamSession public static StreamInSession create(InetAddress host, IStreamCallback callback) { StreamInSession session = new StreamInSession(host, UUIDGen.getTimeUUID(), callback); + Gossiper.instance.register(session); + FailureDetector.instance.registerFailureDetectionEventListener(session); sessions.put(session.getSessionId(), session); return session; } @@ -71,6 +74,8 @@ public class StreamInSession extends AbstractStreamSession if (session == null) { StreamInSession possibleNew = new StreamInSession(host, sessionId, null); + Gossiper.instance.register(possibleNew); + FailureDetector.instance.registerFailureDetectionEventListener(possibleNew); if ((session = sessions.putIfAbsent(sessionId, possibleNew)) == null) session = possibleNew; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/35d4b5de/src/java/org/apache/cassandra/streaming/StreamOutSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java index edc07ca..c4d7695 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java @@ -25,6 +25,8 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.UUIDGen; @@ -154,6 +156,9 @@ public class StreamOutSession extends AbstractStreamSession public void begin() { + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); + PendingFile first = files.isEmpty() ? null : files.values().iterator().next(); currentFile = first == null ? null : first.getFilename(); StreamHeader header = new StreamHeader(table, getSessionId(), first, files.values());