beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-362] Port runners to runners-core AggregatorFactory
Date Fri, 16 Dec 2016 09:59:48 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master d624d3b6b -> 5ebbd500c


[BEAM-362] Port runners to runners-core AggregatorFactory


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55f04955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55f04955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55f04955

Branch: refs/heads/master
Commit: 55f0495583312c9c0dea620d6a4e85193e97f255
Parents: d624d3b
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Dec 15 21:06:14 2016 -0800
Committer: Sela <ansela@paypal.com>
Committed: Fri Dec 16 11:46:18 2016 +0200

----------------------------------------------------------------------
 .../runners/apex/translation/operators/ApexParDoOperator.java   | 2 +-
 .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java | 1 -
 .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java     | 1 -
 .../java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java  | 1 -
 .../org/apache/beam/runners/direct/AggregatorContainer.java     | 2 +-
 .../flink/translation/wrappers/streaming/DoFnOperator.java      | 3 ++-
 .../apache/beam/runners/spark/aggregators/SparkAggregators.java | 5 +++--
 7 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 1e76949..4538fb5 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
 import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -46,7 +47,6 @@ import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index da16573..0e4bf75 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -22,7 +22,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 041cdde..d504b40 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -30,7 +30,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.Context;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 10af29a..7d93200 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index e86bc3e..c7fa4df 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -27,8 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.util.ExecutionContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 87b15a7..001e3b6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -192,7 +193,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     currentInputWatermark = Long.MIN_VALUE;
     currentOutputWatermark = currentInputWatermark;
 
-    Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() {
+    AggregatorFactory aggregatorFactory = new AggregatorFactory() {
       @Override
       public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
           Class<?> fnClass,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
index 657264f..17d5844 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.aggregators;
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -99,9 +100,9 @@ public class SparkAggregators {
   }
 
   /**
-   * An implementation of {@link Aggregator.AggregatorFactory} for the SparkRunner.
+   * An implementation of {@link AggregatorFactory} for the SparkRunner.
    */
-  public static class Factory implements Aggregator.AggregatorFactory {
+  public static class Factory implements AggregatorFactory {
 
     private final SparkRuntimeContext runtimeContext;
     private final Accumulator<NamedAggregators> accumulator;


Mime
View raw message