From: benedict@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Tue, 08 Sep 2015 12:45:52 -0000
In-Reply-To: <74274b7195084cf28de728789fbce5b6@git.apache.org>
References: <74274b7195084cf28de728789fbce5b6@git.apache.org>
Subject: [4/7] cassandra git commit: Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274)

Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274)

If an error occurred during cleanup, scrub or upgrade (or any
parallelAllSSTableOperation), the caller was immediately notified of the
problem, and the method exited, executing the finally block that unmarked
all of the sstables as compacting.
Since the operations happen in parallel, many may still be running or
waiting to run, and so another operation may operate over the same
sstables, breaking the required mutual exclusivity.

This patch ensures the method is not exited until all operations have
completed, at which point the caller is notified of any exceptions.

patch by benedict; reviewed by marcus for CASSANDRA-10274


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d769fcb3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d769fcb3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d769fcb3

Branch: refs/heads/cassandra-3.0
Commit: d769fcb397b6c5937561194b9e8f9dd596ffcd18
Parents: 0c0f1ff
Author: Benedict Elliott Smith
Authored: Mon Sep 7 12:23:57 2015 +0100
Committer: Benedict Elliott Smith
Committed: Tue Sep 8 13:39:56 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  6 +++
 .../db/compaction/CompactionManager.java        | 51 +++++---------------
 .../org/apache/cassandra/utils/FBUtilities.java | 19 ++++++--
 .../org/apache/cassandra/utils/Throwables.java  | 16 ++++++
 4 files changed, 51 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5dffb9b..fdba8ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,4 @@
+<<<<<<< HEAD
 2.2.2
  * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
  * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
@@ -6,6 +7,11 @@
  * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
    (CASSANDRA-10199)
 Merged from 2.1:
+=======
+2.1.10
+ * Scrub, Cleanup and Upgrade do not unmark compacting until all operations
+   have completed, regardless of the occurence of exceptions (CASSANDRA-10274)
+>>>>>>> 04e789b... Cleanup, scrub and upgrade may unmark compacting early (CASSANDRA-10274)
  * Fix handling of streaming EOF (CASSANDRA-10206)
  * Only check KeyCache when it is enabled
  * Change streaming_socket_timeout_in_ms default to 1 hour (CASSANDRA-8611)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5e1b31c..495c5ab 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -41,8 +41,6 @@ import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -71,13 +69,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MerkleTree;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -246,6 +238,7 @@ public class CompactionManager implements CompactionManagerMBean
     @SuppressWarnings("resource")
     private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType operationType) throws ExecutionException, InterruptedException
     {
+        List<LifecycleTransaction> transactions = new ArrayList<>();
         try (LifecycleTransaction compacting = cfs.markAllCompacting(operationType);)
         {
             Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
@@ -255,7 +248,7 @@ public class CompactionManager implements CompactionManagerMBean
                 return AllSSTableOpStatus.SUCCESSFUL;
             }
 
-            List<Pair<LifecycleTransaction,Future<Object>>> futures = new ArrayList<>();
+            List<Future<Object>> futures = new ArrayList<>();
 
             for (final SSTableReader sstable : sstables)
             {
@@ -266,7 +259,8 @@ public class CompactionManager implements CompactionManagerMBean
                 }
 
                 final LifecycleTransaction txn = compacting.split(singleton(sstable));
-                futures.add(Pair.create(txn,executor.submit(new Callable<Object>()
+                transactions.add(txn);
+                futures.add(executor.submit(new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
@@ -274,39 +268,20 @@ public class CompactionManager implements CompactionManagerMBean
                         operation.execute(txn);
                         return this;
                     }
-                })));
+                }));
             }
 
             assert compacting.originals().isEmpty();
-
-            //Collect all exceptions
-            Exception exception = null;
-
-            for (Pair<LifecycleTransaction,Future<Object>> f : futures)
-            {
-                try
-                {
-                    f.right.get();
-                }
-                catch (InterruptedException | ExecutionException e)
-                {
-                    if (exception == null)
-                        exception = new Exception();
-
-                    exception.addSuppressed(e);
-                }
-                finally
-                {
-                    f.left.close();
-                }
-            }
-
-            if (exception != null)
-                Throwables.propagate(exception);
-
+            FBUtilities.waitOnFutures(futures);
             return AllSSTableOpStatus.SUCCESSFUL;
         }
+        finally
+        {
+            Throwable fail = Throwables.close(null, transactions);
+            if (fail != null)
+                logger.error("Failed to cleanup lifecycle transactions {}", fail);
+        }
     }
 
     private static interface OneSSTableOperation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index ce118b9..b41bdab 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -334,10 +334,23 @@ public class FBUtilities
         return System.currentTimeMillis() * 1000;
     }
 
-    public static void waitOnFutures(Iterable<Future<?>> futures)
+    public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures)
     {
-        for (Future f : futures)
-            waitOnFuture(f);
+        List<T> results = new ArrayList<>();
+        Throwable fail = null;
+        for (Future<? extends T> f : futures)
+        {
+            try
+            {
+                results.add(f.get());
+            }
+            catch (InterruptedException | ExecutionException e)
+            {
+                fail = Throwables.merge(fail, e);
+            }
+        }
+        Throwables.maybeFail(fail);
+        return results;
     }
 
     public static <T> T waitOnFuture(Future<T> future)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d769fcb3/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 0a2bd28..a895f31 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -34,4 +34,20 @@ public class Throwables
         if (fail != null)
             com.google.common.base.Throwables.propagate(fail);
     }
+
+    public static Throwable close(Throwable accumulate, Iterable<? extends AutoCloseable> closeables)
+    {
+        for (AutoCloseable closeable : closeables)
+        {
+            try
+            {
+                closeable.close();
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+        }
+        return accumulate;
+    }
 }
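
Illustration (not part of the patch): the commit message describes waiting for every parallel operation to finish before releasing shared state, and only then reporting failures. The standalone Java sketch below shows that pattern in isolation. It is not Cassandra code: the class WaitForAllSketch and its merge, waitForAll and demo methods are hypothetical names made up for this example, and merge only approximates the role Throwables.merge appears to play in the diff above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: every name here is hypothetical and not part of Cassandra.
class WaitForAllSketch
{
    // Assumed behaviour: keep the first failure and attach later ones as suppressed.
    static Throwable merge(Throwable existing, Throwable t)
    {
        if (existing == null)
            return t;
        existing.addSuppressed(t);
        return existing;
    }

    // Blocks on every future, even after one has failed.
    // Returns the merged failure, or null if all tasks succeeded.
    static Throwable waitForAll(List<Future<Object>> futures)
    {
        Throwable fail = null;
        for (Future<Object> f : futures)
        {
            try
            {
                f.get();
            }
            catch (InterruptedException | ExecutionException e)
            {
                fail = merge(fail, e);
            }
        }
        return fail;
    }

    // Submits one task that fails at once and two slower tasks that succeed,
    // then reports how many slow tasks had finished once waitForAll returned.
    static String demo()
    {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        final AtomicInteger finished = new AtomicInteger();
        Callable<Object> failing = () -> { throw new IllegalStateException("boom"); };
        Callable<Object> slow = () -> {
            Thread.sleep(50);
            finished.incrementAndGet();
            return Boolean.TRUE;
        };
        List<Future<Object>> futures = new ArrayList<>();
        try
        {
            futures.add(executor.submit(failing));
            futures.add(executor.submit(slow));
            futures.add(executor.submit(slow));
            Throwable fail = waitForAll(futures);
            return "finished=" + finished.get() + " failed=" + (fail != null);
        }
        finally
        {
            // Shared state is released only after every task has completed.
            executor.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(demo()); // prints "finished=2 failed=true"
    }
}
```

Had the sketch returned on the first failed future, the finally block would have run while the two slow tasks were still pending, which is the kind of early release the commit message describes.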