Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1AAAA200C01 for ; Thu, 19 Jan 2017 16:55:32 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 19448160B5C; Thu, 19 Jan 2017 15:55:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E3DBF160B42 for ; Thu, 19 Jan 2017 16:55:30 +0100 (CET) Received: (qmail 60789 invoked by uid 500); 19 Jan 2017 15:55:30 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 60754 invoked by uid 99); 19 Jan 2017 15:55:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Jan 2017 15:55:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E2FFEDFB86; Thu, 19 Jan 2017 15:55:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Thu, 19 Jan 2017 15:55:29 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/6] cassandra git commit: Read repair is not blocking repair to finish in foreground repair archived-at: Thu, 19 Jan 2017 15:55:32 -0000 Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 3f41d7a76 -> 48fed8016 refs/heads/cassandra-3.11 affa68fd1 -> 74559de50 refs/heads/trunk 52df6a58d -> c3d724445 Read repair is not blocking repair to finish in foreground repair patch by Sylvain Lebresne; reviewed by Xiaolong Jiang and Jason Brown for 
CASSANDRA-13115 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48fed801 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48fed801 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48fed801 Branch: refs/heads/cassandra-3.0 Commit: 48fed80162592f741bf29298e2064452d53de4d8 Parents: 3f41d7a Author: Sylvain Lebresne Authored: Thu Jan 12 10:03:11 2017 +0100 Committer: Sylvain Lebresne Committed: Thu Jan 19 16:49:14 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../UnfilteredPartitionIterators.java | 1 + .../cassandra/service/AsyncRepairCallback.java | 5 +- .../apache/cassandra/service/DataResolver.java | 14 ++- .../cassandra/service/DigestResolver.java | 9 +- .../apache/cassandra/service/ReadCallback.java | 4 +- .../cassandra/service/ResponseResolver.java | 12 ++ .../cassandra/service/DataResolverTest.java | 117 +++++++++++++------ 8 files changed, 119 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 97d49af..6293cfa 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.11 + * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115) * Stress daemon help is incorrect (CASSANDRA-12563) * Remove ALTER TYPE support (CASSANDRA-12443) * Fix assertion for certain legacy range tombstone pattern (CASSANDRA-12203) http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 41b1424..1abbb19 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -159,6 +159,7 @@ public abstract class UnfilteredPartitionIterators public void close() { merged.close(); + listener.close(); } }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/AsyncRepairCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java index dec5319..d613f3d 100644 --- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java +++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.service; -import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.cassandra.concurrent.Stage; @@ -46,9 +45,9 @@ public class AsyncRepairCallback implements IAsyncCallback { StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable() { - protected void runMayThrow() throws DigestMismatchException, IOException + protected void runMayThrow() { - repairResolver.resolve(); + repairResolver.compareResponses(); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 4e5bfb8..01953e1 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -21,6 +21,8 @@ import java.net.InetAddress; import java.util.*; import 
java.util.concurrent.TimeoutException; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -40,7 +42,8 @@ import org.apache.cassandra.utils.FBUtilities; public class DataResolver extends ResponseResolver { - private final List repairResults = Collections.synchronizedList(new ArrayList<>()); + @VisibleForTesting + final List repairResults = Collections.synchronizedList(new ArrayList<>()); public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) { @@ -73,6 +76,15 @@ public class DataResolver extends ResponseResolver return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter)); } + public void compareResponses() + { + // We need to fully consume the results to trigger read repairs if appropriate + try (PartitionIterator iterator = resolve()) + { + PartitionIterators.consume(iterator); + } + } + private PartitionIterator mergeWithShortReadProtection(List results, InetAddress[] sources, DataLimits.Counter resultCounter) { // If we have only one results, there is no read repair to do and we can't get short reads http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java index 4a918a3..6a528e9 100644 --- a/src/java/org/apache/cassandra/service/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/DigestResolver.java @@ -69,6 +69,13 @@ public class DigestResolver extends ResponseResolver if (logger.isTraceEnabled()) logger.trace("resolving {} responses", responses.size()); + compareResponses(); + + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), 
command.nowInSec()); + } + + public void compareResponses() throws DigestMismatchException + { long start = System.nanoTime(); // validate digests against each other; throw immediately on mismatch. @@ -87,8 +94,6 @@ public class DigestResolver extends ResponseResolver if (logger.isTraceEnabled()) logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec()); } public boolean isDataPresent() http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ReadCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java index 8747004..516384a 100644 --- a/src/java/org/apache/cassandra/service/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/ReadCallback.java @@ -219,10 +219,10 @@ public class ReadCallback implements IAsyncCallbackWithFailure { // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch. // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never - // get a digest mismatch) + // get a digest mismatch). 
try { - resolver.resolve(); + resolver.compareResponses(); } catch (DigestMismatchException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/src/java/org/apache/cassandra/service/ResponseResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java index e7c94a1..81b18b6 100644 --- a/src/java/org/apache/cassandra/service/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/ResponseResolver.java @@ -47,6 +47,18 @@ public abstract class ResponseResolver public abstract PartitionIterator getData(); public abstract PartitionIterator resolve() throws DigestMismatchException; + /** + * Compares received responses, potentially triggering a digest mismatch (for a digest resolver) and read-repairs + * (for a data resolver). + * <p>

+ * This is functionally equivalent to calling {@link #resolve()} and consuming the result, but can be slightly more + * efficient in some case due to the fact that we don't care about the result itself. This is used when doing + * asynchronous read-repairs. + * + * @throws DigestMismatchException if it's a digest resolver and the responses don't match. + */ + public abstract void compareResponses() throws DigestMismatchException; + public abstract boolean isDataPresent(); public void preprocess(MessageIn message) http://git-wip-us.apache.org/repos/asf/cassandra/blob/48fed801/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index c9878d4..fd1e54e 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -129,6 +129,21 @@ public class DataResolverTest MessagingService.instance().clearMessageSinks(); } + /** + * Checks that the provided data resolver has the expected number of repair futures created. + * This method also "release" those future by faking replica responses to those repair, which is necessary or + * every test would timeout when closing the result of resolver.resolve(), since it waits on those futures. + */ + private void assertRepairFuture(DataResolver resolver, int expectedRepairs) + { + assertEquals(expectedRepairs, resolver.repairResults.size()); + + // Signal all future. We pass a completely fake response message, but it doesn't matter as we just want + // AsyncOneResponse to signal success, and it only cares about a non-null MessageIn (it collects the payload). 
+ for (AsyncOneResponse future : resolver.repairResults) + future.response(MessageIn.create(null, null, null, null, -1)); + } + @Test public void testResolveNewerSingleRow() throws UnknownHostException { @@ -142,12 +157,15 @@ public class DataResolverTest .add("c1", "v2") .buildUpdate()))); - try(PartitionIterator data = resolver.resolve(); - RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "c1"); - assertColumn(cfm, row, "c1", "v2", 1); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "c1"); + assertColumn(cfm, row, "c1", "v2", 1); + } + assertRepairFuture(resolver, 1); } assertEquals(1, messageRecorder.sent.size()); @@ -172,13 +190,16 @@ public class DataResolverTest .add("c2", "v2") .buildUpdate()))); - try(PartitionIterator data = resolver.resolve(); - RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "c1", "c2"); - assertColumn(cfm, row, "c1", "v1", 0); - assertColumn(cfm, row, "c2", "v2", 1); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "c1", "c2"); + assertColumn(cfm, row, "c1", "v1", 0); + assertColumn(cfm, row, "c2", "v2", 1); + } + assertRepairFuture(resolver, 2); } assertEquals(2, messageRecorder.sent.size()); @@ -224,6 +245,7 @@ public class DataResolverTest assertFalse(rows.hasNext()); assertFalse(data.hasNext()); } + assertRepairFuture(resolver, 2); } assertEquals(2, messageRecorder.sent.size()); @@ -289,6 +311,7 @@ public class DataResolverTest assertFalse(rows.hasNext()); } + assertRepairFuture(resolver, 4); } assertEquals(4, messageRecorder.sent.size()); @@ -330,12 +353,15 @@ public class DataResolverTest InetAddress peer2 = peer(); 
resolver.preprocess(readResponseMessage(peer2, EmptyIterators.unfilteredPartition(cfm, false))); - try(PartitionIterator data = resolver.resolve(); - RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "c2"); - assertColumn(cfm, row, "c2", "v2", 1); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "c2"); + assertColumn(cfm, row, "c2", "v2", 1); + } + assertRepairFuture(resolver, 1); } assertEquals(1, messageRecorder.sent.size()); @@ -356,6 +382,7 @@ public class DataResolverTest try(PartitionIterator data = resolver.resolve()) { assertFalse(data.hasNext()); + assertRepairFuture(resolver, 0); } assertTrue(messageRecorder.sent.isEmpty()); @@ -376,6 +403,7 @@ public class DataResolverTest try (PartitionIterator data = resolver.resolve()) { assertFalse(data.hasNext()); + assertRepairFuture(resolver, 1); } // peer1 should get the deletion from peer2 @@ -407,12 +435,15 @@ public class DataResolverTest InetAddress peer4 = peer(); resolver.preprocess(readResponseMessage(peer4, fullPartitionDelete(cfm, dk, 2, nowInSec))); - try(PartitionIterator data = resolver.resolve(); - RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "two"); - assertColumn(cfm, row, "two", "B", 3); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "two"); + assertColumn(cfm, row, "two", "B", 3); + } + assertRepairFuture(resolver, 4); } // peer 1 needs to get the partition delete from peer 4 and the row from peer 3 @@ -498,6 +529,7 @@ public class DataResolverTest try (PartitionIterator data = resolver.resolve()) { assertFalse(data.hasNext()); + assertRepairFuture(resolver, 2); } assertEquals(2, 
messageRecorder.sent.size()); @@ -575,12 +607,16 @@ public class DataResolverTest InetAddress peer2 = peer(); resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); - try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "m"); - Assert.assertNull(row.getCell(m, CellPath.create(bb(0)))); - Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1)))); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "m"); + Assert.assertNull(row.getCell(m, CellPath.create(bb(0)))); + Assert.assertNotNull(row.getCell(m, CellPath.create(bb(1)))); + } + assertRepairFuture(resolver, 1); } MessageOut msg; @@ -625,6 +661,7 @@ public class DataResolverTest try(PartitionIterator data = resolver.resolve()) { assertFalse(data.hasNext()); + assertRepairFuture(resolver, 1); } MessageOut msg; @@ -665,12 +702,16 @@ public class DataResolverTest InetAddress peer2 = peer(); resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.emptyUpdate(cfm2, dk)))); - try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "m"); - ComplexColumnData cd = row.getComplexColumnData(m); - assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "m"); + ComplexColumnData cd = row.getComplexColumnData(m); + assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + } + assertRepairFuture(resolver, 1); } Assert.assertNull(messageRecorder.sent.get(peer1)); @@ -714,12 +755,16 @@ public class 
DataResolverTest InetAddress peer2 = peer(); resolver.preprocess(readResponseMessage(peer2, iter(PartitionUpdate.singleRowUpdate(cfm2, dk, builder.build())), cmd)); - try(PartitionIterator data = resolver.resolve(); RowIterator rows = Iterators.getOnlyElement(data)) + try(PartitionIterator data = resolver.resolve()) { - Row row = Iterators.getOnlyElement(rows); - assertColumns(row, "m"); - ComplexColumnData cd = row.getComplexColumnData(m); - assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + try (RowIterator rows = Iterators.getOnlyElement(data)) + { + Row row = Iterators.getOnlyElement(rows); + assertColumns(row, "m"); + ComplexColumnData cd = row.getComplexColumnData(m); + assertEquals(Collections.singleton(expectedCell), Sets.newHashSet(cd)); + } + assertRepairFuture(resolver, 1); } MessageOut msg;