Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0E3EC200BC8 for ; Wed, 23 Nov 2016 07:52:03 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 0CFA9160AFD; Wed, 23 Nov 2016 06:52:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 33D7C160B1E for ; Wed, 23 Nov 2016 07:52:02 +0100 (CET) Received: (qmail 92518 invoked by uid 500); 23 Nov 2016 06:52:01 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 92503 invoked by uid 99); 23 Nov 2016 06:52:01 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Nov 2016 06:52:01 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id EFAF318036E for ; Wed, 23 Nov 2016 06:52:00 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id FpSNljbOxvcc for ; Wed, 23 Nov 2016 06:51:59 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 329675F23D for ; Wed, 23 Nov 2016 06:51:58 +0000 (UTC) Received: (qmail 91306 invoked by uid 99); 23 Nov 2016 06:51:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Nov 2016 06:51:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50495E055D; Wed, 23 Nov 2016 06:51:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Wed, 23 Nov 2016 06:52:00 -0000 Message-Id: <303d3d57999a4220b62cc84ace32b46e@git.apache.org> In-Reply-To: <98a37ba0c6ff4dc58f74aecf69f81eca@git.apache.org> References: <98a37ba0c6ff4dc58f74aecf69f81eca@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [04/50] incubator-beam git commit: Block earlier in BoundedReadEvaluatorFactoryTest archived-at: Wed, 23 Nov 2016 06:52:03 -0000 Block earlier in BoundedReadEvaluatorFactoryTest This ensures that the reader doesn't claim the split point, which in turn ensures the dynamic split request will not be refused by the OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits flakes: when the reader is faster than the splitter thread, it can run past the point at which the splitter thread will attempt to split the source, which causes the reader to read all of the elements. Sleep within TestReader#advanceImpl if the reader is being dynamically split, to ensure that the dynamic split fully completes before continuing a call to advance. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a8d32e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a8d32e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a8d32e5 Branch: refs/heads/gearpump-runner Commit: 4a8d32e5d3726b851329d507a8d0392cc03f6e85 Parents: 1543ea9 Author: Thomas Groh Authored: Thu Nov 17 10:56:49 2016 -0800 Committer: Thomas Groh Committed: Thu Nov 17 14:37:47 2016 -0800 ---------------------------------------------------------------------- .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a8d32e5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index 9d8503a..e956c34 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.OffsetBasedSource; import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader; import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -142,9 +143,7 @@ public class BoundedReadEvaluatorFactoryTest { 
TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); AppliedPTransform transform = read.getProducingTransformInternal(); Collection> unreadInputs = - new BoundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(transform, - 1); + new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1); Collection> outputs = new ArrayList<>(); int numIterations = 0; @@ -155,8 +154,7 @@ public class BoundedReadEvaluatorFactoryTest { Collection> newUnreadInputs = new ArrayList<>(); for (CommittedBundle shardBundle : unreadInputs) { - TransformEvaluator evaluator = - factory.forApplication(transform, null); + TransformEvaluator evaluator = factory.forApplication(transform, null); for (WindowedValue shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } @@ -178,8 +176,6 @@ public class BoundedReadEvaluatorFactoryTest { unreadInputs = newUnreadInputs; } - // We produced at least one split before we read 1000 elements, as we will attempt to split as - // quickly as possible. assertThat(numIterations, greaterThan(1)); WindowedValue[] expectedValues = new WindowedValue[numElements]; for (long i = 0L; i < numElements; i++) { @@ -343,7 +339,7 @@ public class BoundedReadEvaluatorFactoryTest { private static boolean readerClosed; private final Coder coder; private final T[] elems; - private final int awaitSplitIndex; + private final int firstSplitIndex; private transient CountDownLatch subrangesCompleted; @@ -351,11 +347,11 @@ public class BoundedReadEvaluatorFactoryTest { this(coder, elems.length, elems); } - public TestSource(Coder coder, int awaitSplitIndex, T... elems) { + public TestSource(Coder coder, int firstSplitIndex, T... 
elems) { super(0L, elems.length, 1L); this.elems = elems; this.coder = coder; - this.awaitSplitIndex = awaitSplitIndex; + this.firstSplitIndex = firstSplitIndex; readerClosed = false; subrangesCompleted = new CountDownLatch(2); @@ -380,7 +376,7 @@ public class BoundedReadEvaluatorFactoryTest { @Override public BoundedSource.BoundedReader createReader(PipelineOptions options) throws IOException { subrangesCompleted = new CountDownLatch(2); - return new TestReader<>(this, awaitSplitIndex, subrangesCompleted); + return new TestReader<>(this, firstSplitIndex, subrangesCompleted); } @Override @@ -405,6 +401,7 @@ public class BoundedReadEvaluatorFactoryTest { } private static class TestReader extends OffsetBasedReader { + private final Source initialSource; private final int sleepIndex; private final CountDownLatch dynamicallySplit; @@ -412,6 +409,7 @@ public class BoundedReadEvaluatorFactoryTest { TestReader(OffsetBasedSource source, int sleepIndex, CountDownLatch dynamicallySplit) { super(source); + this.initialSource = source; this.sleepIndex = sleepIndex; this.dynamicallySplit = dynamicallySplit; this.index = -1; @@ -434,9 +432,13 @@ public class BoundedReadEvaluatorFactoryTest { @Override public boolean advanceImpl() throws IOException { - if (index == sleepIndex) { + // Sleep before the sleep/split index is claimed so long as it will be claimed + if (index + 1 == sleepIndex && sleepIndex < getCurrentSource().elems.length) { try { dynamicallySplit.await(); + while (initialSource.equals(getCurrentSource())) { + // Spin until the current source is updated + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e);