beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-1347] Update protos related to State API for prototyping purposes.
Date Fri, 14 Apr 2017 16:10:56 GMT
Repository: beam
Updated Branches:
  refs/heads/master 4daac6644 -> 6bf42622c


[BEAM-1347] Update protos related to State API for prototyping purposes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25d77b12
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25d77b12
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25d77b12

Branch: refs/heads/master
Commit: 25d77b1240b366542c9094b2ab6373f520278cd0
Parents: 4daac66
Author: Luke Cwik <lcwik@google.com>
Authored: Wed Apr 12 15:28:35 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Fri Apr 14 09:10:08 2017 -0700

----------------------------------------------------------------------
 sdks/common/fn-api/pom.xml                      |   5 -
 .../fn-api/src/main/proto/beam_fn_api.proto     | 174 +++++++++----------
 2 files changed, 82 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/25d77b12/sdks/common/fn-api/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml
index e9253c2..e3a583b 100644
--- a/sdks/common/fn-api/pom.xml
+++ b/sdks/common/fn-api/pom.xml
@@ -82,11 +82,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/25d77b12/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 80bae2e..79e1872 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -199,7 +199,7 @@ message RemoteGrpcPort {
  * with the addition of new types of instructions/responses related to metrics.
  */
 
-// An API that describes the work that a SDK Fn Harness is meant to do.
+// An API that describes the work that a SDK harness is meant to do.
 // Stable
 service BeamFnControl {
   // Instructions sent by the runner to the SDK requesting different types
@@ -282,7 +282,13 @@ message ProcessBundleDescriptor {
 // A request to process a given bundle.
 // Stable
 message ProcessBundleRequest {
+  // (Required) A reference to the process bundle descriptor that must be
+  // instantiated and executed by the SDK harness.
   string process_bundle_descriptor_reference = 1;
+
+  // (Optional) A list of cache tokens that can be used by an SDK to cache
+  // data looked up using the State API across multiple bundles.
+  repeated CacheToken cache_tokens = 2;
 }
 
 // Stable
@@ -465,16 +471,12 @@ service BeamFnData {
 
 /*
  * State API
- *
- * This is just a high level sketch of how this could work. There is still
- * a lot of work with respect to how the key spaces for the different types
- * of access required (side inputs, user state, ...) and how state caching
- * works across bundles.
  */
 
 message StateRequest {
   // (Required) An unique identifier provided by the SDK which represents this
-  // requests execution. The StateResponse must have the matching id.
+  // request's execution. The StateResponse corresponding to this request
+  // will have the matching id.
   string id = 1;
 
   // (Required) The associated instruction id of the work that is currently
@@ -482,17 +484,20 @@ message StateRequest {
   // to state to be committed with the appropriate work execution.
   string instruction_reference = 2;
 
-  // At least one of the following fields should be populated.
-  // Also, no request should use a state key referred to in another state key.
+  // (Required) The state key this request is for.
+  StateKey state_key = 3;
 
-  // (Optional) A request to get state.
-  repeated StateGetRequest get = 3;
+  // (Required) The action to take on this request.
+  oneof request {
+    // A request to get state.
+    StateGetRequest get = 1000;
 
-  // (Optional) A request to append to state.
-  repeated StateAppendRequest append = 4;
+    // A request to append to state.
+    StateAppendRequest append = 1001;
 
-  // (Optional) A request to clear state.
-  repeated StateClearRequest clear = 5;
+    // A request to clear state.
+    StateClearRequest clear = 1002;
+  }
 }
 
 message StateResponse {
@@ -501,31 +506,22 @@ message StateResponse {
   // to the SDK.
   string id = 1;
 
-  // (Required) The associated instruction id of the work that is currently
-  // being processed.
-  string instruction_reference = 2;
-
-  // (Required) A key to associate with the version of this state. Allows for
-  // SDKs to share state across work items if they have the same cache key and
-  // state key.
-  bytes cache_key = 3;
-
   // (Optional) If this is specified, then the state request has failed.
   // A human readable string representing the reason as to why the request
   // failed.
-  string error = 4;
-
-  // For every field populated in the StateRequest, there is a matching field in
-  // the StateResponse.
+  string error = 2;
 
-  // (Optional) A response to getting state.
-  repeated StateGetResponse get = 5;
+  // A corresponding response matching the request will be populated.
+  oneof response {
+    // A response to getting state.
+    StateGetResponse get = 1000;
 
-  // (Optional) A response to appending to state.
-  repeated StateAppendResponse append = 6;
+    // A response to appending to state.
+    StateAppendResponse append = 1001;
 
-  // (Optional) A response to clearing state.
-  repeated StateClearResponse clear = 7;
+    // A response to clearing state.
+    StateClearResponse clear = 1002;
+  }
 }
 
 service BeamFnState {
@@ -540,91 +536,84 @@ service BeamFnState {
   ) {}
 }
 
+message CacheToken {
+  // (Required) Represents the function spec and tag associated with this
+  // cache token.
+  //
+  // By combining the function_spec_reference with the tag representing:
+  //   * the input, we refer to the iterable portion of a large GBK
+  //   * the side input, we refer to the side input
+  //   * the user state, we refer to user state
+  Target target = 1;
 
-// TODO: Resolve with the other State API.
-service SimpleBeamFnState {
-  // Gets the elements associated with the given key.
-  rpc Get(StateKey) returns (Elements.Data) {}
-  // Appends elements to a given state bag.
-  rpc Append(SimpleStateAppendRequest) returns (Empty) {}
-  // Clears a given state bag.
-  rpc Clear(StateKey) returns (Empty) {}
-}
-
-message Empty {
-}
-
-message SimpleStateAppendRequest {
-  StateKey state_key = 1;
-  repeated bytes data = 2;
+  // (Required) An opaque identifier.
+  bytes token = 2;
 }
 
 message StateKey {
-  // (Required) Represents the namespace for the state. If this state is for a
-  // DoFn, then this reference is expected to point to the DoFn. If this state
-  // is for a side input, then this is expected to reference the ViewFn.
-  string function_spec_reference = 1;
+  // (Required) Represents the function spec and tag associated with this state
+  // key.
+  //
+  // By combining the function_spec_reference with the tag representing:
+  //   * the input, we refer to fetching the iterable portion of a large GBK
+  //   * the side input, we refer to fetching the side input
+  //   * the user state, we refer to fetching user state
+  Target target = 1;
 
   // (Required) The bytes of the window which this state request is for encoded
-  // in the outer context.
+  // in the nested context.
   bytes window = 2;
 
-  // (Required) The user key for which the value was encoded in the outer
-  // context.
+  // (Required) The user key encoded in the nested context.
   bytes key = 3;
 }
 
-message StateKeyOrIterable {
-  // One of the two fields below are required to be set.
-  // If state key is set, then the State API should be invoked to fetch the
-  // values allowing one to restart the iterable. Otherwise the bytes for the
-  // entire iterable are represented and should be decoded using an iterable
-  // coder using the outer context.
-  StateKey state_key = 1;
-  repeated bytes iterable = 2;
+// A logical byte stream which can be continued using the state API.
+message ContinuableStream {
+  // (Optional) If specified, represents a token which can be used with the
+  // state API to get the next chunk of this logical byte stream. The end of
+  // the logical byte stream is signalled by this field being unset.
+  bytes continuation_token = 1;
+
+  // Represents a part of a logical byte stream. Elements within
+  // the logical byte stream are encoded in the nested context and
+  // concatenated together.
+  bytes data = 2;
 }
 
-// A request to get state for the given state key.
+// A request to get state.
 message StateGetRequest {
-  // A state key encoded in the outer context.
-  StateKey state_key = 1;
+  // (Optional) If specified, signals to the runner that the response
+  // should resume from the following continuation token.
+  //
+  // If unspecified, signals to the runner that the response should start
+  // from the beginning of the logical continuable stream.
+  bytes continuation_token = 1;
 }
 
-// A response to get state for the given state key.
+// A response to get state.
 message StateGetResponse {
-  // A state key encoded in the outer context.
-  StateKey state_key = 1;
-
-  oneof state {
-    // A description of an input port which will stream the state data.
-    RemoteGrpcPort remote_grpc_port = 1000;
-  }
+  // (Required) The response containing a continuable logical byte stream.
+  ContinuableStream stream = 1;
 }
 
-// A request to append state for the given state key.
+// A request to append state.
 message StateAppendRequest {
-  // A state key encoded in the outer context.
-  StateKey state_key  = 1;
+  // Represents a part of a logical byte stream. Elements within
+  // the logical byte stream are encoded in the nested context and
+  // multiple append requests are concatenated together.
+  bytes data = 1;
 }
 
-// A response to append state for the given state key.
+// A response to append state.
 message StateAppendResponse {
-  // A state key encoded in the outer context.
-  StateKey state_key = 1;
-
-  oneof state {
-    // A description of an output port which to stream the state data to.
-    RemoteGrpcPort remote_grpc_port = 1000;
-  }
 }
 
-// A request to clear state for the given state key.
+// A request to clear state.
 message StateClearRequest {
-  // A state key encoded in the outer context.
-  StateKey state_key = 1;
 }
 
-// A response to clear state for the given state key.
+// A response to clear state.
 message StateClearResponse {
 }
 
@@ -753,7 +742,7 @@ message OAuth2ClientCredentialsGrant {
   string url = 1;
 }
 
-// A Docker container configuration for launching the SDK Fn Harness to execute
+// A Docker container configuration for launching the SDK harness to execute
 // user specified functions.
 message DockerContainer {
   // (Required) A pipeline level unique id which can be used as a reference to
@@ -769,3 +758,4 @@ message DockerContainer {
   // requiring additional configuration by a runner.
   string registry_reference = 3;
 }
+


Mime
View raw message