beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [3/9] beam git commit: BEAM-1419 Flatten should comply with PTransform style guide
Date Wed, 01 Mar 2017 04:10:50 GMT
BEAM-1419 Flatten should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f5056efc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f5056efc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f5056efc

Branch: refs/heads/master
Commit: f5056efc80bf4f240ec1eeea4e2b50bf567a2d6c
Parents: b87621e
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Feb 7 16:55:19 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/ApexPipelineTranslator.java     |  2 +-
 .../translation/FlattenPCollectionTranslator.java    |  6 +++---
 .../translation/operators/ApexFlattenOperator.java   |  4 ++--
 .../construction/EmptyFlattenAsCreateFactory.java    |  7 +++----
 .../core/construction/PTransformMatchers.java        |  4 ++--
 .../core/construction/PTransformMatchersTest.java    |  6 +++---
 .../beam/runners/direct/EmptyInputProvider.java      |  4 ++--
 .../beam/runners/direct/FlattenEvaluatorFactory.java |  4 ++--
 .../beam/runners/direct/RootProviderRegistry.java    |  4 ++--
 .../runners/direct/TransformEvaluatorRegistry.java   |  4 ++--
 .../beam/runners/direct/DirectGraphVisitorTest.java  |  4 ++--
 .../flink/FlinkBatchTransformTranslators.java        |  6 +++---
 .../flink/FlinkStreamingTransformTranslators.java    |  6 +++---
 .../runners/dataflow/DataflowPipelineTranslator.java |  8 ++++----
 .../spark/translation/TransformTranslator.java       |  8 ++++----
 .../streaming/StreamingTransformTranslator.java      |  8 ++++----
 .../java/org/apache/beam/sdk/transforms/Flatten.java | 15 ++++++++-------
 .../org/apache/beam/sdk/transforms/FlattenTest.java  | 12 ++++++------
 18 files changed, 56 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 36d679a..e9d6571 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -64,7 +64,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
-    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
+    registerTransformTranslator(Flatten.PCollections.class,
         new FlattenPCollectionTranslator());
     registerTransformTranslator(PrimitiveCreate.class, new CreateValuesTranslator());
     registerTransformTranslator(CreateApexPCollectionView.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
index 2e31dfc..080c5e9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java
@@ -35,14 +35,14 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
- * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
+ * {@link Flatten.PCollections} translation to Apex operator.
  */
 class FlattenPCollectionTranslator<T> implements
-    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+    TransformTranslator<Flatten.PCollections<T>> {
   private static final long serialVersionUID = 1L;
 
   @Override
-  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+  public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
     List<PCollection<T>> inputCollections = extractPCollections(context.getInputs());
 
     if (inputCollections.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
index 3d9db51..4594765 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexFlattenOperator.java
@@ -21,15 +21,15 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
-
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Apex operator for Beam {@link org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList}.
+ * Apex operator for Beam {@link PCollections}.
  */
 public class ApexFlattenOperator<InputT> extends BaseOperator {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
index 3b29c0a..0168039 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -36,11 +35,11 @@ import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link
- * Flatten.FlattenPCollectionList} that takes no input {@link PCollection PCollections}.
+ * Flatten.PCollections} that takes no input {@link PCollection PCollections}.
  */
 public class EmptyFlattenAsCreateFactory<T>
     implements PTransformOverrideFactory<
-        PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>> {
+        PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> {
   private static final EmptyFlattenAsCreateFactory<Object> INSTANCE =
       new EmptyFlattenAsCreateFactory<>();
 
@@ -52,7 +51,7 @@ public class EmptyFlattenAsCreateFactory<T>
 
   @Override
   public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
-      FlattenPCollectionList<T> transform) {
+      Flatten.PCollections<T> transform) {
     return (PTransform) Create.empty(VoidCoder.of());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 7f8d467..efcc455 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -165,14 +165,14 @@ public class PTransformMatchers {
   }
 
   /**
-   * A {@link PTransformMatcher} which matches a {@link Flatten.FlattenPCollectionList} which
+   * A {@link PTransformMatcher} which matches a {@link Flatten.PCollections} which
    * consumes no input {@link PCollection PCollections}.
    */
   public static PTransformMatcher emptyFlatten() {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        return (application.getTransform() instanceof Flatten.FlattenPCollectionList)
+        return (application.getTransform() instanceof Flatten.PCollections)
             && application.getInputs().isEmpty();
       }
     };

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index be3ed6b..491c14f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -327,7 +327,7 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
                 of(
                     "EmptyFlatten",
                     Collections.<TaggedPValue>emptyList(),
@@ -346,7 +346,7 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonEmptyFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollectionList<Object>, PCollection<Object>, Flatten.FlattenPCollectionList<Object>>
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>
                 of(
                     "Flatten",
                     Collections.singletonList(
@@ -369,7 +369,7 @@ public class PTransformMatchersTest implements Serializable {
   public void emptyFlattenWithNonFlatten() {
     AppliedPTransform application =
         AppliedPTransform
-            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.FlattenIterables<Object>>
+            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>
                 of(
                     "EmptyFlatten",
                     Collections.<TaggedPValue>emptyList(),

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
index 1185130..98d4a64 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EmptyInputProvider.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 
 /** A {@link RootInputProvider} that provides a singleton empty bundle. */
 class EmptyInputProvider<T>
-    implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.FlattenPCollectionList<T>> {
+    implements RootInputProvider<T, Void, PCollectionList<T>, Flatten.PCollections<T>> {
   EmptyInputProvider() {}
 
   /**
@@ -37,7 +37,7 @@ class EmptyInputProvider<T>
    */
   @Override
   public Collection<CommittedBundle<Void>> getInitialInputs(
-      AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.FlattenPCollectionList<T>>
+      AppliedPTransform<PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>>
           transform,
       int targetParallelism) {
     return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 66862ea..8528905 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -53,7 +53,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
 
   private <InputT> TransformEvaluator<InputT> createInMemoryEvaluator(
       final AppliedPTransform<
-              PCollectionList<InputT>, PCollection<InputT>, FlattenPCollectionList<InputT>>
+              PCollectionList<InputT>, PCollection<InputT>, PCollections<InputT>>
           application) {
     final UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
index e8a7665..eb9492c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/RootProviderRegistry.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -42,7 +42,7 @@ class RootProviderRegistry {
         .put(
             TestStreamEvaluatorFactory.DirectTestStreamFactory.DirectTestStream.class,
             new TestStreamEvaluatorFactory.InputProvider(context))
-        .put(FlattenPCollectionList.class, new EmptyInputProvider());
+        .put(PCollections.class, new EmptyInputProvider());
     return new RootProviderRegistry(defaultProviders.build());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 1ddf9f4..9fdefc3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -32,7 +32,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -53,7 +53,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt))
             .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt))
             .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt))
-            .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory(ctxt))
+            .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt))
             .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
             // Runner-specific primitives used in expansion of GroupByKey

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index df49796..8b4573f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -110,7 +110,7 @@ public class DirectGraphVisitorTest implements Serializable {
 
   @Test
   public void getRootTransformsContainsEmptyFlatten() {
-    FlattenPCollectionList<String> flatten = Flatten.pCollections();
+    PCollections<String> flatten = Flatten.pCollections();
     PCollectionList<String> emptyList = PCollectionList.empty(p);
     PCollection<String> empty = emptyList.apply(flatten);
     empty.setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 30e9d68..acc204d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -108,7 +108,7 @@ class FlinkBatchTransformTranslators {
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
     TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
 
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
 
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
 
@@ -706,12 +706,12 @@ class FlinkBatchTransformTranslators {
 
   private static class FlattenPCollectionTranslatorBatch<T>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
-          Flatten.FlattenPCollectionList<T>> {
+      Flatten.PCollections<T>> {
 
     @Override
     @SuppressWarnings("unchecked")
     public void translateNode(
-        Flatten.FlattenPCollectionList<T> transform,
+        Flatten.PCollections<T> transform,
         FlinkBatchTranslationContext context) {
 
       List<TaggedPValue> allInputs = context.getInputs(transform);

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index a3cceb2..03f567d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -125,7 +125,7 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator());
 
     TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
-    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator());
     TRANSLATORS.put(
         FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
         new CreateViewStreamingTranslator());
@@ -999,11 +999,11 @@ class FlinkStreamingTransformTranslators {
 
   private static class FlattenPCollectionTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
-        Flatten.FlattenPCollectionList<T>> {
+      Flatten.PCollections<T>> {
 
     @Override
     public void translateNode(
-        Flatten.FlattenPCollectionList<T> transform,
+        Flatten.PCollections<T> transform,
         FlinkStreamingTranslationContext context) {
       List<TaggedPValue> allInputs = context.getInputs(transform);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index c672e99..fe5db5a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -744,16 +744,16 @@ public class DataflowPipelineTranslator {
         });
 
     registerTransformTranslator(
-        Flatten.FlattenPCollectionList.class,
-        new TransformTranslator<Flatten.FlattenPCollectionList>() {
+        Flatten.PCollections.class,
+        new TransformTranslator<Flatten.PCollections>() {
           @Override
           public void translate(
-              Flatten.FlattenPCollectionList transform, TranslationContext context) {
+              Flatten.PCollections transform, TranslationContext context) {
             flattenHelper(transform, context);
           }
 
           private <T> void flattenHelper(
-              Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
+              Flatten.PCollections<T> transform, TranslationContext context) {
             StepTranslationContext stepContext = context.addStep(transform, "Flatten");
 
             List<OutputReference> inputs = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a643651..7fc09ad 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -97,11 +97,11 @@ public final class TransformTranslator {
   private TransformTranslator() {
   }
 
-  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
-    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+  private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.PCollections<T>>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+      public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
         List<TaggedPValue> pcs = context.getInputs(transform);
         JavaRDD<WindowedValue<T>> unionRDD;
         if (pcs.size() == 0) {
@@ -729,7 +729,7 @@ public final class TransformTranslator {
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());
-    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+    EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
     EVALUATORS.put(Create.Values.class, create());
     EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
     EVALUATORS.put(View.AsIterable.class, viewAsIter());

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 4a07741..a856897 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -170,11 +170,11 @@ final class StreamingTransformTranslator {
     };
   }
 
-  private static <T> TransformEvaluator<Flatten.FlattenPCollectionList<T>> flattenPColl() {
-    return new TransformEvaluator<Flatten.FlattenPCollectionList<T>>() {
+  private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPColl() {
+    return new TransformEvaluator<Flatten.PCollections<T>>() {
       @SuppressWarnings("unchecked")
       @Override
-      public void evaluate(Flatten.FlattenPCollectionList<T> transform, EvaluationContext context) {
+      public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
         List<TaggedPValue> pcs = context.getInputs(transform);
         // since this is a streaming pipeline, at least one of the PCollections to "flatten" are
         // unbounded, meaning it represents a DStream.
@@ -445,7 +445,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.class, createFromQueue());
     EVALUATORS.put(Window.Bound.class, window());
-    EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
+    EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 3ef2e55..7b282b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -64,8 +64,8 @@ public class Flatten {
    * @param <T> the type of the elements in the input and output
    * {@code PCollection}s.
    */
-  public static <T> FlattenPCollectionList<T> pCollections() {
-    return new FlattenPCollectionList<>();
+  public static <T> PCollections<T> pCollections() {
+    return new PCollections<>();
   }
 
   /**
@@ -86,8 +86,8 @@ public class Flatten {
    * @param <T> the type of the elements of the input {@code Iterable} and
    * the output {@code PCollection}
    */
-  public static <T> FlattenIterables<T> iterables() {
-    return new FlattenIterables<>();
+  public static <T> Iterables<T> iterables() {
+    return new Iterables<>();
   }
 
   /**
@@ -99,10 +99,10 @@ public class Flatten {
    * @param <T> the type of the elements in the input and output
    * {@code PCollection}s.
    */
-  public static class FlattenPCollectionList<T>
+  public static class PCollections<T>
       extends PTransform<PCollectionList<T>, PCollection<T>> {
 
-    private FlattenPCollectionList() { }
+    private PCollections() { }
 
     @Override
     public PCollection<T> expand(PCollectionList<T> inputs) {
@@ -159,8 +159,9 @@ public class Flatten {
    * @param <T> the type of the elements of the input {@code Iterable}s and
    * the output {@code PCollection}
    */
-  public static class FlattenIterables<T>
+  public static class Iterables<T>
       extends PTransform<PCollection<? extends Iterable<T>>, PCollection<T>> {
+    private Iterables() {}
 
     @Override
     public PCollection<T> expand(PCollection<? extends Iterable<T>> in) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f5056efc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 3b5011b..bc3e322 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -81,7 +81,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionList() {
+  public void testFlattenPCollections() {
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
 
@@ -95,7 +95,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionListThenParDo() {
+  public void testFlattenPCollectionsThenParDo() {
     List<List<String>> inputs = Arrays.asList(
       LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES);
 
@@ -110,7 +110,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionListEmpty() {
+  public void testFlattenPCollectionsEmpty() {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of());
@@ -198,7 +198,7 @@ public class FlattenTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
-  public void testFlattenPCollectionListEmptyThenParDo() {
+  public void testFlattenPCollectionsEmptyThenParDo() {
     PCollection<String> output =
         PCollectionList.<String>empty(p)
         .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of())
@@ -366,8 +366,8 @@ public class FlattenTest implements Serializable {
 
   @Test
   public void testFlattenGetName() {
-    Assert.assertEquals("Flatten.FlattenIterables", Flatten.<String>iterables().getName());
-    Assert.assertEquals("Flatten.FlattenPCollectionList", Flatten.<String>pCollections().getName());
+    Assert.assertEquals("Flatten.Iterables", Flatten.<String>iterables().getName());
+    Assert.assertEquals("Flatten.PCollections", Flatten.<String>pCollections().getName());
   }
 
   /////////////////////////////////////////////////////////////////////////////


Mime
View raw message