beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/3] beam git commit: Clone before Resume in DirectRunner Unbounded Reads
Date Fri, 24 Mar 2017 23:46:06 GMT
Clone before Resume in DirectRunner Unbounded Reads

This exercises the CheckpointMarkCoder of all Unbounded Sources in the
DirectRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5630f92
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5630f92
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5630f92

Branch: refs/heads/master
Commit: a5630f9213fe25471b730e2021289c0d1de0a215
Parents: 4c1f2e4
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Mar 24 14:36:46 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri Mar 24 16:45:55 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/UnboundedReadEvaluatorFactory.java   |  4 ++++
 .../direct/UnboundedReadEvaluatorFactoryTest.java       | 12 +++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a5630f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 7c3d50a..f0eef58 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -187,6 +188,9 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
       UnboundedReader<OutputT> existing = shard.getExistingReader();
       if (existing == null) {
         CheckpointMarkT checkpoint = shard.getCheckpoint();
+        if (checkpoint != null) {
+          checkpoint = CoderUtils.clone(shard.getSource().getCheckpointMarkCoder(), checkpoint);
+        }
         return shard
             .getSource()
             .createReader(evaluationContext.getPipelineOptions(), checkpoint);

http://git-wip-us.apache.org/repos/asf/beam/blob/a5630f92/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index cdb362f..8707f31 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -458,6 +458,9 @@ public class UnboundedReadEvaluatorFactoryTest {
     @Override
     public UnboundedSource.UnboundedReader<T> createReader(
         PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
+      checkState(
+          checkpointMark == null || checkpointMark.decoded,
+          "Cannot resume from a checkpoint that has not been decoded");
       return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index);
     }
 
@@ -564,6 +567,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   private static class TestCheckpointMark implements CheckpointMark {
     final int index;
     private boolean finalized = false;
+    private boolean decoded = false;
 
     private TestCheckpointMark(int index) {
       this.index = index;
@@ -573,6 +577,10 @@ public class UnboundedReadEvaluatorFactoryTest {
     public void finalizeCheckpoint() throws IOException {
       checkState(
           !finalized, "%s was finalized more than once", TestCheckpointMark.class.getSimpleName());
+      checkState(
+          !decoded,
+          "%s was finalized after being decoded",
+          TestCheckpointMark.class.getSimpleName());
       finalized = true;
     }
 
@@ -594,7 +602,9 @@ public class UnboundedReadEvaluatorFactoryTest {
       public TestCheckpointMark decode(
           InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
           throws IOException {
-        return new TestCheckpointMark(VarInt.decodeInt(inStream));
+        TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
+        decoded.decoded = true;
+        return decoded;
       }
     }
   }


Mime
View raw message