Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 97720117D5 for ; Wed, 20 Aug 2014 15:10:25 +0000 (UTC) Received: (qmail 2191 invoked by uid 500); 20 Aug 2014 15:10:15 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 2170 invoked by uid 500); 20 Aug 2014 15:10:15 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 2089 invoked by uid 99); 20 Aug 2014 15:10:15 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 Aug 2014 15:10:15 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4FEEE954777; Wed, 20 Aug 2014 15:10:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yukim@apache.org To: commits@cassandra.apache.org Date: Wed, 20 Aug 2014 15:10:17 -0000 Message-Id: In-Reply-To: <6ea58766ba904cc4ac0e5a178c8914b6@git.apache.org> References: <6ea58766ba904cc4ac0e5a178c8914b6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/10] git commit: Make StreamReceiveTask thread safe and gc friendly Make StreamReceiveTask thread safe and gc friendly patch by yukim; reviewed by benedict for CASSANDRA-7795 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4fc417c4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4fc417c4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4fc417c4 Branch: refs/heads/cassandra-2.1 Commit: 4fc417c404aab0713a7d9747d22ce7eceb777859 Parents: eeb0d4c 
Author: Yuki Morishita Authored: Tue Aug 19 13:31:30 2014 -0500 Committer: Yuki Morishita Committed: Wed Aug 20 09:49:39 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/StreamReceiveTask.java | 56 ++++++++++++-------- 2 files changed, 35 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4fc417c4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2b2930e..fe9f4e0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -60,6 +60,7 @@ * Track max/min timestamps for range tombstones (CASSANDRA-7647) * Fix NPE when listing saved caches dir (CASSANDRA-7632) * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585) + * Make StreamReceiveTask thread safe and gc friendly (CASSANDRA-7795) Merged from 1.2: * Validate empty cell names from counter updates (CASSANDRA-7798) * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4fc417c4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 9a2568d..223a46e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -21,13 +21,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import 
org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -35,11 +38,17 @@ import org.apache.cassandra.utils.Pair; */ public class StreamReceiveTask extends StreamTask { + private static final ThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask", + FBUtilities.getAvailableProcessors(), + 60, TimeUnit.SECONDS); + // number of files to receive private final int totalFiles; // total size of files to receive private final long totalSize; - private volatile boolean aborted; + + // true if task is done (either completed or aborted) + private boolean done = false; // holds references to SSTables received protected Collection<SSTableWriter> sstables; @@ -57,14 +66,19 @@ public class StreamReceiveTask extends StreamTask * * @param sstable SSTable file received. 
*/ - public void received(SSTableWriter sstable) + public synchronized void received(SSTableWriter sstable) { + if (done) + return; + assert cfId.equals(sstable.metadata.cfId); - assert !aborted; sstables.add(sstable); if (sstables.size() == totalFiles) - complete(); + { + done = true; + executor.submit(new OnCompletionRunnable(this)); + } } public int getTotalNumberOfFiles() @@ -77,12 +91,6 @@ public class StreamReceiveTask extends StreamTask return totalSize; } - private void complete() - { - if (!sstables.isEmpty()) - StorageService.tasks.submit(new OnCompletionRunnable(this)); - } - private static class OnCompletionRunnable implements Runnable { private final StreamReceiveTask task; @@ -103,6 +111,7 @@ public class StreamReceiveTask extends StreamTask for (SSTableWriter writer : task.sstables) readers.add(writer.closeAndOpenReader()); lockfile.delete(); + task.sstables.clear(); if (!SSTableReader.acquireReferences(readers)) throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred"); @@ -121,17 +130,20 @@ public class StreamReceiveTask extends StreamTask } } - public void abort() + /** + * Abort this task. + * If the task already received all files and + * {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} task is submitted, + * then task cannot be aborted. + */ + public synchronized void abort() { - aborted = true; - Runnable r = new Runnable() - { - public void run() - { - for (SSTableWriter writer : sstables) - writer.abort(); - } - }; - StorageService.tasks.submit(r); + if (done) + return; + + done = true; + for (SSTableWriter writer : sstables) + writer.abort(); + sstables.clear(); } }