beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [11/12] beam git commit: Add RawPTransform, which can just vend its URN and payload
Date Fri, 26 May 2017 04:08:17 GMT
Add RawPTransform, which can just vend its URN and payload

This is the type that will be returned when a pipeline is deserialized.
This also is convenient for direct runner overrides which do not really
merit translator registrations, yet URNs need to be known in order to
key the evaluator registry off URN.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/663ad881
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/663ad881
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/663ad881

Branch: refs/heads/master
Commit: 663ad88178ceadffe4cfa592555986ed7dde58b4
Parents: 8e09596
Author: Kenneth Knowles <klk@google.com>
Authored: Fri May 19 21:28:19 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 25 11:17:08 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/663ad881/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 35bb0e3..9f5f3b5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -21,15 +21,20 @@ package org.apache.beam.runners.core.construction;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Any;
+import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -135,4 +140,50 @@ public class PTransformTranslation {
     String getUrn(T transform);
     FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components);
   }
+
+  /**
+   * A {@link PTransform} that indicates its URN and payload directly.
+   *
+   * <p>This is the result of rehydrating transforms from a pipeline proto. There is no {@link
+   * #expand} method since the definition of the transform may be lost. The transform is already
+   * fully expanded in the pipeline proto.
+   */
+  public abstract static class RawPTransform<
+          InputT extends PInput, OutputT extends POutput, PayloadT extends Message>
+      extends PTransform<InputT, OutputT> {
+
+    public abstract String getUrn();
+
+    @Nullable
+    PayloadT getPayload() {
+      return null;
+    }
+  }
+
+  /**
+   * A translator that uses the explicit URN and payload from a {@link RawPTransform}.
+   */
+  public static class RawPTransformTranslator<PayloadT extends Message>
+      implements TransformPayloadTranslator<RawPTransform<?, ?, PayloadT>> {
+    @Override
+    public String getUrn(RawPTransform<?, ?, PayloadT> transform) {
+      return transform.getUrn();
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, RawPTransform<?, ?, PayloadT>> transform,
+        SdkComponents components) {
+      PayloadT payload = transform.getTransform().getPayload();
+
+      FunctionSpec.Builder transformSpec =
+          FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
+
+      if (payload != null) {
+        transformSpec.setParameter(Any.pack(payload));
+      }
+
+      return transformSpec.build();
+    }
+  }
 }


Mime
View raw message