beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [7/9] beam git commit: Rename PTransforms to PTransformTranslation
Date Wed, 24 May 2017 20:14:16 GMT
Rename PTransforms to PTransformTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b6728e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b6728e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b6728e2

Branch: refs/heads/master
Commit: 9b6728e24748791b7181b20183df3ada31f45682
Parents: 940819e
Author: Kenneth Knowles <klk@google.com>
Authored: Tue May 23 15:28:08 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue May 23 15:53:41 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     | 119 ++++++++++++
 .../runners/core/construction/PTransforms.java  | 119 ------------
 .../beam/runners/core/construction/ParDos.java  |   4 +-
 .../core/construction/SdkComponents.java        |   3 +-
 .../TransformPayloadTranslatorRegistrar.java    |   2 +-
 .../core/construction/WindowIntoTranslator.java |   2 +-
 .../construction/PTransformTranslationTest.java | 189 +++++++++++++++++++
 .../core/construction/PTransformsTest.java      | 188 ------------------
 8 files changed, 314 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
new file mode 100644
index 0000000..86638de
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API
+ * protocol buffers}.
+ */
+public class PTransformTranslation {
+  private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
+      KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+
+  private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
+      loadTransformPayloadTranslators() {
+    ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder =
+        ImmutableMap.builder();
+    for (TransformPayloadTranslatorRegistrar registrar :
+        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+      builder.putAll(registrar.getTransformPayloadTranslators());
+    }
+    return builder.build();
+  }
+
+  private PTransformTranslation() {}
+
+  /**
+   * Translates an {@link AppliedPTransform} into a runner API proto.
+   *
+   * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
+   */
+  static RunnerApi.PTransform toProto(
+      AppliedPTransform<?, ?, ?> appliedPTransform,
+      List<AppliedPTransform<?, ?, ?>> subtransforms,
+      SdkComponents components)
+      throws IOException {
+    RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
+    for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
+      checkArgument(
+          taggedInput.getValue() instanceof PCollection,
+          "Unexpected input type %s",
+          taggedInput.getValue().getClass());
+      transformBuilder.putInputs(
+          toProto(taggedInput.getKey()),
+          components.registerPCollection((PCollection<?>) taggedInput.getValue()));
+    }
+    for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
+      // TODO: Remove gating
+      if (taggedOutput.getValue() instanceof PCollection) {
+        checkArgument(
+            taggedOutput.getValue() instanceof PCollection,
+            "Unexpected output type %s",
+            taggedOutput.getValue().getClass());
+        transformBuilder.putOutputs(
+            toProto(taggedOutput.getKey()),
+            components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
+      }
+    }
+    for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
+      transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
+    }
+
+    transformBuilder.setUniqueName(appliedPTransform.getFullName());
+    // TODO: Display Data
+
+    PTransform<?, ?> transform = appliedPTransform.getTransform();
+    if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
+      FunctionSpec payload =
+          KNOWN_PAYLOAD_TRANSLATORS
+              .get(transform.getClass())
+              .translate(appliedPTransform, components);
+      transformBuilder.setSpec(payload);
+    }
+
+    return transformBuilder.build();
+  }
+
+  private static String toProto(TupleTag<?> tag) {
+    return tag.getId();
+  }
+
+  /**
+   * A translator consumes a {@link PTransform} application and produces the appropriate
+   * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
+   */
+  public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
+    FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
deleted file mode 100644
index 9826b77..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API
- * protocol buffers}.
- */
-public class PTransforms {
-  private static final Map<Class<? extends PTransform>, TransformPayloadTranslator>
-      KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
-
-  private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
-      loadTransformPayloadTranslators() {
-    ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder =
-        ImmutableMap.builder();
-    for (TransformPayloadTranslatorRegistrar registrar :
-        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
-      builder.putAll(registrar.getTransformPayloadTranslators());
-    }
-    return builder.build();
-  }
-
-  private PTransforms() {}
-
-  /**
-   * Translates an {@link AppliedPTransform} into a runner API proto.
-   *
-   * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}.
-   */
-  static RunnerApi.PTransform toProto(
-      AppliedPTransform<?, ?, ?> appliedPTransform,
-      List<AppliedPTransform<?, ?, ?>> subtransforms,
-      SdkComponents components)
-      throws IOException {
-    RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
-    for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
-      checkArgument(
-          taggedInput.getValue() instanceof PCollection,
-          "Unexpected input type %s",
-          taggedInput.getValue().getClass());
-      transformBuilder.putInputs(
-          toProto(taggedInput.getKey()),
-          components.registerPCollection((PCollection<?>) taggedInput.getValue()));
-    }
-    for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
-      // TODO: Remove gating
-      if (taggedOutput.getValue() instanceof PCollection) {
-        checkArgument(
-            taggedOutput.getValue() instanceof PCollection,
-            "Unexpected output type %s",
-            taggedOutput.getValue().getClass());
-        transformBuilder.putOutputs(
-            toProto(taggedOutput.getKey()),
-            components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
-      }
-    }
-    for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
-      transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
-    }
-
-    transformBuilder.setUniqueName(appliedPTransform.getFullName());
-    // TODO: Display Data
-
-    PTransform<?, ?> transform = appliedPTransform.getTransform();
-    if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
-      FunctionSpec payload =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(transform.getClass())
-              .translate(appliedPTransform, components);
-      transformBuilder.setSpec(payload);
-    }
-
-    return transformBuilder.build();
-  }
-
-  private static String toProto(TupleTag<?> tag) {
-    return tag.getId();
-  }
-
-  /**
-   * A translator consumes a {@link PTransform} application and produces the appropriate
-   * FunctionSpec for a distinguished or primitive transform within the Beam runner API.
-   */
-  public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
-    FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
index 2ecc041..12f2969 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -96,7 +96,7 @@ public class ParDos {
    * A {@link TransformPayloadTranslator} for {@link ParDo}.
    */
   public static class ParDoPayloadTranslator
-      implements PTransforms.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
+      implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> {
     public static TransformPayloadTranslator create() {
       return new ParDoPayloadTranslator();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 3d8d4cd..da22982 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -131,7 +131,8 @@ class SdkComponents {
       return name;
     }
     checkNotNull(children, "child nodes may not be null");
-    componentsBuilder.putTransforms(name, PTransforms.toProto(appliedPTransform, children, this));
+    componentsBuilder.putTransforms(name, PTransformTranslation
+        .toProto(appliedPTransform, children, this));
     return name;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
index bc568a6..3b3ffa1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.core.construction;
 
 import java.util.Map;
-import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /** A registrar of TransformPayloadTranslator. */

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
index ea4c996..7ed2a49 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java
new file mode 100644
index 0000000..0e6ef97
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests for {@link PTransformTranslation}.
+ */
+@RunWith(Parameterized.class)
+public class PTransformTranslationTest {
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToAndFromProtoSpec> data() {
+    // This pipeline exists for construction, not to run any test.
+    // TODO: Leaf node with understood payload - i.e. validate payloads
+    ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create()));
+    ToAndFromProtoSpec readMultipleInAndOut =
+        ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create()));
+    TestPipeline compositeReadPipeline = TestPipeline.create();
+    ToAndFromProtoSpec compositeRead =
+        ToAndFromProtoSpec.composite(
+            generateSequence(compositeReadPipeline),
+            ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
+    return ImmutableList.<ToAndFromProtoSpec>builder()
+        .add(readLeaf)
+        .add(readMultipleInAndOut)
+        .add(compositeRead)
+        // TODO: Composite with multiple children
+        // TODO: Composite with a composite child
+        .build();
+  }
+
+  @AutoValue
+  abstract static class ToAndFromProtoSpec {
+    public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) {
+      return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(
+          transform, Collections.<ToAndFromProtoSpec>emptyList());
+    }
+
+    public static ToAndFromProtoSpec composite(
+        AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) {
+      List<ToAndFromProtoSpec> childSpecs = new ArrayList<>();
+      childSpecs.add(spec);
+      childSpecs.addAll(Arrays.asList(specs));
+      return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(topLevel, childSpecs);
+    }
+
+    abstract AppliedPTransform<?, ?, ?> getTransform();
+    abstract Collection<ToAndFromProtoSpec> getChildren();
+  }
+
+  @Parameter(0)
+  public ToAndFromProtoSpec spec;
+
+  @Test
+  public void toAndFromProto() throws IOException {
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.PTransform converted = convert(spec, components);
+    Components protoComponents = components.toComponents();
+
+    // Sanity checks
+    assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size()));
+    assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size()));
+    assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size()));
+
+    assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName()));
+    for (PValue inputValue : spec.getTransform().getInputs().values()) {
+      PCollection<?> inputPc = (PCollection<?>) inputValue;
+      protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc));
+    }
+    for (PValue outputValue : spec.getTransform().getOutputs().values()) {
+      PCollection<?> outputPc = (PCollection<?>) outputValue;
+      protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc));
+    }
+  }
+
+  private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components)
+      throws IOException {
+    List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>();
+    for (ToAndFromProtoSpec child : spec.getChildren()) {
+      childTransforms.add(child.getTransform());
+      System.out.println("Converting child " + child);
+      convert(child, components);
+      // Sanity call
+      components.getExistingPTransformId(child.getTransform());
+    }
+    PTransform convert = PTransformTranslation
+        .toProto(spec.getTransform(), childTransforms, components);
+    // Make sure the converted transform is registered. Convert it independently, but if this is a
+    // child spec, the child must be in the components.
+    components.registerPTransform(spec.getTransform(), childTransforms);
+    return convert;
+  }
+
+  private static class TestDoFn extends DoFn<Long, KV<Long, String>> {
+    // Exists to stop the ParDo application from throwing
+    @ProcessElement public void process(ProcessContext context) {}
+  }
+
+  private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) {
+    GenerateSequence sequence = GenerateSequence.from(0);
+    PCollection<Long> pcollection = pipeline.apply(sequence);
+    return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
+        "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline);
+  }
+
+  private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
+    Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded());
+    PCollection<Long> pcollection = pipeline.apply(transform);
+    return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of(
+        "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline);
+  }
+
+  private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
+    PCollectionView<String> view =
+        pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
+    PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
+    ParDo.MultiOutput<Long, KV<Long, String>> parDo =
+        ParDo.of(new TestDoFn())
+            .withSideInputs(view)
+            .withOutputTags(
+                new TupleTag<KV<Long, String>>() {},
+                TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
+    PCollectionTuple output = input.apply(parDo);
+
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    inputs.putAll(parDo.getAdditionalInputs());
+    inputs.putAll(input.expand());
+
+    return AppliedPTransform
+        .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
+            "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
deleted file mode 100644
index 4125544..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform;
-import org.apache.beam.sdk.io.CountingSource;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests for {@link PTransforms}.
- */
-@RunWith(Parameterized.class)
-public class PTransformsTest {
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ToAndFromProtoSpec> data() {
-    // This pipeline exists for construction, not to run any test.
-    // TODO: Leaf node with understood payload - i.e. validate payloads
-    ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create()));
-    ToAndFromProtoSpec readMultipleInAndOut =
-        ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create()));
-    TestPipeline compositeReadPipeline = TestPipeline.create();
-    ToAndFromProtoSpec compositeRead =
-        ToAndFromProtoSpec.composite(
-            generateSequence(compositeReadPipeline),
-            ToAndFromProtoSpec.leaf(read(compositeReadPipeline)));
-    return ImmutableList.<ToAndFromProtoSpec>builder()
-        .add(readLeaf)
-        .add(readMultipleInAndOut)
-        .add(compositeRead)
-        // TODO: Composite with multiple children
-        // TODO: Composite with a composite child
-        .build();
-  }
-
-  @AutoValue
-  abstract static class ToAndFromProtoSpec {
-    public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) {
-      return new AutoValue_PTransformsTest_ToAndFromProtoSpec(
-          transform, Collections.<ToAndFromProtoSpec>emptyList());
-    }
-
-    public static ToAndFromProtoSpec composite(
-        AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) {
-      List<ToAndFromProtoSpec> childSpecs = new ArrayList<>();
-      childSpecs.add(spec);
-      childSpecs.addAll(Arrays.asList(specs));
-      return new AutoValue_PTransformsTest_ToAndFromProtoSpec(topLevel, childSpecs);
-    }
-
-    abstract AppliedPTransform<?, ?, ?> getTransform();
-    abstract Collection<ToAndFromProtoSpec> getChildren();
-  }
-
-  @Parameter(0)
-  public ToAndFromProtoSpec spec;
-
-  @Test
-  public void toAndFromProto() throws IOException {
-    SdkComponents components = SdkComponents.create();
-    RunnerApi.PTransform converted = convert(spec, components);
-    Components protoComponents = components.toComponents();
-
-    // Sanity checks
-    assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size()));
-    assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size()));
-    assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size()));
-
-    assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName()));
-    for (PValue inputValue : spec.getTransform().getInputs().values()) {
-      PCollection<?> inputPc = (PCollection<?>) inputValue;
-      protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc));
-    }
-    for (PValue outputValue : spec.getTransform().getOutputs().values()) {
-      PCollection<?> outputPc = (PCollection<?>) outputValue;
-      protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc));
-    }
-  }
-
-  private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components)
-      throws IOException {
-    List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>();
-    for (ToAndFromProtoSpec child : spec.getChildren()) {
-      childTransforms.add(child.getTransform());
-      System.out.println("Converting child " + child);
-      convert(child, components);
-      // Sanity call
-      components.getExistingPTransformId(child.getTransform());
-    }
-    PTransform convert = PTransforms.toProto(spec.getTransform(), childTransforms, components);
-    // Make sure the converted transform is registered. Convert it independently, but if this is a
-    // child spec, the child must be in the components.
-    components.registerPTransform(spec.getTransform(), childTransforms);
-    return convert;
-  }
-
-  private static class TestDoFn extends DoFn<Long, KV<Long, String>> {
-    // Exists to stop the ParDo application from throwing
-    @ProcessElement public void process(ProcessContext context) {}
-  }
-
-  private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) {
-    GenerateSequence sequence = GenerateSequence.from(0);
-    PCollection<Long> pcollection = pipeline.apply(sequence);
-    return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of(
-        "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline);
-  }
-
-  private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) {
-    Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded());
-    PCollection<Long> pcollection = pipeline.apply(transform);
-    return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of(
-        "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline);
-  }
-
-  private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
-    PCollectionView<String> view =
-        pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
-    PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
-    ParDo.MultiOutput<Long, KV<Long, String>> parDo =
-        ParDo.of(new TestDoFn())
-            .withSideInputs(view)
-            .withOutputTags(
-                new TupleTag<KV<Long, String>>() {},
-                TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
-    PCollectionTuple output = input.apply(parDo);
-
-    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
-    inputs.putAll(parDo.getAdditionalInputs());
-    inputs.putAll(input.expand());
-
-    return AppliedPTransform
-        .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
-            "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
-  }
-}


Mime
View raw message