Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 436B7200C46 for ; Tue, 14 Mar 2017 18:46:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 41FB1160B63; Tue, 14 Mar 2017 17:46:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 92FAB160B7E for ; Tue, 14 Mar 2017 18:46:00 +0100 (CET) Received: (qmail 33341 invoked by uid 500); 14 Mar 2017 17:45:59 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 33325 invoked by uid 99); 14 Mar 2017 17:45:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Mar 2017 17:45:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9D307DFE20; Tue, 14 Mar 2017 17:45:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.apache.org Date: Tue, 14 Mar 2017 17:46:00 -0000 Message-Id: <104f416e66cb4c1e86ca97088b4a2f2e@git.apache.org> In-Reply-To: <54a79e3ff3a2438b8f54f7b3630dcf35@git.apache.org> References: <54a79e3ff3a2438b8f54f7b3630dcf35@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] beam git commit: Stop Double-finalizing checkpoints in the DirectRunner archived-at: Tue, 14 Mar 2017 17:46:01 -0000 Stop Double-finalizing checkpoints in the DirectRunner Checkpoints don't need to be finalized before we restore from them. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d56d7451 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d56d7451 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d56d7451 Branch: refs/heads/master Commit: d56d7451a5d7c304d3cb1b5551d918773aec1c65 Parents: e362e6b Author: Thomas Groh Authored: Mon Mar 13 15:22:24 2017 -0700 Committer: Thomas Groh Committed: Tue Mar 14 10:45:48 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/UnboundedReadEvaluatorFactory.java | 3 --- .../direct/UnboundedReadEvaluatorFactoryTest.java | 13 ++++++++----- 2 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index ff59390..69e6920 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -170,9 +170,6 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { UnboundedReader existing = shard.getExistingReader(); if (existing == null) { CheckpointMarkT checkpoint = shard.getCheckpoint(); - if (checkpoint != null) { - checkpoint.finalizeCheckpoint(); - } return shard .getSource() .createReader(evaluationContext.getPipelineOptions(), checkpoint); http://git-wip-us.apache.org/repos/asf/beam/blob/d56d7451/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 987f927..7e2d85d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.runners.direct.DirectGraphs.getProducer; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -373,6 +374,9 @@ public class UnboundedReadEvaluatorFactoryTest { secondEvaluator.finishBundle(); assertThat(TestUnboundedSource.readerClosedCount, equalTo(2)); + assertThat( + Iterables.getOnlyElement(residual.getElements()).getValue().getCheckpoint().isFinalized(), + is(true)); } /** @@ -415,9 +419,6 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public UnboundedSource.UnboundedReader createReader( PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) { - if (checkpointMark != null) { - assertThat(checkpointMark.isFinalized(), is(true)); - } return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index); } @@ -517,6 +518,8 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public void finalizeCheckpoint() throws IOException { + checkState( + !finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName()); finalized = true; } @@ -530,14 +533,14 @@ public class UnboundedReadEvaluatorFactoryTest { TestCheckpointMark value, OutputStream outStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { + throws IOException { VarInt.encode(value.index, outStream); } @Override public TestCheckpointMark decode( InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) - throws CoderException, IOException { + throws IOException { return new TestCheckpointMark(VarInt.decodeInt(inStream)); } }