beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] incubator-beam git commit: Fix a bug in SplittableDoFn Checkpointing
Date Fri, 09 Dec 2016 01:18:10 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 409b5dfcf -> 40bd27602


Fix a bug in SplittableDoFn Checkpointing

Call checkpoint() only once if the SDF emits output several times per
claim call.

Calling checkpoint multiple times would clobber an existing checkpoint,
and the second call would only ever return an empty residual, losing all
of the initial residual.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf1fba45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf1fba45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf1fba45

Branch: refs/heads/master
Commit: bf1fba450e6b5fd6c98d006b381472eee8db7b72
Parents: 409b5df
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Dec 6 18:00:03 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu Dec 8 17:16:30 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDo.java      |  9 ++-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  8 ++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 58 +++++++++++++++++++-
 4 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0bf882b..8a9bfcd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -590,9 +590,14 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         }
 
         private void noteOutput() {
-          if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE) {
+          // Take the checkpoint only if it hasn't been taken yet, because:
+          // 1) otherwise we'd lose the previous checkpoint stored in residualRestrictionHolder
+          // 2) it's not allowed to checkpoint a RestrictionTracker twice, since the first call
+          // by definition already maximally narrows its restriction, so a second checkpoint would
+          // have produced a useless empty residual restriction anyway.
+          if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE && residualRestrictionHolder[0] == null) {
             // Request a checkpoint. The fn *may* produce more output, but hopefully not too much.
-            residualRestrictionHolder[0] = tracker.checkpoint();
+            residualRestrictionHolder[0] = checkNotNull(tracker.checkpoint());
           }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index e60c536..167f5fa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -787,7 +787,9 @@ public class ParDo {
     @Override
     public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
+          !isSplittable(getOldFn()),
+          "%s does not support Splittable DoFn",
+          input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
       return PCollection.<OutputT>createPrimitiveOutputInternal(
               input.getPipeline(),
@@ -1044,7 +1046,9 @@ public class ParDo {
     @Override
     public PCollectionTuple expand(PCollection<? extends InputT> input) {
       checkArgument(
-          !isSplittable(getOldFn()), "Splittable DoFn not supported by the current runner");
+          !isSplittable(getOldFn()),
+          "%s does not support Splittable DoFn",
+          input.getPipeline().getOptions().getRunner().getName());
       validateWindowType(input, fn);
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 36666b2..2d118e4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1717,7 +1717,8 @@ public class ParDoTest implements Serializable {
     Pipeline p = TestPipeline.create();
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Splittable DoFn not supported by the current runner");
+    thrown.expectMessage(p.getRunner().getClass().getName());
+    thrown.expectMessage("does not support Splittable DoFn");
 
     p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn()));
   }
@@ -1729,7 +1730,8 @@ public class ParDoTest implements Serializable {
     Pipeline p = TestPipeline.create();
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Splittable DoFn not supported by the current runner");
+    thrown.expectMessage(p.getRunner().getClass().getName());
+    thrown.expectMessage("does not support Splittable DoFn");
 
     p.apply(Create.of(1, 2, 3))
         .apply(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index 82bd3a3..022c2e5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.transforms;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -36,6 +38,7 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesSplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -117,10 +120,10 @@ public class SplittableDoFnTest {
       for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
         c.output(KV.of(c.element(), i));
         if (i % 3 == 0) {
-          return ProcessContinuation.resume();
+          return resume();
         }
       }
-      return ProcessContinuation.stop();
+      return stop();
     }
 
     @GetInitialRestriction
@@ -231,6 +234,57 @@ public class SplittableDoFnTest {
     p.run();
   }
 
+  @BoundedPerElement
+  private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
+    private static final int MAX_INDEX = 98765;
+
+    private static int snapToNextBlock(int index, int[] blockStarts) {
+      for (int i = 1; i < blockStarts.length; ++i) {
+        if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
+          return i;
+        }
+      }
+      throw new IllegalStateException("Shouldn't get here");
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+      int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
+      int trueStart = snapToNextBlock(tracker.currentRestriction().from, blockStarts);
+      int trueEnd = snapToNextBlock(tracker.currentRestriction().to, blockStarts);
+      for (int i = trueStart; i < trueEnd; ++i) {
+        if (!tracker.tryClaim(blockStarts[i])) {
+          return resume();
+        }
+        for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
+          c.output(index);
+        }
+      }
+      return stop();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(String element) {
+      return new OffsetRange(0, MAX_INDEX);
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(OffsetRange range) {
+      return new OffsetRangeTracker(range);
+    }
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesSplittableParDo.class})
+  public void testOutputAfterCheckpoint() throws Exception {
+    Pipeline p = TestPipeline.create();
+    PCollection<Integer> outputs = p.apply(Create.of("foo"))
+        .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
+    PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
+        .isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
+    p.run();
+  }
+
   private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> {
     private final PCollectionView<String> sideInput;
     private final TupleTag<String> sideOutput;


Mime
View raw message