Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6AB4D10E30 for ; Fri, 28 Feb 2014 21:33:50 +0000 (UTC) Received: (qmail 11179 invoked by uid 500); 28 Feb 2014 21:33:45 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 11149 invoked by uid 500); 28 Feb 2014 21:33:45 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 11140 invoked by uid 99); 28 Feb 2014 21:33:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Feb 2014 21:33:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 283D1931691; Fri, 28 Feb 2014 21:33:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Fri, 28 Feb 2014 21:33:44 -0000 Message-Id: <3d10ac3442294d61903f3228e1da2756@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/6] git commit: Fix NPE on BulkLoader caused by losing StreamEvent Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 fd53628cb -> b3a9a4434 refs/heads/cassandra-2.1 bbad16b7e -> dcca99684 refs/heads/trunk e449450b8 -> f6dff616f Fix NPE on BulkLoader caused by losing StreamEvent patch by yukim; reviewed by sankalp kohli for CASSANDRA-6636 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b3a9a443 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b3a9a443 Diff: 
http://git-wip-us.apache.org/repos/asf/cassandra/diff/b3a9a443 Branch: refs/heads/cassandra-2.0 Commit: b3a9a443433a271fee33bede60d4892e0c8ffb03 Parents: fd53628 Author: Yuki Morishita Authored: Fri Feb 28 15:28:25 2014 -0600 Committer: Yuki Morishita Committed: Fri Feb 28 15:28:25 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/io/sstable/SSTableLoader.java | 7 +++---- src/java/org/apache/cassandra/streaming/StreamPlan.java | 11 ++++++++++- .../apache/cassandra/streaming/StreamResultFuture.java | 7 ++++++- src/java/org/apache/cassandra/tools/BulkLoader.java | 7 ++++--- 5 files changed, 24 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d6c9ae6..3e73f91 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,7 @@ * Optimize single partition batch statements (CASSANDRA-6737) * Disallow post-query re-ordering when paging (CASSANDRA-6722) * Fix potential paging bug with deleted columns (CASSANDRA-6748) + * Fix NPE on BulkLoader caused by losing StreamEvent (CASSANDRA-6636) Merged from 1.2: * Add CMSClassUnloadingEnabled JVM option (CASSANDRA-6541) * Catch memtable flush exceptions during shutdown (CASSANDRA-6735) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index f867317..1ea4c55 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -144,7 +144,7 @@ public class SSTableLoader implements 
StreamEventHandler return stream(Collections.<InetAddress>emptySet()); } - public StreamResultFuture stream(Set<InetAddress> toIgnore) + public StreamResultFuture stream(Set<InetAddress> toIgnore, StreamEventHandler... listeners) { client.init(keyspace); outputHandler.output("Established connection to initial hosts"); @@ -175,9 +175,8 @@ public class SSTableLoader implements StreamEventHandler plan.transferFiles(remote, streamingDetails.get(remote)); } - StreamResultFuture bulkResult = plan.execute(); - bulkResult.addEventListener(this); - return bulkResult; + plan.listeners(this, listeners); + return plan.execute(); } public void onSuccess(StreamState finalState) {} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 288929c..740ad66 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -33,6 +33,7 @@ public class StreamPlan { private final UUID planId = UUIDGen.getTimeUUID(); private final String description; + private final List<StreamEventHandler> handlers = new ArrayList<>(); // sessions per InetAddress of the other end. private final Map<InetAddress, StreamSession> sessions = new HashMap<>(); @@ -121,6 +122,14 @@ public class StreamPlan return this; } + public StreamPlan listeners(StreamEventHandler handler, StreamEventHandler... 
handlers) + { + this.handlers.add(handler); + if (handlers != null) + Collections.addAll(this.handlers, handlers); + return this; + } + /** * @return true if this plan has no plan to execute */ @@ -136,7 +145,7 @@ public class StreamPlan */ public StreamResultFuture execute() { - return StreamResultFuture.init(planId, description, sessions.values()); + return StreamResultFuture.init(planId, description, sessions.values(), handlers); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index ccd3c92..dcffaff 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -75,9 +75,14 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions) + static StreamResultFuture init(UUID planId, String description, Collection<StreamSession> sessions, Collection<StreamEventHandler> listeners) { StreamResultFuture future = createAndRegister(planId, description, sessions); + if (listeners != null) + { + for (StreamEventHandler listener : listeners) + future.addEventListener(listener); + } logger.info("[Stream #{}] Executing streaming plan for {}", planId, description); // start sessions http://git-wip-us.apache.org/repos/asf/cassandra/blob/b3a9a443/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 6c157e2..37ec635 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ 
b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -79,7 +79,10 @@ public class BulkLoader StreamResultFuture future = null; try { - future = loader.stream(options.ignores); + if (options.noProgress) + future = loader.stream(options.ignores); + else + future = loader.stream(options.ignores, new ProgressIndicator()); } catch (Exception e) { @@ -94,8 +97,6 @@ public class BulkLoader } handler.output(String.format("Streaming session ID: %s", future.planId)); - if (!options.noProgress) - future.addEventListener(new ProgressIndicator()); try {