Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 91854200C1B for ; Mon, 30 Jan 2017 21:48:55 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8E392160B60; Mon, 30 Jan 2017 20:48:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 43AD3160B4D for ; Mon, 30 Jan 2017 21:48:53 +0100 (CET) Received: (qmail 50759 invoked by uid 500); 30 Jan 2017 20:48:52 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 50626 invoked by uid 99); 30 Jan 2017 20:48:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Jan 2017 20:48:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3E306DFBAD; Mon, 30 Jan 2017 20:48:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: lcwik@apache.org To: commits@beam.apache.org Date: Mon, 30 Jan 2017 20:48:56 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [5/6] beam git commit: A proposal for a portability framework to execute user definable functions. archived-at: Mon, 30 Jan 2017 20:48:55 -0000 A proposal for a portability framework to execute user definable functions. 
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4b2bec Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4b2bec Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4b2bec Branch: refs/heads/master Commit: 0b4b2becb45b9f637ba31f599ebe8be0331bd633 Parents: 582c4a8 Author: Luke Cwik Authored: Thu Jan 19 15:16:55 2017 -0800 Committer: Luke Cwik Committed: Mon Jan 30 12:47:55 2017 -0800 ---------------------------------------------------------------------- pom.xml | 36 +- runners/apex/pom.xml | 2 +- sdks/common/fn-api/pom.xml | 111 +++ .../fn-api/src/main/proto/beam_fn_api.proto | 771 +++++++++++++++++++ sdks/common/pom.xml | 38 + .../src/main/resources/beam/findbugs-filter.xml | 32 +- sdks/java/harness/pom.xml | 167 ++++ .../org/apache/beam/fn/harness/FnHarness.java | 131 ++++ .../harness/channel/ManagedChannelFactory.java | 80 ++ .../harness/channel/SocketAddressFactory.java | 64 ++ .../beam/fn/harness/channel/package-info.java | 22 + .../fn/harness/control/BeamFnControlClient.java | 165 ++++ .../harness/control/ProcessBundleHandler.java | 334 ++++++++ .../fn/harness/control/RegisterHandler.java | 92 +++ .../beam/fn/harness/control/package-info.java | 22 + .../BeamFnDataBufferingOutboundObserver.java | 135 ++++ .../beam/fn/harness/data/BeamFnDataClient.java | 64 ++ .../fn/harness/data/BeamFnDataGrpcClient.java | 122 +++ .../harness/data/BeamFnDataGrpcMultiplexer.java | 140 ++++ .../harness/data/BeamFnDataInboundObserver.java | 81 ++ .../beam/fn/harness/data/package-info.java | 22 + .../fn/harness/fake/FakeAggregatorFactory.java | 52 ++ .../beam/fn/harness/fake/FakeStepContext.java | 70 ++ .../beam/fn/harness/fake/package-info.java | 22 + .../harness/fn/CloseableThrowingConsumer.java | 23 + .../beam/fn/harness/fn/ThrowingBiFunction.java | 32 + .../beam/fn/harness/fn/ThrowingConsumer.java | 32 + .../beam/fn/harness/fn/ThrowingFunction.java | 32 + 
.../beam/fn/harness/fn/ThrowingRunnable.java | 30 + .../apache/beam/fn/harness/fn/package-info.java | 22 + .../fn/harness/logging/BeamFnLoggingClient.java | 308 ++++++++ .../beam/fn/harness/logging/package-info.java | 22 + .../apache/beam/fn/harness/package-info.java | 22 + .../beam/fn/harness/stream/AdvancingPhaser.java | 36 + .../harness/stream/BufferingStreamObserver.java | 166 ++++ .../fn/harness/stream/DirectStreamObserver.java | 71 ++ .../ForwardingClientResponseObserver.java | 63 ++ .../harness/stream/StreamObserverFactory.java | 91 +++ .../beam/fn/harness/stream/package-info.java | 22 + .../beam/runners/core/BeamFnDataReadRunner.java | 104 +++ .../runners/core/BeamFnDataWriteRunner.java | 87 +++ .../beam/runners/core/BoundedSourceRunner.java | 105 +++ .../apache/beam/runners/core/package-info.java | 22 + .../apache/beam/fn/harness/FnHarnessTest.java | 130 ++++ .../channel/ManagedChannelFactoryTest.java | 74 ++ .../channel/SocketAddressFactoryTest.java | 56 ++ .../control/BeamFnControlClientTest.java | 182 +++++ .../control/ProcessBundleHandlerTest.java | 674 ++++++++++++++++ .../fn/harness/control/RegisterHandlerTest.java | 80 ++ ...BeamFnDataBufferingOutboundObserverTest.java | 142 ++++ .../harness/data/BeamFnDataGrpcClientTest.java | 309 ++++++++ .../data/BeamFnDataGrpcMultiplexerTest.java | 96 +++ .../data/BeamFnDataInboundObserverTest.java | 116 +++ .../logging/BeamFnLoggingClientTest.java | 169 ++++ .../fn/harness/stream/AdvancingPhaserTest.java | 48 ++ .../stream/BufferingStreamObserverTest.java | 146 ++++ .../stream/DirectStreamObserverTest.java | 139 ++++ .../ForwardingClientResponseObserverTest.java | 60 ++ .../stream/StreamObserverFactoryTest.java | 84 ++ .../beam/fn/harness/test/TestExecutors.java | 85 ++ .../beam/fn/harness/test/TestExecutorsTest.java | 160 ++++ .../beam/fn/harness/test/TestStreams.java | 162 ++++ .../beam/fn/harness/test/TestStreamsTest.java | 84 ++ .../runners/core/BeamFnDataReadRunnerTest.java | 187 +++++ 
.../runners/core/BeamFnDataWriteRunnerTest.java | 155 ++++ .../runners/core/BoundedSourceRunnerTest.java | 113 +++ sdks/java/pom.xml | 1 + sdks/pom.xml | 1 + 68 files changed, 7514 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d09bf59..a53453b 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ 1.22.0 1.4.5 0.5.160304 - 19.0 + 20.0 1.0.1 1.3 2.7.2 @@ -127,7 +127,7 @@ 1.9.5 4.1.3.Final 1.4.0.Final - 3.0.0 + 3.1.0 v1-rev10-1.22.0 1.7.14 3.1.4 @@ -314,6 +314,11 @@ + + org.apache.beam + beam-sdks-common-fn-api + ${project.version} + org.apache.beam @@ -729,6 +734,13 @@ + io.netty + netty-transport-native-epoll + ${netty.version} + linux-x86_64 + + + org.apache.avro avro ${avro.version} @@ -741,6 +753,12 @@ + com.google.errorprone + error_prone_annotations + 2.0.13 + + + joda-time joda-time ${joda.version} @@ -824,9 +842,23 @@ + + + kr.motd.maven + os-maven-plugin + ${os-maven-plugin.version} + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.0 + + + org.apache.maven.plugins maven-checkstyle-plugin 2.17 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 7ae07e2..5e16083 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -226,7 +226,7 @@ org.slf4j:slf4j-api:jar:1.7.14 org.apache.hadoop:hadoop-common:jar:2.6.0 joda-time:joda-time:jar:2.4 - com.google.guava:guava:jar:19.0 + com.google.guava:guava:jar:20.0 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/fn-api/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/pom.xml b/sdks/common/fn-api/pom.xml new file mode 100644 index 
0000000..72788d0 --- /dev/null +++ b/sdks/common/fn-api/pom.xml @@ -0,0 +1,111 @@ + + + + 4.0.0 + + jar + + org.apache.beam + beam-sdks-common-parent + 0.6.0-SNAPSHOT + ../pom.xml + + + beam-sdks-common-fn-api + Apache Beam :: SDKs :: Common :: Fn API + This artifact generates the stub bindings. + + + + + src/main/resources + true + + + ${project.build.directory}/original_sources_to_package + + + + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + true + + + + + + org.codehaus.mojo + findbugs-maven-plugin + + true + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + + + + com.google.guava + guava + + + + com.google.protobuf + protobuf-java + + + + io.grpc + grpc-core + + + + io.grpc + grpc-protobuf + + + + io.grpc + grpc-stub + + + http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/fn-api/src/main/proto/beam_fn_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto new file mode 100644 index 0000000..3ac0fbf --- /dev/null +++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto @@ -0,0 +1,771 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Protocol Buffers describing the Fn API and bootstrapping. + * + * TODO: Usage of plural names in lists looks awkward in Java + * e.g. getOutputsMap, addCodersBuilder + * + * TODO: gRPC / proto field names conflict with generated code + * e.g. "class" in java, "output" in python + */ + +syntax = "proto3"; + +/* TODO: Consider consolidating common components in another package + * and language namespaces for re-use with Runner Api. + */ + +package org.apache.beam.fn.v1; + +option java_package = "org.apache.beam.fn.v1"; +option java_outer_classname = "BeamFnApi"; + +import "google/protobuf/any.proto"; +import "google/protobuf/timestamp.proto"; + +/* + * Constructs that define the pipeline shape. + * + * These are mostly unstable due to the missing pieces to be shared with + * the Runner Api like windowing strategy, display data, .... There are still + * some modelling questions related to whether a side input is modelled + * as another field on a PrimitiveTransform or as part of inputs and we + * still are missing things like the CompositeTransform. + */ + +// A representation of an input or output definition on a primitive transform. +// Stable +message Target { + // A repeated list of target definitions. + message List { + repeated Target target = 1; + } + + // (Required) The id of the PrimitiveTransform which is the target. + int64 primitive_transform_reference = 1; + + // (Required) The local name of an input or output defined on the primitive + // transform. 
+ string name = 2; +} + +// Information defining a PCollection +message PCollection { + // (Required) A reference to a coder. + int64 coder_reference = 1; + + // TODO: Windowing strategy, ... +} + +// A primitive transform within Apache Beam. +message PrimitiveTransform { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + int64 id = 1; + + // (Required) A function spec that is used by this primitive + // transform to process data. + FunctionSpec function_spec = 2; + + // A map of distinct input names to target definitions. + // For example, in CoGbk this represents the tag name associated with each + // distinct input name and a list of primitive transforms that are associated + // with the specified input. + map inputs = 3; + + // A map from local output name to PCollection definitions. For example, in + // DoFn this represents the tag name associated with each distinct output. + map outputs = 4; + + // TODO: Should we model side inputs as a special type of input for a + // primitive transform or should it be modeled as the relationship that + // the predecessor input will be a view primitive transform. + // A map from side input names to side inputs. + map side_inputs = 5; + + // The user name of this step. + // TODO: This should really be in display data and not at this level + string step_name = 6; +} + +/* + * User Definable Functions + * + * This is still unstable mainly due to how we model the side input. + */ + +// Defines the common elements of user-definable functions, to allow the SDK to +// express the information the runner needs to execute work. +// Stable +message FunctionSpec { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + int64 id = 1; + + // (Required) A globally unique name representing this user definable + // function. 
+ // + // User definable functions use the urn encodings registered such that another + // may implement the user definable function within another language. + // + // For example: + // urn:org.apache.beam:coder:kv:1.0 + string urn = 2; + + // (Required) Reference to specification of execution environment required to + // invoke this function. + int64 environment_reference = 3; + + // Data used to parameterize this function. Depending on the urn, this may be + // optional or required. + google.protobuf.Any data = 4; +} + +message SideInput { + // TODO: Coder? + + // For RunnerAPI. + Target input = 1; + + // For FnAPI. + FunctionSpec view_fn = 2; +} + +// Defines how to encode values into byte streams and decode values from byte +// streams. A coder can be parameterized by additional properties which may or +// may not be language agnostic. +// +// Coders using the urn:org.apache.beam:coder namespace must have their +// encodings registered such that another may implement the encoding within +// another language. +// +// For example: +// urn:org.apache.beam:coder:kv:1.0 +// urn:org.apache.beam:coder:iterable:1.0 +// Stable +message Coder { + // TODO: This looks weird when compared to the other function specs + // which use URN to differentiate themselves. Should "Coder" be embedded + // inside the FunctionSpec data block. + + // The data associated with this coder used to reconstruct it. + FunctionSpec function_spec = 1; + + // A list of component coder references. + // + // For a key-value coder, there must be exactly two component coder references + // where the first reference represents the key coder and the second reference + // is the value coder. + // + // For an iterable coder, there must be exactly one component coder reference + // representing the value coder. + // + // TODO: Perhaps this is redundant with the data of the FunctionSpec + // for known coders? 
+ repeated int64 component_coder_reference = 2; +} + +// A descriptor for connecting to a remote port using the Beam Fn Data API. +// Allows for communication between two environments (for example between the +// runner and the SDK). +// Stable +message RemoteGrpcPort { + // (Required) An API descriptor which describes where to + // connect to including any authentication that is required. + ApiServiceDescriptor api_service_descriptor = 1; +} + +/* + * Control Plane API + * + * Progress reporting and splitting still need further vetting. Also, this may change + * with the addition of new types of instructions/responses related to metrics. + */ + +// An API that describes the work that a SDK Fn Harness is meant to do. +// Stable +service BeamFnControl { + // Instructions sent by the runner to the SDK requesting different types + // of work. + rpc Control( + // A stream of responses to instructions the SDK was asked to be performed. + stream InstructionResponse + ) returns ( + // A stream of instructions requested of the SDK to be performed. + stream InstructionRequest + ) {} +} + +// A request sent by a runner which the SDK is asked to fulfill. +// Stable +message InstructionRequest { + // (Required) A unique identifier provided by the runner which represents + // this request's execution. The InstructionResponse MUST have the matching id. + int64 instruction_id = 1; + + // (Required) A request that the SDK Harness needs to interpret. + oneof request { + RegisterRequest register = 1000; + ProcessBundleRequest process_bundle = 1001; + ProcessBundleProgressRequest process_bundle_progress = 1002; + ProcessBundleSplitRequest process_bundle_split = 1003; + } +} + +// The response for an associated request the SDK had been asked to fulfill. +// Stable +message InstructionResponse { + // (Required) A reference provided by the runner which represents a request's + // execution. The InstructionResponse MUST have the matching id when + // responding to the runner. 
+ int64 instruction_id = 1; + + // If this is specified, then this instruction has failed. + // A human readable string representing the reason as to why processing has + // failed. + string error = 2; + + // If the instruction did not fail, it is required to return an equivalent + // response type depending on the request this matches. + oneof response { + RegisterResponse register = 1000; + ProcessBundleResponse process_bundle = 1001; + ProcessBundleProgressResponse process_bundle_progress = 1002; + ProcessBundleSplitResponse process_bundle_split = 1003; + } +} + +// A list of objects which can be referred to by the runner in +// future requests. +// Stable +message RegisterRequest { + // (Optional) The set of descriptors used to process bundles. + repeated ProcessBundleDescriptor process_bundle_descriptor = 1; +} + +// Stable +message RegisterResponse { +} + +// A descriptor of references used when processing a bundle. +// Stable +message ProcessBundleDescriptor { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + int64 id = 1; + + // (Required) A list of primitive transforms that should + // be used to construct the bundle processing graph. + repeated PrimitiveTransform primitive_transform = 2; + + // (Required) The set of all coders referenced in this bundle. + repeated Coder coders = 4; +} + +// A request to process a given bundle. +// Stable +message ProcessBundleRequest { + int64 process_bundle_descriptor_reference = 1; +} + +// Stable +message ProcessBundleResponse { +} + +message ProcessBundleProgressRequest { + // (Required) A reference to an active process bundle request with the given + // instruction id. + int64 instruction_reference = 1; +} + +message ProcessBundleProgressResponse { + // (Required) The finished amount of work. A monotonically increasing + // unitless measure of work finished. + double finished_work = 1; + + // (Required) The known amount of backlog for the process bundle request. 
+ // Computed as: + // (estimated known work - finish work) / finished work + double backlog = 2; +} + +message ProcessBundleSplitRequest { + // (Required) A reference to an active process bundle request with the given + // instruction id. + int64 instruction_reference = 1; + + // (Required) The fraction of work (when compared to the known amount of work) + // the process bundle request should try to split at. + double fraction = 2; +} + +// urn:org.apache.beam:restriction:element-count:1.0 +message ElementCountRestriction { + // A restriction representing the number of elements that should be processed. + // Effectively the range [0, count] + int64 count = 1; +} + +// urn:org.apache.beam:restriction:element-count-skip:1.0 +message ElementCountSkipRestriction { + // A restriction representing the number of elements that should be skipped. + // Effectively the range (count, infinity] + int64 count = 1; +} + +// Each primitive transform that is splittable is defined by a restriction +// it is currently processing. During splitting, that currently active +// restriction (R_initial) is split into 2 components: +// * a restriction (R_done) representing all elements that will be fully +// processed +// * a restriction (R_todo) representing all elements that will not be fully +// processed +// +// where: +// R_initial = R_done ⋃ R_todo +message PrimitiveTransformSplit { + // (Required) A reference to a primitive transform with the given id that + // is part of the active process bundle request with the given instruction + // id. + int64 primitive_transform_reference = 1; + + // (Required) A function specification describing the restriction + // that has been completed by the primitive transform. + // + // For example, a remote GRPC source will have a specific urn and data + // block containing an ElementCountRestriction. 
+ FunctionSpec completed_restriction = 2; + + // (Required) A function specification describing the restriction + // representing the remainder of work for the primitive transform. + // + // For example, a remote GRPC source will have a specific urn and data + // block containing an ElementCountSkipRestriction. + FunctionSpec remaining_restriction = 3; +} + +message ProcessBundleSplitResponse { + // (Optional) A set of split responses for a currently active work item. + // + // If primitive transform B is a descendant of primitive transform A and both + // A and B report a split. Then B's restriction is reported as an element + // restriction pair and thus the fully reported restriction is: + // R = A_done + // ⋃ (A_boundary ⋂ B_done) + // ⋃ (A_boundary ⋂ B_todo) + // ⋃ A_todo + // If there is a descendant of B named C, then C would similarly report a + // set of element pair restrictions. + // + // This restriction is processed and completed by the currently active process + // bundle request: + // A_done ⋃ (A_boundary ⋂ B_done) + // and these restrictions will be processed by future process bundle requests: + // A_boundary ⋂ B_todo (passed to SDF B directly) + // A_todo (passed to SDF A directly) + + // If primitive transform B and C are siblings and descendants of A and A, B, + // and C report a split. Then B and C's restrictions are relative to A's. 
+ // R = A_done + // ⋃ (A_boundary ⋂ B_done) + // ⋃ (A_boundary ⋂ B_todo) + // ⋃ (A_boundary ⋂ C_done) + // ⋃ (A_boundary ⋂ C_todo) + // ⋃ A_todo + // If there is no descendant of B or C also reporting a split, then + // B_boundary = ∅ and C_boundary = ∅ + // + // This restriction is processed and completed by the currently active process + // bundle request: + // A_done ⋃ (A_boundary ⋂ B_done) + // ⋃ (A_boundary ⋂ C_done) + // and these restrictions will be processed by future process bundle requests: + // A_boundary ⋂ B_todo (passed to SDF B directly) + // A_boundary ⋂ C_todo (passed to SDF C directly) + // A_todo (passed to SDF A directly) + // + // Note that descendant splits should only be reported if it is inexpensive + // to compute the boundary restriction intersected with descendant splits. + // Also note, that the boundary restriction may represent a set of elements + // produced by a parent primitive transform which cannot be split at each + // element or that there are intermediate unsplittable primitive transforms + // between an ancestor splittable function and a descendant splittable + // function which may have more than one output per element. Finally note + // that the descendant splits should only be reported if the split + // information is relatively compact. + repeated PrimitiveTransformSplit splits = 1; +} + +/* + * Data Plane API + */ + +// Messages used to represent logical byte streams. +// Stable +message Elements { + // Represents multiple encoded elements in nested context for a given named + // instruction and target. + message Data { + // (Required) A reference to an active instruction request with the given + // instruction id. + int64 instruction_reference = 1; + + // (Required) A definition representing a consumer or producer of this data. + // If received by a harness, this represents the consumer within that + // harness that should consume these bytes. If sent by a harness, this + // represents the producer of these bytes. 
+ // + // Note that a single element may span multiple Data messages. + // + // Note that a sending/receiving pair should share the same target + // identifier. + Target target = 2; + + // (Optional) Represents a part of a logical byte stream. Elements within + // the logical byte stream are encoded in the nested context and + // concatenated together. + // + // An empty data block represents the end of stream for the given + // instruction and target. + bytes data = 3; + } + + // (Required) A list containing parts of logical byte streams. + repeated Data data = 1; +} + +// Stable +service BeamFnData { + // Used to send data between harnesses. + rpc Data( + // A stream of data representing input. + stream Elements + ) returns ( + // A stream of data representing output. + stream Elements + ) {} +} + +/* + * State API + * + * This is just a high level sketch of how this could work. There is still + * a lot of work with respect to how the key spaces for the different types + * of access required (side inputs, user state, ...) and how state caching + * works across bundles. + */ + +message StateRequest { + // (Required) An unique identifier provided by the SDK which represents this + // requests execution. The StateResponse must have the matching id. + int64 id = 1; + + // (Required) The associated instruction id of the work that is currently + // being processed. This allows for the runner to associate any modifications + // to state to be committed with the appropriate work execution. + int64 instruction_reference = 2; + + // At least one of the following fields should be populated. + // Also, no request should use a state key referred to in another state key. + + // (Optional) A request to get state. + repeated StateGetRequest get = 3; + + // (Optional) A request to append to state. + repeated StateAppendRequest append = 4; + + // (Optional) A request to clear state. 
+ repeated StateClearRequest clear = 5; +} + +message StateResponse { + // (Required) A reference provided by the SDK which represents a requests + // execution. The StateResponse must have the matching id when responding + // to the SDK. + int64 id = 1; + + // (Required) The associated instruction id of the work that is currently + // being processed. + int64 instruction_reference = 2; + + // (Required) A key to associate with the version of this state. Allows for + // SDKs to share state across work items if they have the same cache key and + // state key. + bytes cache_key = 3; + + // (Optional) If this is specified, then the state request has failed. + // A human readable string representing the reason as to why the request + // failed. + string error = 4; + + // For every field populated in the StateRequest, there is a matching field in + // the StateResponse. + + // (Optional) A response to getting state. + repeated StateGetResponse get = 5; + + // (Optional) A response to appending to state. + repeated StateAppendResponse append = 6; + + // (Optional) A response to clearing state. + repeated StateClearResponse clear = 7; +} + +service BeamFnState { + // Used to get/append/clear state stored by the runner on behalf of the SDK. + rpc State( + // A stream of state instructions requested of the runner. + stream StateRequest + ) returns ( + // A stream of responses to state instructions the runner was asked to be + // performed. + stream StateResponse + ) {} +} + + +// TODO: Resolve with the other State API. +service SimpleBeamFnState { + // Gets the elements associated with the given key. + rpc Get(StateKey) returns (Elements.Data) {} + // Appends elements to a given state bag. + rpc Append(SimpleStateAppendRequest) returns (Empty) {} + // Clears a given state bag. 
+ rpc Clear(StateKey) returns (Empty) {} +} + +message Empty { +} + +message SimpleStateAppendRequest { + StateKey state_key = 1; + repeated bytes data = 2; +} + +message StateKey { + // (Required) Represents the namespace for the state. If this state is for a + // DoFn, then this reference is expected to point to the DoFn. If this state + // is for a side input, then this is expected to reference the ViewFn. + int64 function_spec_reference = 1; + + // (Required) The bytes of the window which this state request is for encoded + // in the outer context. + bytes window = 2; + + // (Required) The user key for which the value was encoded in the outer + // context. + bytes key = 3; +} + +message StateKeyOrIterable { + // One of the two fields below are required to be set. + // If state key is set, then the State API should be invoked to fetch the + // values allowing one to restart the iterable. Otherwise the bytes for the + // entire iterable are represented and should be decoded using an iterable + // coder using the outer context. + StateKey state_key = 1; + repeated bytes iterable = 2; +} + +// A request to get state for the given state key. +message StateGetRequest { + // A state key encoded in the outer context. + StateKey state_key = 1; +} + +// A response to get state for the given state key. +message StateGetResponse { + // A state key encoded in the outer context. + StateKey state_key = 1; + + oneof state { + // A description of an input port which will stream the state data. + RemoteGrpcPort remote_grpc_port = 1000; + } +} + +// A request to append state for the given state key. +message StateAppendRequest { + // A state key encoded in the outer context. + StateKey state_key = 1; +} + +// A response to append state for the given state key. +message StateAppendResponse { + // A state key encoded in the outer context. + StateKey state_key = 1; + + oneof state { + // A description of an output port which to stream the state data to. 
+ RemoteGrpcPort remote_grpc_port = 1000; + } +} + +// A request to clear state for the given state key. +message StateClearRequest { + // A state key encoded in the outer context. + StateKey state_key = 1; +} + +// A response to clear state for the given state key. +message StateClearResponse { +} + +/* + * Logging API + * + * This is very stable. There can be some changes to how we define a LogEntry, + * to increase/decrease the severity types, the way we format an exception/stack + * trace, or the log site. + */ + +// A log entry +message LogEntry { + // A list of log entries, enables buffering and batching of multiple + // log messages using the logging API. + message List { + // (Required) One or more log messages. + repeated LogEntry log_entries = 1; + } + + // The severity of the event described in a log entry, expressed as one of the + // severity levels listed below. For your reference, the levels are + // assigned the listed numeric values. The effect of using numeric values + // other than those listed is undefined. + // + // If you are writing log entries, you should map other severity encodings to + // one of these standard levels. For example, you might map all of + // Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`. + // + // This list is intentionally not comprehensive; the intent is to provide a + // common set of "good enough" severity levels so that logging front ends + // can provide filtering and searching across log types. Users of the API are + // free not to use all severity levels in their log messages. + enum Severity { + // Trace level information, also the default log level unless + // another severity is specified. + TRACE = 0; + // Debugging information. + DEBUG = 10; + // Normal events. + INFO = 20; + // Normal but significant events, such as start up, shut down, or + // configuration. + NOTICE = 30; + // Warning events might cause problems. + WARN = 40; + // Error events are likely to cause problems. 
+ ERROR = 50; + // Critical events cause severe problems or brief outages and may + // indicate that a person must take action. + CRITICAL = 60; + } + + // (Required) The severity of the log statement. + Severity severity = 1; + + // (Required) The time at which this log statement occurred. + google.protobuf.Timestamp timestamp = 2; + + // (Required) A human readable message. + string message = 3; + + // (Optional) An optional trace of the functions involved. For example, in + // Java this can include multiple causes and multiple suppressed exceptions. + string trace = 4; + + // (Optional) A reference to the instruction this log statement is associated + // with. + int64 instruction_reference = 5; + + // (Optional) A reference to the primitive transform this log statement is + // associated with. + int64 primitive_transform_reference = 6; + + // (Optional) Human-readable name of the function or method being invoked, + // with optional context such as the class or package name. The format can + // vary by language. For example: + // qual.if.ied.Class.method (Java) + // dir/package.func (Go) + // module.function (Python) + // file.cc:382 (C++) + string log_location = 7; + + // (Optional) The name of the thread this log statement is associated with. + string thread = 8; +} + +message LogControl { +} + +// Stable +service BeamFnLogging { + // Allows for the SDK to emit log entries which the runner can + // associate with the active job. + rpc Logging( + // A stream of log entries batched into lists emitted by the SDK harness. + stream LogEntry.List + ) returns ( + // A stream of log control messages used to configure the SDK. + stream LogControl + ) {} +} + +/* + * Environment types + */ +message ApiServiceDescriptor { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + int64 id = 1; + + // (Required) The URL to connect to. + string url = 2; + + // (Optional) The method for authentication. 
If unspecified, access to the + // url is already being performed in a trusted context (e.g. localhost, + // private network). + oneof authentication { + OAuth2ClientCredentialsGrant oauth2_client_credentials_grant = 3; + } +} + +message OAuth2ClientCredentialsGrant { + // (Required) The URL to submit a "client_credentials" grant type request for + // an OAuth access token which will be used as a bearer token for requests. + string url = 1; +} + +// A Docker container configuration for launching the SDK Fn Harness to execute +// user specified functions. +message DockerContainer { + // (Required) A pipeline level unique id which can be used as a reference to + // refer to this. + int64 id = 1; + + // (Required) The Docker container URI + // For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1" + string uri = 2; + + // (Optional) Docker registry specification. + // If unspecified, the uri is expected to be able to be fetched without + // requiring additional configuration by a runner. + int64 registry_reference = 3; +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/common/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/common/pom.xml b/sdks/common/pom.xml new file mode 100644 index 0000000..8364d9a --- /dev/null +++ b/sdks/common/pom.xml @@ -0,0 +1,38 @@ + + + + + 4.0.0 + + + org.apache.beam + beam-sdks-parent + 0.6.0-SNAPSHOT + ../pom.xml + + + beam-sdks-common-parent + + pom + + Apache Beam :: SDKs :: Common + + + fn-api + + http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 35b5ed3..91ab9be 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -34,12 +34,42 @@ unapproved artifact license. --> + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml new file mode 100644 index 0000000..e164ee0 --- /dev/null +++ b/sdks/java/harness/pom.xml @@ -0,0 +1,167 @@ + + + + 4.0.0 + + jar + + + org.apache.beam + beam-sdks-java-parent + 0.6.0-SNAPSHOT + ../pom.xml + + + beam-sdks-java-harness + Apache Beam :: SDKs :: Java :: Harness + This contains the SDK Fn Harness for Beam Java + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + + + org.apache.beam + beam-sdks-java-core + + + + org.apache.beam + beam-sdks-java-core + ${project.version} + tests + test + + + + org.apache.beam + beam-runners-core-java + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + + + + org.apache.beam + beam-sdks-common-fn-api + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.google.auto.value + auto-value + provided + + + + com.google.errorprone + error_prone_annotations + provided + + + + com.google.code.findbugs + jsr305 + + + + com.google.guava + guava + + + + com.google.protobuf + protobuf-lite + + + + com.google.protobuf + protobuf-java + + + + io.grpc + grpc-core + + + + io.grpc + grpc-netty + + + + io.grpc + grpc-stub + + + + io.netty + netty-transport-native-epoll + linux-x86_64 + + + + org.slf4j + slf4j-api + + + + + org.hamcrest + hamcrest-all + test + + + + junit + junit + test + + + + org.mockito + mockito-all + test + + + + org.slf4j + slf4j-jdk14 + test + + + http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java new file mode 100644 index 0000000..3e06f38 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.TextFormat; +import java.io.PrintStream; +import java.util.EnumMap; +import org.apache.beam.fn.harness.channel.ManagedChannelFactory; +import org.apache.beam.fn.harness.control.BeamFnControlClient; +import org.apache.beam.fn.harness.control.ProcessBundleHandler; +import org.apache.beam.fn.harness.control.RegisterHandler; +import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient; +import org.apache.beam.fn.harness.fn.ThrowingFunction; +import org.apache.beam.fn.harness.logging.BeamFnLoggingClient; +import org.apache.beam.fn.harness.stream.StreamObserverFactory; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main entry point into the Beam SDK Fn Harness for Java. + * + *

This entry point expects the following environment variables: + *

    + *
  • LOGGING_API_SERVICE_DESCRIPTOR: A + * {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text + * representing the endpoint that is to be connected to for the Beam Fn Logging service.
  • + *
  • CONTROL_API_SERVICE_DESCRIPTOR: A + * {@link org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor} encoded as text + * representing the endpoint that is to be connected to for the Beam Fn Control service.
  • + *
  • PIPELINE_OPTIONS: A serialized form of {@link PipelineOptions}. See {@link PipelineOptions} + * for further details.
  • + *
+ */ +public class FnHarness { + private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR"; + private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR"; + private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS"; + private static final Logger LOGGER = LoggerFactory.getLogger(FnHarness.class); + + private static BeamFnApi.ApiServiceDescriptor getApiServiceDescriptor(String env) + throws TextFormat.ParseException { + BeamFnApi.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = + BeamFnApi.ApiServiceDescriptor.newBuilder(); + TextFormat.merge(System.getenv(env), apiServiceDescriptorBuilder); + return apiServiceDescriptorBuilder.build(); + } + + public static void main(String[] args) throws Exception { + System.out.format("SDK Fn Harness started%n"); + System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR)); + System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR)); + System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS)); + + PipelineOptions options = new ObjectMapper().readValue( + System.getenv(PIPELINE_OPTIONS), PipelineOptions.class); + + BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor = + getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR); + + BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor = + getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR); + + main(options, loggingApiServiceDescriptor, controlApiServiceDescriptor); + } + + public static void main(PipelineOptions options, + BeamFnApi.ApiServiceDescriptor loggingApiServiceDescriptor, + BeamFnApi.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception { + IOChannelUtils.registerIOFactories(options); + + ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options); + StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options); + PrintStream 
originalErrStream = System.err; + + try (BeamFnLoggingClient logging = new BeamFnLoggingClient( + options, + loggingApiServiceDescriptor, + channelFactory::forDescriptor, + streamObserverFactory::from)) { + + LOGGER.info("Fn Harness started"); + EnumMap> handlers = + new EnumMap<>(BeamFnApi.InstructionRequest.RequestCase.class); + + RegisterHandler fnApiRegistry = new RegisterHandler(); + BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient( + options, channelFactory::forDescriptor, streamObserverFactory::from); + + ProcessBundleHandler processBundleHandler = + new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer); + handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, + fnApiRegistry::register); + handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, + processBundleHandler::processBundle); + BeamFnControlClient control = new BeamFnControlClient(controlApiServiceDescriptor, + channelFactory::forDescriptor, + streamObserverFactory::from, + handlers); + + LOGGER.info("Entering instruction processing loop"); + control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService()); + } catch (Throwable t) { + t.printStackTrace(originalErrStream); + } finally { + originalErrStream.println("Shutting SDK harness down."); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java new file mode 100644 index 0000000..d26f4a5 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.channel; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.netty.NettyChannelBuilder; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.unix.DomainSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import org.apache.beam.fn.v1.BeamFnApi.ApiServiceDescriptor; +import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Uses {@link PipelineOptions} to configure which underlying {@link ManagedChannel} implementation + * to use. 
+ */ +public abstract class ManagedChannelFactory { + public static ManagedChannelFactory from(PipelineOptions options) { + List experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments(); + if (experiments != null && experiments.contains("beam_fn_api_epoll")) { + io.netty.channel.epoll.Epoll.ensureAvailability(); + return new Epoll(); + } + return new Default(); + } + + public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor); + + /** + * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address + * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an + * {@link EpollSocketChannel}. + */ + private static class Epoll extends ManagedChannelFactory { + @Override + public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl()); + return NettyChannelBuilder.forAddress(address) + .channelType(address instanceof DomainSocketAddress + ? EpollDomainSocketChannel.class : EpollSocketChannel.class) + .eventLoopGroup(new EpollEventLoopGroup()) + .usePlaintext(true) + .build(); + } + } + + /** + * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create + * instances. 
+ */ + private static class Default extends ManagedChannelFactory { + @Override + public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { + return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) + .usePlaintext(true) + .build(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java new file mode 100644 index 0000000..a27d542 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/SocketAddressFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness.channel; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.net.HostAndPort; +import io.netty.channel.unix.DomainSocketAddress; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +/** Creates a {@link SocketAddress} based upon a supplied string. */ +public class SocketAddressFactory { + private static final String UNIX_DOMAIN_SOCKET_PREFIX = "unix://"; + + /** + * Parse a {@link SocketAddress} from the given string. + */ + public static SocketAddress createFrom(String value) { + if (value.startsWith(UNIX_DOMAIN_SOCKET_PREFIX)) { + // Unix Domain Socket address. + // Create the underlying file for the Unix Domain Socket. + String filePath = value.substring(UNIX_DOMAIN_SOCKET_PREFIX.length()); + File file = new File(filePath); + if (!file.isAbsolute()) { + throw new IllegalArgumentException("File path must be absolute: " + filePath); + } + try { + if (file.createNewFile()) { + // If this application created the file, delete it when the application exits. + file.deleteOnExit(); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + // Create the SocketAddress referencing the file. + return new DomainSocketAddress(file); + } else { + // Standard TCP/IP address. + HostAndPort hostAndPort = HostAndPort.fromString(value); + checkArgument(hostAndPort.hasPort(), + "Address must be a unix:// path or be in the form host:port. 
Got: %s", value); + return new InetSocketAddress(hostAndPort.getHostText(), hostAndPort.getPort()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java new file mode 100644 index 0000000..6323166 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * gRPC channel management. 
+ */ +package org.apache.beam.fn.harness.channel; http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java new file mode 100644 index 0000000..7f44a01 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness.control; + +import static com.google.common.base.Throwables.getStackTraceAsString; + +import com.google.common.util.concurrent.Uninterruptibles; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.EnumMap; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.BiFunction; +import java.util.function.Function; +import org.apache.beam.fn.harness.fn.ThrowingFunction; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnControlGrpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A client for the Beam Fn Control API. Uses an unbounded internal queue to pull down + * an unbounded number of requests. + * + *

Also can delegate to a set of handlers based upon the + * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest.RequestCase request type}. + * + *

When the inbound instruction stream finishes successfully, the {@code onFinish} is + * completed successfully signaling to the caller that this client will not produce any more + * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s. If the inbound instruction stream + * errors, the {@code onFinish} is completed exceptionally propagating the failure reason + * to the caller and signaling that this client will not produce any more + * {@link org.apache.beam.fn.v1.BeamFnApi.InstructionRequest}s. + */ +public class BeamFnControlClient { + private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnControlClient.class); + private static final BeamFnApi.InstructionRequest POISON_PILL = + BeamFnApi.InstructionRequest.newBuilder().setInstructionId(Long.MIN_VALUE).build(); + + private final StreamObserver outboundObserver; + private final BlockingDeque bufferedInstructions; + private final EnumMap> handlers; + private final CompletableFuture onFinish; + + public BeamFnControlClient( + BeamFnApi.ApiServiceDescriptor apiServiceDescriptor, + Function channelFactory, + BiFunction, + StreamObserver>, + StreamObserver, + StreamObserver> streamObserverFactory, + EnumMap> handlers) { + this.bufferedInstructions = new LinkedBlockingDeque<>(); + this.outboundObserver = streamObserverFactory.apply( + BeamFnControlGrpc.newStub(channelFactory.apply(apiServiceDescriptor))::control, + new InboundObserver()); + this.handlers = handlers; + this.onFinish = new CompletableFuture<>(); + } + + /** + * A {@link StreamObserver} for the inbound stream that completes the future on stream + * termination. 
+ */ + private class InboundObserver implements StreamObserver { + @Override + public void onNext(BeamFnApi.InstructionRequest value) { + LOGGER.info("InstructionRequest received {}", value); + Uninterruptibles.putUninterruptibly(bufferedInstructions, value); + } + + @Override + public void onError(Throwable t) { + placePoisonPillIntoQueue(); + onFinish.completeExceptionally(t); + } + + @Override + public void onCompleted() { + placePoisonPillIntoQueue(); + onFinish.complete(null); + } + + /** + * This method emulates {@link Uninterruptibles#putUninterruptibly} but placing the + * element at the front of the queue. + * + *

We place the poison pill at the front of the queue because if the server shutdown, + * any remaining instructions can be discarded. + */ + private void placePoisonPillIntoQueue() { + while (true) { + try { + bufferedInstructions.putFirst(POISON_PILL); + return; + } catch (InterruptedException e) { + // Ignored until we place the poison pill into the queue + } + } + } + } + + /** + * Note that this method continuously submits work to the supplied executor until the + * Beam Fn Control server hangs up or fails exceptionally. + */ + public void processInstructionRequests(Executor executor) + throws InterruptedException, ExecutionException { + BeamFnApi.InstructionRequest request; + while ((request = bufferedInstructions.take()) != POISON_PILL) { + BeamFnApi.InstructionRequest currentRequest = request; + executor.execute( + () -> sendInstructionResponse(delegateOnInstructionRequestType(currentRequest))); + } + onFinish.get(); + } + + public BeamFnApi.InstructionResponse delegateOnInstructionRequestType( + BeamFnApi.InstructionRequest value) { + try { + return handlers.getOrDefault(value.getRequestCase(), this::missingHandler) + .apply(value) + .setInstructionId(value.getInstructionId()) + .build(); + } catch (Exception e) { + return BeamFnApi.InstructionResponse.newBuilder() + .setInstructionId(value.getInstructionId()) + .setError(getStackTraceAsString(e)) + .build(); + } + } + + public void sendInstructionResponse(BeamFnApi.InstructionResponse value) { + outboundObserver.onNext(value); + } + + private BeamFnApi.InstructionResponse.Builder missingHandler( + BeamFnApi.InstructionRequest request) { + return BeamFnApi.InstructionResponse.newBuilder().setError( + String.format("Unknown InstructionRequest type %s", request.getRequestCase())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff 
--git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java new file mode 100644 index 0000000..05c2aab --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness.control; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.Iterables.getOnlyElement; + +import com.google.common.collect.Collections2; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fake.FakeAggregatorFactory; +import org.apache.beam.fn.harness.fake.FakeStepContext; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.core.BeamFnDataReadRunner; +import org.apache.beam.runners.core.BeamFnDataWriteRunner; +import org.apache.beam.runners.core.BoundedSourceRunner; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.dataflow.util.DoFnInfo; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.NullSideInputReader; +import 
org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.TupleTag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing + * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec}, + * wiring them together based upon the {@code input} and {@code output} map definitions. + * + *

Finally executes the DAG based graph by starting all runners in reverse topological order, + * and finishing all runners in forward topological order. + */ +public class ProcessBundleHandler { + // TODO: What should the initial set of URNs be? + private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1"; + private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1"; + private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1"; + private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ProcessBundleHandler.class); + + private final PipelineOptions options; + private final Function fnApiRegistry; + private final BeamFnDataClient beamFnDataClient; + + public ProcessBundleHandler( + PipelineOptions options, + Function fnApiRegistry, + BeamFnDataClient beamFnDataClient) { + this.options = options; + this.fnApiRegistry = fnApiRegistry; + this.beamFnDataClient = beamFnDataClient; + } + + protected void createConsumersForPrimitiveTransform( + BeamFnApi.PrimitiveTransform primitiveTransform, + Supplier processBundleInstructionId, + Function>>> consumers, + BiConsumer>> addConsumer, + Consumer addStartFunction, + Consumer addFinishFunction) throws IOException { + + BeamFnApi.FunctionSpec functionSpec = primitiveTransform.getFunctionSpec(); + + // For every output PCollection, create a map from output name to Consumer + ImmutableMap.Builder>>> + outputMapBuilder = ImmutableMap.builder(); + for (Map.Entry entry : + primitiveTransform.getOutputsMap().entrySet()) { + outputMapBuilder.put( + entry.getKey(), + consumers.apply( + BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(primitiveTransform.getId()) + .setName(entry.getKey()) + .build())); + } + ImmutableMap>>> outputMap = + outputMapBuilder.build(); + + // Based upon the function spec, populate the start/finish/consumer information. 
+ ThrowingConsumer> consumer; + switch (functionSpec.getUrn()) { + default: + BeamFnApi.Target target; + BeamFnApi.Coder coderSpec; + throw new IllegalArgumentException( + String.format("Unknown FunctionSpec %s", functionSpec)); + + case DATA_OUTPUT_URN: + target = BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(primitiveTransform.getId()) + .setName(getOnlyElement(primitiveTransform.getOutputsMap().keySet())) + .build(); + coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply( + getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference()); + BeamFnDataWriteRunner remoteGrpcWriteRunner = + new BeamFnDataWriteRunner<>( + functionSpec, + processBundleInstructionId, + target, + coderSpec, + beamFnDataClient); + addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput); + consumer = remoteGrpcWriteRunner::consume; + addFinishFunction.accept(remoteGrpcWriteRunner::close); + break; + + case DATA_INPUT_URN: + target = BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(primitiveTransform.getId()) + .setName(getOnlyElement(primitiveTransform.getInputsMap().keySet())) + .build(); + coderSpec = (BeamFnApi.Coder) fnApiRegistry.apply( + getOnlyElement(primitiveTransform.getOutputsMap().values()).getCoderReference()); + BeamFnDataReadRunner remoteGrpcReadRunner = + new BeamFnDataReadRunner<>( + functionSpec, + processBundleInstructionId, + target, + coderSpec, + beamFnDataClient, + outputMap); + addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation); + consumer = null; + addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes); + break; + + case JAVA_DO_FN_URN: + DoFnRunner doFnRunner = createDoFnRunner(functionSpec, outputMap); + addStartFunction.accept(doFnRunner::startBundle); + addFinishFunction.accept(doFnRunner::finishBundle); + consumer = doFnRunner::processElement; + break; + + case JAVA_SOURCE_URN: + @SuppressWarnings({"unchecked", "rawtypes"}) + BoundedSourceRunner, OutputT> sourceRunner = 
+ createBoundedSourceRunner(functionSpec, outputMap); + @SuppressWarnings({"unchecked", "rawtypes"}) + ThrowingConsumer> sourceConsumer = + (ThrowingConsumer) + (ThrowingConsumer>>) + sourceRunner::runReadLoop; + // TODO: Remove and replace with source being sent across gRPC port + addStartFunction.accept(sourceRunner::start); + consumer = (ThrowingConsumer) sourceConsumer; + break; + } + + if (consumer != null) { + for (Map.Entry entry : + primitiveTransform.getInputsMap().entrySet()) { + for (BeamFnApi.Target target : entry.getValue().getTargetList()) { + addConsumer.accept(target, consumer); + } + } + } + } + + public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request) + throws Exception { + BeamFnApi.InstructionResponse.Builder response = + BeamFnApi.InstructionResponse.newBuilder() + .setProcessBundle(BeamFnApi.ProcessBundleResponse.getDefaultInstance()); + + long bundleId = request.getProcessBundle().getProcessBundleDescriptorReference(); + BeamFnApi.ProcessBundleDescriptor bundleDescriptor = + (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId); + + Multimap>> outputTargetToConsumer = + HashMultimap.create(); + List startFunctions = new ArrayList<>(); + List finishFunctions = new ArrayList<>(); + // We process the primitive transform list in reverse order + // because we assume that the runner provides it in topologically order. + // This means that all the start/finish functions will be in reverse topological order. + for (BeamFnApi.PrimitiveTransform primitiveTransform : + Lists.reverse(bundleDescriptor.getPrimitiveTransformList())) { + createConsumersForPrimitiveTransform( + primitiveTransform, + request::getInstructionId, + outputTargetToConsumer::get, + outputTargetToConsumer::put, + startFunctions::add, + finishFunctions::add); + } + + // Already in reverse order so we don't need to do anything. 
+ for (ThrowingRunnable startFunction : startFunctions) { + LOGGER.debug("Starting function {}", startFunction); + startFunction.run(); + } + + // Need to reverse this since we want to call finish in topological order. + for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) { + LOGGER.debug("Finishing function {}", finishFunction); + finishFunction.run(); + } + + return response; + } + + /** + * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}. + */ + private DoFnRunner createDoFnRunner( + BeamFnApi.FunctionSpec functionSpec, + Map>>> outputMap) { + ByteString serializedFn; + try { + serializedFn = functionSpec.getData().unpack(BytesValue.class).getValue(); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException( + String.format("Unable to unwrap DoFn %s", functionSpec), e); + } + DoFnInfo doFnInfo = + (DoFnInfo) + SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo"); + + checkArgument( + Objects.equals( + new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)), + doFnInfo.getOutputMap().keySet()), + "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.", + outputMap.keySet(), + doFnInfo.getOutputMap()); + + ImmutableMultimap.Builder, + ThrowingConsumer>> tagToOutput = + ImmutableMultimap.builder(); + for (Map.Entry> entry : doFnInfo.getOutputMap().entrySet()) { + tagToOutput.putAll(entry.getValue(), outputMap.get(Long.toString(entry.getKey()))); + } + @SuppressWarnings({"unchecked", "rawtypes"}) + final Map, Collection>>> tagBasedOutputMap = + (Map) tagToOutput.build().asMap(); + + OutputManager outputManager = + new OutputManager() { + Map, Collection>>> tupleTagToOutput = + tagBasedOutputMap; + + @Override + public void output(TupleTag tag, WindowedValue output) { + try { + Collection>> consumers = + tupleTagToOutput.get(tag); + if (consumers == null) { + // TODO: Should we handle undeclared outputs, if 
so how? + throw new UnsupportedOperationException(String.format( + "Unable to output %s on unknown output %s", output, tag)); + } + for (ThrowingConsumer> consumer : consumers) { + consumer.accept(output); + } + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }; + + @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) + DoFnRunner runner = + DoFnRunners.simpleRunner( + PipelineOptionsFactory.create(), /* TODO */ + (DoFn) doFnInfo.getDoFn(), + NullSideInputReader.empty(), /* TODO */ + outputManager, + (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()), + new ArrayList<>(doFnInfo.getOutputMap().values()), + new FakeStepContext(), + new FakeAggregatorFactory(), + (WindowingStrategy) doFnInfo.getWindowingStrategy()); + return runner; + } + + private , OutputT> + BoundedSourceRunner createBoundedSourceRunner( + BeamFnApi.FunctionSpec functionSpec, + Map>>> outputMap) { + + @SuppressWarnings({"rawtypes", "unchecked"}) + BoundedSourceRunner runner = + new BoundedSourceRunner(options, functionSpec, outputMap); + return runner; + } +}