beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: Inline rather than reference FunctionSpecs.
Date Wed, 01 Mar 2017 17:04:35 GMT
Repository: beam
Updated Branches:
  refs/heads/master 79b1395c2 -> d84b06791


Inline rather than reference FunctionSpecs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d390406e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d390406e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d390406e

Branch: refs/heads/master
Commit: d390406e27112faed31233d7daef1f650a31cd0f
Parents: 79b1395
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Tue Feb 28 15:51:24 2017 -0800
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Wed Mar 1 09:04:30 2017 -0800

----------------------------------------------------------------------
 .../src/main/proto/beam_runner_api.proto        | 39 +++++++++-----------
 .../beam/sdk/util/WindowingStrategies.java      | 18 ++-------
 2 files changed, 20 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 58532b2..44ead56 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -47,10 +47,6 @@ message Components {
 
   // (Required) A map from pipeline-scoped id to Environment.
   map<string, Environment> environments = 5;
-
-  // (Required) A map from pipeline-scoped id to FunctionSpec,
-  // a record for a particular user-defined function.
-  map<string, FunctionSpec> function_specs = 6;
 }
 
 // A disjoint union of all the things that may contain references
@@ -207,8 +203,8 @@ message PCollection {
 // The payload for the primitive ParDo transform.
 message ParDoPayload {
 
-  // (Required) The pipeline-scoped id of the FunctionSpec for the DoFn.
-  string fn_id = 1;
+  // (Required) The FunctionSpec of the DoFn.
+  FunctionSpec do_fn = 1;
 
   // (Required) Additional pieces of context the DoFn may require that
   // are not otherwise represented in the payload.
@@ -266,9 +262,8 @@ enum IsBounded {
 // The payload for the primitive Read transform.
 message ReadPayload {
 
-  // (Required) The pipeline-scoped id of the FunctionSpec of the source for
-  // this Read.
-  string source_id = 1;
+  // (Required) The FunctionSpec of the source for this Read.
+  FunctionSpec source = 1;
 
   // (Required) Whether the source is bounded or unbounded
   IsBounded is_bounded = 2;
@@ -279,15 +274,15 @@ message ReadPayload {
 // The payload for the WindowInto transform.
 message WindowIntoPayload {
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the WindowFn.
-  string fn_id = 1;
+  // (Required) The FunctionSpec of the WindowFn.
+  FunctionSpec window_fn = 1;
 }
 
 // The payload for the special-but-not-primitive Combine transform.
 message CombinePayload {
 
-  // (Required) The pipeline-scoped id of the FunctionSpec for the CombineFn.
-  string fn_id = 1;
+  // (Required) The FunctionSpec of the CombineFn.
+  FunctionSpec combine_fn = 1;
 
   // (Required) A reference to the Coder to use for accumulators of the CombineFn
   string accumulator_coder_id = 2;
@@ -325,10 +320,10 @@ message Coder {
 // TODO: consider inlining field on PCollection
 message WindowingStrategy {
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
-  // assigns windows, merges windows, and shifts timestamps before they are
+  // (Required) The FunctionSpec of the UDF that assigns windows,
+  // merges windows, and shifts timestamps before they are
   // combined according to the OutputTime.
-  string fn_id = 1;
+  FunctionSpec window_fn = 1;
 
   // (Required) Whether or not the window fn is merging.
   //
@@ -584,20 +579,20 @@ message SideInput {
   // URN)
   UrnWithParameter access_pattern = 1;
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
-  // adapts a particular access_pattern to a user-facing view type.
+  // (Required) The FunctionSpec of the UDF that adapts a particular
+  // access_pattern to a user-facing view type.
   //
   // For example, View.asSingleton() may include a `view_fn` that adapts a
   // specially-designed multimap to a single value per window.
-  string view_fn_id = 2;
+  FunctionSpec view_fn = 2;
 
-  // (Required) The pipeline-scoped id for the FunctionSpec of the UDF that
-  // maps a main input window to a side input window.
+  // (Required) The FunctionSpec of the UDF that maps a main input window
+  // to a side input window.
   //
   // For example, when the main input is in fixed windows of one hour, this
   // can specify that the side input should be accessed according to the day
   // in which that hour falls.
-  string window_mapping_fn_id = 3;
+  FunctionSpec window_mapping_fn = 3;
 }
 
 // An environment for executing UDFs. Generally an SDK container URL, but

http://git-wip-us.apache.org/repos/asf/beam/blob/d390406e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
index 3047da1..7bc581c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
@@ -195,10 +195,6 @@ public class WindowingStrategies implements Serializable {
   public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
       throws IOException {
 
-    // TODO: have an inverted components to find the id for a thing already
-    // in the components
-    String windowFnId = UUID.randomUUID().toString();
-
     RunnerApi.MessageWithComponents windowFnWithComponents =
         toProto(windowingStrategy.getWindowFn());
 
@@ -209,16 +205,11 @@ public class WindowingStrategies implements Serializable {
             .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
             .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
             .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
-            .setFnId(windowFnId);
+            .setWindowFn(windowFnWithComponents.getFunctionSpec());
 
     return RunnerApi.MessageWithComponents.newBuilder()
         .setWindowingStrategy(windowingStrategyProto)
-        .setComponents(
-            windowFnWithComponents
-                .getComponents()
-                .toBuilder()
-                .putFunctionSpecs(windowFnId, windowFnWithComponents.getFunctionSpec()))
-        .build();
+        .setComponents(windowFnWithComponents.getComponents()).build();
   }
 
   /**
@@ -246,10 +237,7 @@ public class WindowingStrategies implements Serializable {
       RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
       throws InvalidProtocolBufferException {
 
-    FunctionSpec windowFnSpec =
-        components
-            .getFunctionSpecsMap()
-            .get(proto.getFnId());
+    FunctionSpec windowFnSpec = proto.getWindowFn();
 
     checkArgument(
         windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),


Mime
View raw message