beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Make TransformEvaluatorFactory reuse Explicit
Date Wed, 20 Jul 2016 17:31:10 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master c4ad11832 -> 6d7efe3df


Make TransformEvaluatorFactory reuse Explicit

Transform Evaluator Factories must be reused for the entire execution of
a Pipeline and must not be reused across pipelines.

Remove EvaluatorKey, and key explicitly by the transform application.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34467f92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34467f92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34467f92

Branch: refs/heads/master
Commit: 34467f92d5a31b47f95b734c737fde0a8277311b
Parents: 6952471
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Jul 15 10:51:24 2016 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri Jul 15 10:52:32 2016 -0700

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactory.java     | 11 ++--
 .../beam/runners/direct/EvaluatorKey.java       | 55 --------------------
 .../direct/TransformEvaluatorFactory.java       |  4 ++
 .../direct/UnboundedReadEvaluatorFactory.java   | 11 ++--
 4 files changed, 13 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index e550f54..9ba8b61 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -46,7 +46,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
    * retriggered.
    */
-  private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>>
+  private final ConcurrentMap<AppliedPTransform<?, ?, ?>, Queue<? extends BoundedReadEvaluator<?>>>
       sourceEvaluators = new ConcurrentHashMap<>();
 
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -79,12 +79,11 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       final EvaluationContext evaluationContext) {
     // Key by the application and the context the evaluation is occurring in (which call to
     // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
     Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
-        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
     if (evaluatorQueue == null) {
       evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+      if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
         BoundedSource<OutputT> source = transform.getTransform().getSource();
@@ -93,7 +92,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         evaluatorQueue.offer(evaluator);
       } else {
         // otherwise return the existing Queue that arrived before us
-        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(transform);
       }
     }
     return evaluatorQueue;
@@ -132,7 +131,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     @Override
     public TransformResult finishBundle() throws IOException {
       try (final BoundedReader<OutputT> reader =
-              source.createReader(evaluationContext.getPipelineOptions());) {
+              source.createReader(evaluationContext.getPipelineOptions())) {
         boolean contentsRemaining = reader.start();
         UncommittedBundle<OutputT> output =
             evaluationContext.createRootBundle(transform.getOutput());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
deleted file mode 100644
index 164e05a..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluatorKey.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-
-import java.util.Objects;
-
-/**
- * A (Transform, Pipeline Execution) key for stateful evaluators.
- *
- * Source evaluators are stateful to ensure data is not read multiple times. Evaluators are cached
- * to ensure that the reader is not restarted if the evaluator is retriggered. An
- * {@link EvaluatorKey} is used to ensure that multiple Pipelines can be executed without sharing
- * the same evaluators.
- */
-final class EvaluatorKey {
-  private final AppliedPTransform<?, ?, ?> transform;
-  private final EvaluationContext context;
-
-  public EvaluatorKey(AppliedPTransform<?, ?, ?> transform, EvaluationContext context) {
-    this.transform = transform;
-    this.context = context;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(transform, context);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof EvaluatorKey)) {
-      return false;
-    }
-    EvaluatorKey that = (EvaluatorKey) other;
-    return Objects.equals(this.transform, that.transform)
-        && Objects.equals(this.context, that.context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
index 1973a2f..7fac1e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorFactory.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -28,6 +29,9 @@ import javax.annotation.Nullable;
 /**
  * A factory for creating instances of {@link TransformEvaluator} for the application of a
  * {@link PTransform}.
+ *
+ * <p>{@link TransformEvaluatorFactory TransformEvaluatorFactories} will be reused within a single
+ * execution of a {@link Pipeline} but will not be reused across executions.
  */
 public interface TransformEvaluatorFactory {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34467f92/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index b226a2a..674be5e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -62,7 +62,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * an arbitrary Queue implementation does not, so the concrete type is used explicitly.
    */
   private final ConcurrentMap<
-      EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
+      AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>>
       sourceEvaluators = new ConcurrentHashMap<>();
 
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -91,16 +91,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
   Queue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> getTransformEvaluatorQueue(
       final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform,
       final EvaluationContext evaluationContext) {
-    // Key by the application and the context the evaluation is occurring in (which call to
-    // Pipeline#run).
-    EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
     @SuppressWarnings("unchecked")
     ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue =
         (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
-            sourceEvaluators.get(key);
+            sourceEvaluators.get(transform);
     if (evaluatorQueue == null) {
       evaluatorQueue = new ConcurrentLinkedQueue<>();
-      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+      if (sourceEvaluators.putIfAbsent(transform, evaluatorQueue) == null) {
         // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
         // factory for this transform
         UnboundedSource<OutputT, CheckpointMarkT> source =
@@ -119,7 +116,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         // otherwise return the existing Queue that arrived before us
         evaluatorQueue =
             (ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>>)
-                sourceEvaluators.get(key);
+                sourceEvaluators.get(transform);
       }
     }
     return evaluatorQueue;


Mime
View raw message