beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Add CommittedResult
Date Fri, 29 Apr 2016 00:51:01 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master baae9013c -> 4d5303d8a


Add CommittedResult

Return it as the output of InProcessEvaluationContext#handleResult(). This
provides a richer return type, which allows callers to do more when a
result is returned.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2618aa68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2618aa68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2618aa68

Branch: refs/heads/master
Commit: 2618aa68ecad95f77a6a42f8eddfe63e511edf23
Parents: a9387fc
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Apr 28 12:22:47 2016 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu Apr 28 13:18:26 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/inprocess/CommittedResult.java  | 47 ++++++++++++
 .../runners/inprocess/CompletionCallback.java   |  2 +-
 .../ExecutorServiceParallelExecutor.java        | 16 ++--
 .../inprocess/InProcessEvaluationContext.java   |  4 +-
 .../runners/inprocess/TransformExecutor.java    |  4 +-
 .../runners/inprocess/CommittedResultTest.java  | 78 ++++++++++++++++++++
 .../InProcessEvaluationContextTest.java         |  4 +-
 .../inprocess/TransformExecutorTest.java        |  4 +-
 8 files changed, 142 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
new file mode 100644
index 0000000..3ad0ae6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CommittedResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.runners.inprocess;
+
+import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * An {@link InProcessTransformResult} that has been committed.
+ */
+@AutoValue
+abstract class CommittedResult {
+  /**
+   * Returns the {@link AppliedPTransform} that produced this result.
+   */
+  public abstract AppliedPTransform<?, ?, ?> getTransform();
+
+  /**
+   * Returns the outputs produced by the transform.
+   */
+  public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
+
+  public static CommittedResult create(
+      InProcessTransformResult original, Iterable<? extends CommittedBundle<?>> outputs) {
+    return new AutoValue_CommittedResult(original.getTransform(),
+        outputs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
index 90c488e..30a2b92 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/CompletionCallback.java
@@ -26,7 +26,7 @@ interface CompletionCallback {
   /**
    * Handle a successful result, returning the committed outputs of the result.
    */
-  Iterable<? extends CommittedBundle<?>> handleResult(
+  CommittedResult handleResult(
       CommittedBundle<?> inputBundle, InProcessTransformResult result);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 3463d08..19bf35d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -216,14 +216,14 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor
{
    */
   private class DefaultCompletionCallback implements CompletionCallback {
     @Override
-    public Iterable<? extends CommittedBundle<?>> handleResult(
+    public CommittedResult handleResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      Iterable<? extends CommittedBundle<?>> resultBundles =
+      CommittedResult committedResult =
           evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(),
result);
-      for (CommittedBundle<?> outputBundle : resultBundles) {
+      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
       }
-      return resultBundles;
+      return committedResult;
     }
 
     @Override
@@ -246,14 +246,14 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor
{
     }
 
     @Override
-    public Iterable<? extends CommittedBundle<?>> handleResult(
+    public CommittedResult handleResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      Iterable<? extends CommittedBundle<?>> resultBundles =
+      CommittedResult committedResult =
           evaluationContext.handleResult(inputBundle, timers, result);
-      for (CommittedBundle<?> outputBundle : resultBundles) {
+      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
       }
-      return resultBundles;
+      return committedResult;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
index 3990f0d..7c0dcee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -145,7 +145,7 @@ class InProcessEvaluationContext {
    * @param result the result of evaluating the input bundle
    * @return the committed bundles contained within the handled {@code result}
    */
-  public synchronized Iterable<? extends CommittedBundle<?>> handleResult(
+  public synchronized CommittedResult handleResult(
       @Nullable CommittedBundle<?> completedBundle,
       Iterable<TimerData> completedTimers,
       InProcessTransformResult result) {
@@ -176,7 +176,7 @@ class InProcessEvaluationContext {
         applicationStateInternals.remove(stepAndKey);
       }
     }
-    return committedBundles;
+    return CommittedResult.create(result, committedBundles);
   }
 
   private Iterable<? extends CommittedBundle<?>> commitBundles(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
index 3a7bedc..a93c7b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/TransformExecutor.java
@@ -158,9 +158,9 @@ class TransformExecutor<T> implements Callable<InProcessTransformResult>
{
       TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>>
enforcements)
       throws Exception {
     InProcessTransformResult result = evaluator.finishBundle();
-    Iterable<? extends CommittedBundle<?>> outputs = onComplete.handleResult(inputBundle,
result);
+    CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {
-      enforcement.afterFinish(inputBundle, result, outputs);
+      enforcement.afterFinish(inputBundle, result, outputs.getOutputs());
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
new file mode 100644
index 0000000..7fad647
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/CommittedResultTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.runners.inprocess;
+
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests for {@link CommittedResult}.
+ */
+@RunWith(JUnit4.class)
+public class CommittedResultTest implements Serializable {
+  private transient TestPipeline p = TestPipeline.create();
+  private transient AppliedPTransform<?, ?, ?> transform =
+      AppliedPTransform.of("foo", p.begin(), PDone.in(p), new PTransform<PBegin, PDone>() {
+      });
+  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  @Test
+  public void getTransformExtractsFromResult() {
+    CommittedResult result =
+        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+            Collections.<InProcessPipelineRunner.CommittedBundle<?>>emptyList());
+
+    assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
+  }
+
+  @Test
+  public void getOutputsEqualInput() {
+    List<? extends InProcessPipelineRunner.CommittedBundle<?>> outputs =
+        ImmutableList.of(bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
+            WindowingStrategy.globalDefault(),
+            PCollection.IsBounded.BOUNDED)).commit(Instant.now()),
+            bundleFactory.createRootBundle(PCollection.createPrimitiveOutputInternal(p,
+                WindowingStrategy.globalDefault(),
+                PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
+    CommittedResult result =
+        CommittedResult.create(StepTransformResult.withoutHold(transform).build(), outputs);
+
+    assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index ee56954..d1ea51a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -460,7 +460,7 @@ public class InProcessEvaluationContextTest {
 
     UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
     rootBundle.add(WindowedValue.valueInGlobalWindow(1));
-    Iterable<? extends CommittedBundle<?>> handleResult =
+    CommittedResult handleResult =
         context.handleResult(
             null,
             ImmutableList.<TimerData>of(),
@@ -469,7 +469,7 @@ public class InProcessEvaluationContextTest {
                 .build());
     @SuppressWarnings("unchecked")
     CommittedBundle<Integer> committedBundle =
-        (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult);
+        (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult.getOutputs());
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2618aa68/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
index d3d70e0..31cb29a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java
@@ -487,11 +487,11 @@ public class TransformExecutorTest {
     }
 
     @Override
-    public Iterable<? extends CommittedBundle<?>> handleResult(
+    public CommittedResult handleResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
       handledResult = result;
       onMethod.countDown();
-      return Collections.emptyList();
+      return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList());
     }
 
     @Override


Mime
View raw message