beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Removes ReduceFnExecutor interface
Date Tue, 24 Jan 2017 22:02:26 GMT
Repository: beam
Updated Branches:
  refs/heads/master b3334879f -> 11c3cd70b


Removes ReduceFnExecutor interface


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8989473b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8989473b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8989473b

Branch: refs/heads/master
Commit: 8989473b8e379a40b888565aadead001379c9398
Parents: b333487
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Jan 24 13:32:24 2017 -0800
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue Jan 24 13:32:24 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunner.java    | 20 --------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  5 +----
 .../beam/runners/direct/ParDoEvaluator.java     |  2 --
 .../runners/spark/translation/DoFnFunction.java |  2 --
 .../spark/translation/MultiDoFnFunction.java    |  2 --
 5 files changed, 1 insertion(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index 66f95db..b29adcc 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -17,12 +17,10 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 
 /**
@@ -51,22 +49,4 @@ public interface DoFnRunner<InputT, OutputT> {
    * additional tasks, such as flushing in-memory states.
    */
   void finishBundle();
-
-  /**
-   * An internal interface for signaling that a {@link OldDoFn} requires late data dropping.
-   */
-  public interface ReduceFnExecutor<K, InputT, OutputT, W> {
-    /**
-     * Gets this object as a {@link OldDoFn}.
-     *
-     * <p>Most implementors of this interface are expected to be {@link OldDoFn} instances, and will
-     * return themselves.
-     */
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn();
-
-    /**
-     * Returns an aggregator that tracks elements that are dropped due to being late.
-     */
-    Aggregator<Long, Long> getDroppedDueToLatenessAggregator();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index ecce4fc..d0387cf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.core;
 
-import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -37,7 +36,7 @@ import org.apache.beam.sdk.values.KV;
 @SystemDoFnInternal
 public class GroupAlsoByWindowViaWindowSetDoFn<
         K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
-    extends OldDoFn<RinT, KV<K, OutputT>> implements ReduceFnExecutor<K, InputT, OutputT, W> {
+    extends OldDoFn<RinT, KV<K, OutputT>> {
 
   public static <K, InputT, OutputT, W extends BoundedWindow>
       OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
@@ -95,7 +94,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
     reduceFnRunner.persist();
   }
 
-  @Override
   public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
     // Safe contravariant cast
     @SuppressWarnings("unchecked")
@@ -104,7 +102,6 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
     return asFn;
   }
 
-  @Override
   public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
     return droppedDueToLateness;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 97d5360..48f0f8d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -73,8 +73,6 @@ class ParDoEvaluator<InputT, OutputT> implements TransformEvaluator<InputT> {
     ReadyCheckingSideInputReader sideInputReader =
         evaluationContext.createSideInputReader(sideInputs);
 
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> underlying =
         DoFnRunners.simpleRunner(
             evaluationContext.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
index bd6cfbe..4fd5e51 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java
@@ -81,8 +81,6 @@ public class DoFnFunction<InputT, OutputT>
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
 
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),

http://git-wip-us.apache.org/repos/asf/beam/blob/8989473b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index cceffc8..911e6c5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -88,8 +88,6 @@ public class MultiDoFnFunction<InputT, OutputT>
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
 
-    // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn,
-    // and window-exploded processing is achieved within the simple runner
     DoFnRunner<InputT, OutputT> doFnRunner =
         DoFnRunners.simpleRunner(
             runtimeContext.getPipelineOptions(),


Mime
View raw message