Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6C3BC200C3D for ; Tue, 14 Mar 2017 18:46:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6AC6C160B63; Tue, 14 Mar 2017 17:46:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 94541160B89 for ; Tue, 14 Mar 2017 18:46:28 +0100 (CET) Received: (qmail 35177 invoked by uid 500); 14 Mar 2017 17:46:27 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 34941 invoked by uid 99); 14 Mar 2017 17:46:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Mar 2017 17:46:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D207EDFE20; Tue, 14 Mar 2017 17:46:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.apache.org Date: Tue, 14 Mar 2017 17:46:27 -0000 Message-Id: <7a4322fbb1fa4e7f9bc4673b07f72e88@git.apache.org> In-Reply-To: <112ec13151c54693961c0f51675ed021@git.apache.org> References: <112ec13151c54693961c0f51675ed021@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] beam git commit: Prevent Double-Close in UnboundedReadEvaluatorFactory archived-at: Tue, 14 Mar 2017 17:46:29 -0000 Prevent Double-Close in UnboundedReadEvaluatorFactory Move the actual "close-and-resume" to the overall try block, to ensure that the reader cannot be double-closed if the first call to close() throws an IOException. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/869002c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/869002c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/869002c3 Branch: refs/heads/master Commit: 869002c397b3a360ab9a9afe0a342a6ac2fe7f9e Parents: 30033cc Author: Thomas Groh Authored: Tue Mar 14 09:18:57 2017 -0700 Committer: Thomas Groh Committed: Tue Mar 14 10:46:16 2017 -0700 ---------------------------------------------------------------------- .../direct/UnboundedReadEvaluatorFactory.java | 30 ++++++---- .../UnboundedReadEvaluatorFactoryTest.java | 61 ++++++++++++++++++-- 2 files changed, 77 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 69e6920..7c3d50a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -139,7 +139,24 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { numElements++; } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance()); Instant watermark = reader.getWatermark(); - UnboundedSourceShard residual = finishRead(reader, shard); + + CheckpointMarkT finishedCheckpoint = finishRead(reader, shard); + UnboundedSourceShard residual; + // Sometimes resume from a checkpoint even if it's not required + if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) { + UnboundedReader toClose = reader; + // Prevent double-close. UnboundedReader is AutoCloseable, which does not require + // idempotency of close. Nulling out the reader here prevents trying to re-close it + // if the call to close throws an IOException. + reader = null; + toClose.close(); + residual = + UnboundedSourceShard.of( + shard.getSource(), shard.getDeduplicator(), null, finishedCheckpoint); + } else { + residual = shard.withCheckpoint(finishedCheckpoint); + } + resultBuilder .addOutput(output) .addUnprocessedElements( @@ -192,7 +209,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * Checkpoint the current reader, finalize the previous checkpoint, and return the residual * {@link UnboundedSourceShard}. */ - private UnboundedSourceShard finishRead( + private CheckpointMarkT finishRead( UnboundedReader reader, UnboundedSourceShard shard) throws IOException { final CheckpointMark oldMark = shard.getCheckpoint(); @@ -223,14 +240,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { } }); } - - // Sometimes resume from a checkpoint even if it's not required - if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) { - reader.close(); - return UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), null, mark); - } else { - return shard.withCheckpoint(mark); - } + return mark; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/869002c3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 7e2d85d..cdb362f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -76,6 +76,7 @@ import org.joda.time.ReadableInstant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.invocation.InvocationOnMock; @@ -96,8 +97,8 @@ public class UnboundedReadEvaluatorFactoryTest { private UnboundedSource source; private DirectGraph graph; - @Rule - public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Before public void setup() { @@ -379,6 +380,38 @@ public class UnboundedReadEvaluatorFactoryTest { is(true)); } + @Test + public void evaluatorThrowsInCloseRethrows() throws Exception { + ContiguousSet elems = ContiguousSet.create(Range.closed(0L, 20L), DiscreteDomain.longs()); + TestUnboundedSource source = + new TestUnboundedSource<>(BigEndianLongCoder.of(), elems.toArray(new Long[0])) + .throwsOnClose(); + + PCollection pcollection = p.apply(Read.from(source)); + AppliedPTransform sourceTransform = + DirectGraphs.getGraph(p).getProducer(pcollection); + + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); + UncommittedBundle output = bundleFactory.createBundle(pcollection); + when(context.createBundle(pcollection)).thenReturn(output); + + WindowedValue> shard = + WindowedValue.valueInGlobalWindow( + UnboundedSourceShard.unstarted(source, NeverDeduplicator.create())); + CommittedBundle> inputBundle = + bundleFactory + .>createRootBundle() + .add(shard) + .commit(Instant.now()); + UnboundedReadEvaluatorFactory factory = + new UnboundedReadEvaluatorFactory(context, 0.0 /* never reuse */); + TransformEvaluator> evaluator = + factory.forApplication(sourceTransform, inputBundle); + thrown.expect(IOException.class); + thrown.expectMessage("throws on close"); + evaluator.processElement(shard); + } + /** * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where * the timestamp is the epoch offset by the value of the element. @@ -402,12 +435,18 @@ public class UnboundedReadEvaluatorFactoryTest { private final Coder coder; private final List elems; private boolean dedupes = false; + private boolean throwOnClose; public TestUnboundedSource(Coder coder, T... elems) { + this(coder, false, Arrays.asList(elems)); + } + + private TestUnboundedSource(Coder coder, boolean throwOnClose, List elems) { readerAdvancedCount = 0; readerClosedCount = 0; this.coder = coder; - this.elems = Arrays.asList(elems); + this.elems = elems; + this.throwOnClose = throwOnClose; } @Override @@ -441,9 +480,14 @@ public class UnboundedReadEvaluatorFactoryTest { return coder; } + public TestUnboundedSource throwsOnClose() { + return new TestUnboundedSource<>(coder, true, elems); + } + private class TestUnboundedReader extends UnboundedReader { private final List elems; private int index; + private boolean closed = false; public TestUnboundedReader(List elems, int startIndex) { this.elems = elems; @@ -503,7 +547,16 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public void close() throws IOException { - readerClosedCount++; + try { + readerClosedCount++; + // Enforce the AutoCloseable contract. Close is not idempotent. + assertThat(closed, is(false)); + if (throwOnClose) { + throw new IOException(String.format("%s throws on close", TestUnboundedSource.this)); + } + } finally { + closed = true; + } } } }