Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B5547200BAC for ; Wed, 26 Oct 2016 18:43:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B3E34160AE1; Wed, 26 Oct 2016 16:43:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5EBD9160AFD for ; Wed, 26 Oct 2016 18:43:50 +0200 (CEST) Received: (qmail 86922 invoked by uid 500); 26 Oct 2016 16:43:49 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 86909 invoked by uid 99); 26 Oct 2016 16:43:49 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2016 16:43:49 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 1B3BDC1266 for ; Wed, 26 Oct 2016 16:43:49 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id WBRERL1s0BPU for ; Wed, 26 Oct 2016 16:43:44 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 871365FC58 for ; Wed, 26 Oct 2016 16:43:42 +0000 (UTC) Received: (qmail 82265 invoked by uid 99); 26 Oct 2016 16:43:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Oct 2016 16:43:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E09E1E97DD; Wed, 26 Oct 2016 16:43:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Wed, 26 Oct 2016 16:44:26 -0000 Message-Id: <2ab931c0f8634b7bbfbb883f97679ee0@git.apache.org> In-Reply-To: <35cd756b7a90424cbecde858fc5896b0@git.apache.org> References: <35cd756b7a90424cbecde858fc5896b0@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [48/50] [abbrv] incubator-beam git commit: remove "pipeline" in runner name archived-at: Wed, 26 Oct 2016 16:43:51 -0000 remove "pipeline" in runner name Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/94bd47cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/94bd47cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/94bd47cd Branch: refs/heads/gearpump-runner Commit: 94bd47cdb7e4b8f1d874ace1c60e4251636a8110 Parents: 8f013cb Author: manuzhang Authored: Wed Oct 26 16:18:39 2016 +0800 Committer: manuzhang Committed: Wed Oct 26 16:19:13 2016 +0800 ---------------------------------------------------------------------- .../gearpump/GearpumpPipelineRunner.java | 191 ------------------- .../GearpumpPipelineRunnerRegistrar.java | 62 ------ .../beam/runners/gearpump/GearpumpRunner.java | 191 +++++++++++++++++++ .../gearpump/GearpumpRunnerRegistrar.java | 62 ++++++ .../runners/gearpump/TestGearpumpRunner.java | 4 +- .../gearpump/examples/StreamingWordCount.java | 4 +- 6 files changed, 257 insertions(+), 257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java deleted file mode 100644 index 9e32227..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.gearpump; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.beam.runners.core.AssignWindows; -import org.apache.beam.runners.gearpump.translators.TranslationContext; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -import org.apache.gearpump.cluster.ClusterConfig; -import org.apache.gearpump.cluster.UserConfig; -import org.apache.gearpump.cluster.client.ClientContext; -import org.apache.gearpump.cluster.embedded.EmbeddedCluster; -import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; - -/** - * A {@link PipelineRunner} that executes the operations in the - * pipeline by first translating them to Gearpump Stream DSL - * and then executing them on a Gearpump cluster. - */ -@SuppressWarnings({"rawtypes", "unchecked"}) -public class GearpumpPipelineRunner extends PipelineRunner { - - private final GearpumpPipelineOptions options; - - private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers"; - private static final String DEFAULT_APPNAME = "beam_gearpump_app"; - - public GearpumpPipelineRunner(GearpumpPipelineOptions options) { - this.options = options; - } - - public static GearpumpPipelineRunner fromOptions(PipelineOptions options) { - GearpumpPipelineOptions pipelineOptions = - PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options); - return new GearpumpPipelineRunner(pipelineOptions); - } - - - public OutputT apply( - PTransform transform, InputT input) { - if (Window.Bound.class.equals(transform.getClass())) { - return (OutputT) super.apply( - new AssignWindowsAndSetStrategy((Window.Bound) transform), input); - } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) - && ((PCollectionList) input).size() == 0) { - return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of()); - } else if (Create.Values.class.equals(transform.getClass())) { - return (OutputT) PCollection - .createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - PCollection.IsBounded.BOUNDED); - } else { - return super.apply(transform, input); - } - } - - @Override - public GearpumpPipelineResult run(Pipeline pipeline) { - String appName = options.getApplicationName(); - if (null == appName) { - appName = DEFAULT_APPNAME; - } - Config config = registerSerializers(ClusterConfig.defaultConfig(), - options.getSerializers()); - ClientContext clientContext = getClientContext(options, config); - options.setClientContext(clientContext); - JavaStreamApp streamApp = new JavaStreamApp( - appName, clientContext, UserConfig.empty()); - TranslationContext translationContext = new TranslationContext(streamApp, options); - GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext); - translator.translate(pipeline); - streamApp.run(); - - return null; - } - - private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) { - EmbeddedCluster cluster = options.getEmbeddedCluster(); - if (cluster != null) { - return cluster.newClientContext(); - } else { - return ClientContext.apply(config); - } - } - - /** - * register class with default kryo serializers. - */ - private Config registerSerializers(Config config, Map userSerializers) { - Map serializers = new HashMap<>(); - serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", ""); - serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", ""); - serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", ""); - serializers.put("org.joda.time.Instant", ""); - serializers.put("org.apache.beam.sdk.values.KV", ""); - serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", ""); - serializers.put("org.apache.beam.sdk.values.TimestampedValue", ""); - if (userSerializers != null && !userSerializers.isEmpty()) { - serializers.putAll(userSerializers); - } - return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers)); - } - - - /** - * copied from DirectPipelineRunner. - * used to replace Window.Bound till window function is added to Gearpump Stream DSL - */ - private static class AssignWindowsAndSetStrategy - extends PTransform, PCollection> { - - private final Window.Bound wrapped; - - AssignWindowsAndSetStrategy(Window.Bound wrapped) { - this.wrapped = wrapped; - } - - @Override - public PCollection apply(PCollection input) { - WindowingStrategy outputStrategy = - wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); - - WindowFn windowFn = - (WindowFn) outputStrategy.getWindowFn(); - - if (!windowFn.isNonMerging()) { - throw new UnsupportedOperationException( - "merging window is not supported in Gearpump pipeline"); - } - - // If the Window.Bound transform only changed parts other than the WindowFn, then - // we skip AssignWindows even though it should be harmless in a perfect world. - // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly - // crash if another GBK is performed without explicitly setting the WindowFn. So we skip - // AssignWindows in this case. - if (wrapped.getWindowFn() == null) { - return input.apply("Identity", ParDo.of(new IdentityFn())) - .setWindowingStrategyInternal(outputStrategy); - } else { - return input - .apply("AssignWindows", new AssignWindows<>(windowFn)) - .setWindowingStrategyInternal(outputStrategy); - } - } - } - - private static class IdentityFn extends OldDoFn { - @Override - public void processElement(ProcessContext c) { - c.output(c.element()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java deleted file mode 100644 index ca173d1..0000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; - -/** - * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the - * {@link GearpumpPipelineRunner}. - * - * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner} - * and {@link PipelineOptions} as available pipeline runner services. - */ -public class GearpumpPipelineRunnerRegistrar { - private GearpumpPipelineRunnerRegistrar() { } - - /** - * Registers the {@link GearpumpPipelineRunner}. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - - @Override - public Iterable>> getPipelineRunners() { - return ImmutableList.>>of(TestGearpumpRunner.class); - } - } - - /** - * Registers the {@link GearpumpPipelineOptions}. - */ - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - - @Override - public Iterable> getPipelineOptions() { - return ImmutableList.>of(GearpumpPipelineOptions.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java new file mode 100644 index 0000000..ed0813d --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.gearpump; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.beam.runners.core.AssignWindows; +import org.apache.beam.runners.gearpump.translators.TranslationContext; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsValidator; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; + +import org.apache.gearpump.cluster.ClusterConfig; +import org.apache.gearpump.cluster.UserConfig; +import org.apache.gearpump.cluster.client.ClientContext; +import org.apache.gearpump.cluster.embedded.EmbeddedCluster; +import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; + +/** + * A {@link PipelineRunner} that executes the operations in the + * pipeline by first translating them to Gearpump Stream DSL + * and then executing them on a Gearpump cluster. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class GearpumpRunner extends PipelineRunner { + + private final GearpumpPipelineOptions options; + + private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers"; + private static final String DEFAULT_APPNAME = "beam_gearpump_app"; + + public GearpumpRunner(GearpumpPipelineOptions options) { + this.options = options; + } + + public static GearpumpRunner fromOptions(PipelineOptions options) { + GearpumpPipelineOptions pipelineOptions = + PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options); + return new GearpumpRunner(pipelineOptions); + } + + + public OutputT apply( + PTransform transform, InputT input) { + if (Window.Bound.class.equals(transform.getClass())) { + return (OutputT) super.apply( + new AssignWindowsAndSetStrategy((Window.Bound) transform), input); + } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass()) + && ((PCollectionList) input).size() == 0) { + return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of()); + } else if (Create.Values.class.equals(transform.getClass())) { + return (OutputT) PCollection + .createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.BOUNDED); + } else { + return super.apply(transform, input); + } + } + + @Override + public GearpumpPipelineResult run(Pipeline pipeline) { + String appName = options.getApplicationName(); + if (null == appName) { + appName = DEFAULT_APPNAME; + } + Config config = registerSerializers(ClusterConfig.defaultConfig(), + options.getSerializers()); + ClientContext clientContext = getClientContext(options, config); + options.setClientContext(clientContext); + JavaStreamApp streamApp = new JavaStreamApp( + appName, clientContext, UserConfig.empty()); + TranslationContext translationContext = new TranslationContext(streamApp, options); + GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext); + translator.translate(pipeline); + streamApp.run(); + + return null; + } + + private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) { + EmbeddedCluster cluster = options.getEmbeddedCluster(); + if (cluster != null) { + return cluster.newClientContext(); + } else { + return ClientContext.apply(config); + } + } + + /** + * register class with default kryo serializers. + */ + private Config registerSerializers(Config config, Map userSerializers) { + Map serializers = new HashMap<>(); + serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", ""); + serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", ""); + serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", ""); + serializers.put("org.joda.time.Instant", ""); + serializers.put("org.apache.beam.sdk.values.KV", ""); + serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", ""); + serializers.put("org.apache.beam.sdk.values.TimestampedValue", ""); + if (userSerializers != null && !userSerializers.isEmpty()) { + serializers.putAll(userSerializers); + } + return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers)); + } + + + /** + * copied from DirectPipelineRunner. + * used to replace Window.Bound till window function is added to Gearpump Stream DSL + */ + private static class AssignWindowsAndSetStrategy + extends PTransform, PCollection> { + + private final Window.Bound wrapped; + + AssignWindowsAndSetStrategy(Window.Bound wrapped) { + this.wrapped = wrapped; + } + + @Override + public PCollection apply(PCollection input) { + WindowingStrategy outputStrategy = + wrapped.getOutputStrategyInternal(input.getWindowingStrategy()); + + WindowFn windowFn = + (WindowFn) outputStrategy.getWindowFn(); + + if (!windowFn.isNonMerging()) { + throw new UnsupportedOperationException( + "merging window is not supported in Gearpump pipeline"); + } + + // If the Window.Bound transform only changed parts other than the WindowFn, then + // we skip AssignWindows even though it should be harmless in a perfect world. + // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly + // crash if another GBK is performed without explicitly setting the WindowFn. So we skip + // AssignWindows in this case. + if (wrapped.getWindowFn() == null) { + return input.apply("Identity", ParDo.of(new IdentityFn())) + .setWindowingStrategyInternal(outputStrategy); + } else { + return input + .apply("AssignWindows", new AssignWindows<>(windowFn)) + .setWindowingStrategyInternal(outputStrategy); + } + } + } + + private static class IdentityFn extends OldDoFn { + @Override + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java new file mode 100644 index 0000000..b77e1e3 --- /dev/null +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.gearpump; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; + +/** + * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the + * {@link GearpumpRunner}. + * + * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner} + * and {@link PipelineOptions} as available pipeline runner services. + */ +public class GearpumpRunnerRegistrar { + private GearpumpRunnerRegistrar() { } + + /** + * Registers the {@link GearpumpRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + + @Override + public Iterable>> getPipelineRunners() { + return ImmutableList.>>of(TestGearpumpRunner.class); + } + } + + /** + * Registers the {@link GearpumpPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>of(GearpumpPipelineOptions.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java index cedd31f..89d31a6 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java @@ -33,14 +33,14 @@ import org.apache.gearpump.cluster.embedded.EmbeddedCluster; */ public class TestGearpumpRunner extends PipelineRunner { - private final GearpumpPipelineRunner delegate; + private final GearpumpRunner delegate; private final EmbeddedCluster cluster; private TestGearpumpRunner(GearpumpPipelineOptions options) { cluster = EmbeddedCluster.apply(); cluster.start(); options.setEmbeddedCluster(cluster); - delegate = GearpumpPipelineRunner.fromOptions(options); + delegate = GearpumpRunner.fromOptions(options); } public static TestGearpumpRunner fromOptions(PipelineOptions options) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java index ba50de7..1d85c25 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.gearpump.examples; import org.apache.beam.runners.gearpump.GearpumpPipelineOptions; -import org.apache.beam.runners.gearpump.GearpumpPipelineRunner; +import org.apache.beam.runners.gearpump.GearpumpRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -75,7 +75,7 @@ public class StreamingWordCount { public static void main(String[] args) { GearpumpPipelineOptions options = PipelineOptionsFactory .fromArgs(args).as(GearpumpPipelineOptions.class); - options.setRunner(GearpumpPipelineRunner.class); + options.setRunner(GearpumpRunner.class); options.setApplicationName("StreamingWordCount"); options.setParallelism(1);