beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [48/50] [abbrv] incubator-beam git commit: remove "pipeline" in runner name
Date Wed, 26 Oct 2016 16:44:26 GMT
remove "pipeline" in runner name


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/94bd47cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/94bd47cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/94bd47cd

Branch: refs/heads/gearpump-runner
Commit: 94bd47cdb7e4b8f1d874ace1c60e4251636a8110
Parents: 8f013cb
Author: manuzhang <owenzhang1990@gmail.com>
Authored: Wed Oct 26 16:18:39 2016 +0800
Committer: manuzhang <owenzhang1990@gmail.com>
Committed: Wed Oct 26 16:19:13 2016 +0800

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineRunner.java        | 191 -------------------
 .../GearpumpPipelineRunnerRegistrar.java        |  62 ------
 .../beam/runners/gearpump/GearpumpRunner.java   | 191 +++++++++++++++++++
 .../gearpump/GearpumpRunnerRegistrar.java       |  62 ++++++
 .../runners/gearpump/TestGearpumpRunner.java    |   4 +-
 .../gearpump/examples/StreamingWordCount.java   |   4 +-
 6 files changed, 257 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
deleted file mode 100644
index 9e32227..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunner.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.gearpump;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.runners.core.AssignWindows;
-import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import org.apache.gearpump.cluster.ClusterConfig;
-import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
-
-/**
- * A {@link PipelineRunner} that executes the operations in the
- * pipeline by first translating them to Gearpump Stream DSL
- * and then executing them on a Gearpump cluster.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineRunner extends PipelineRunner<GearpumpPipelineResult> {
-
-  private final GearpumpPipelineOptions options;
-
-  private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
-  private static final String DEFAULT_APPNAME = "beam_gearpump_app";
-
-  public GearpumpPipelineRunner(GearpumpPipelineOptions options) {
-    this.options = options;
-  }
-
-  public static GearpumpPipelineRunner fromOptions(PipelineOptions options) {
-    GearpumpPipelineOptions pipelineOptions =
-        PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
-    return new GearpumpPipelineRunner(pipelineOptions);
-  }
-
-
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (Window.Bound.class.equals(transform.getClass())) {
-      return (OutputT) super.apply(
-              new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
-    } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
-            && ((PCollectionList<?>) input).size() == 0) {
-      return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
-    } else if (Create.Values.class.equals(transform.getClass())) {
-      return (OutputT) PCollection
-              .<OutputT>createPrimitiveOutputInternal(
-                      input.getPipeline(),
-                      WindowingStrategy.globalDefault(),
-                      PCollection.IsBounded.BOUNDED);
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
-  @Override
-  public GearpumpPipelineResult run(Pipeline pipeline) {
-    String appName = options.getApplicationName();
-    if (null == appName) {
-      appName = DEFAULT_APPNAME;
-    }
-    Config config = registerSerializers(ClusterConfig.defaultConfig(),
-        options.getSerializers());
-    ClientContext clientContext = getClientContext(options, config);
-    options.setClientContext(clientContext);
-    JavaStreamApp streamApp = new JavaStreamApp(
-        appName, clientContext, UserConfig.empty());
-    TranslationContext translationContext = new TranslationContext(streamApp, options);
-    GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
-    translator.translate(pipeline);
-    streamApp.run();
-
-    return null;
-  }
-
-  private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
-    EmbeddedCluster cluster = options.getEmbeddedCluster();
-    if (cluster != null) {
-      return cluster.newClientContext();
-    } else {
-      return ClientContext.apply(config);
-    }
-  }
-
-  /**
-   * register class with default kryo serializers.
-   */
-  private Config registerSerializers(Config config, Map<String, String> userSerializers) {
-    Map<String, String> serializers = new HashMap<>();
-    serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
-    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
-    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
-    serializers.put("org.joda.time.Instant", "");
-    serializers.put("org.apache.beam.sdk.values.KV", "");
-    serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
-    serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
-    if (userSerializers != null && !userSerializers.isEmpty()) {
-      serializers.putAll(userSerializers);
-    }
-    return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
-  }
-
-
-  /**
-   * copied from DirectPipelineRunner.
-   * used to replace Window.Bound till window function is added to Gearpump Stream DSL
-   */
-  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private final Window.Bound<T> wrapped;
-
-    AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
-      this.wrapped = wrapped;
-    }
-
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
-      WindowFn<T, BoundedWindow> windowFn =
-          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
-      if (!windowFn.isNonMerging()) {
-        throw new UnsupportedOperationException(
-            "merging window is not supported in Gearpump pipeline");
-      }
-
-      // If the Window.Bound transform only changed parts other than the WindowFn, then
-      // we skip AssignWindows even though it should be harmless in a perfect world.
-      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
-      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
-      // AssignWindows in this case.
-      if (wrapped.getWindowFn() == null) {
-        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
-            .setWindowingStrategyInternal(outputStrategy);
-      } else {
-        return input
-            .apply("AssignWindows", new AssignWindows<>(windowFn))
-            .setWindowingStrategyInternal(outputStrategy);
-      }
-    }
-  }
-
-  private static class IdentityFn<T> extends OldDoFn<T, T> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
deleted file mode 100644
index ca173d1..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineRunnerRegistrar.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-/**
- * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link GearpumpPipelineRunner}.
- *
- * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
- * and {@link PipelineOptions} as available pipeline runner services.
- */
-public class GearpumpPipelineRunnerRegistrar {
-  private GearpumpPipelineRunnerRegistrar() { }
-
-  /**
-   * Registers the {@link GearpumpPipelineRunner}.
-   */
-  @AutoService(PipelineRunnerRegistrar.class)
-  public static class Runner implements PipelineRunnerRegistrar {
-
-    @Override
-    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
-    }
-  }
-
-  /**
-   * Registers the {@link GearpumpPipelineOptions}.
-   */
-  @AutoService(PipelineOptionsRegistrar.class)
-  public static class Options implements PipelineOptionsRegistrar {
-
-    @Override
-    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-      return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
new file mode 100644
index 0000000..ed0813d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.core.AssignWindows;
+import org.apache.beam.runners.gearpump.translators.TranslationContext;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to Gearpump Stream DSL
+ * and then executing them on a Gearpump cluster.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
+
+  private final GearpumpPipelineOptions options;
+
+  private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
+  private static final String DEFAULT_APPNAME = "beam_gearpump_app";
+
+  public GearpumpRunner(GearpumpPipelineOptions options) {
+    this.options = options;
+  }
+
+  public static GearpumpRunner fromOptions(PipelineOptions options) {
+    GearpumpPipelineOptions pipelineOptions =
+        PipelineOptionsValidator.validate(GearpumpPipelineOptions.class, options);
+    return new GearpumpRunner(pipelineOptions);
+  }
+
+
+  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+      PTransform<InputT, OutputT> transform, InputT input) {
+    if (Window.Bound.class.equals(transform.getClass())) {
+      return (OutputT) super.apply(
+              new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+    } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+            && ((PCollectionList<?>) input).size() == 0) {
+      return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
+    } else if (Create.Values.class.equals(transform.getClass())) {
+      return (OutputT) PCollection
+              .<OutputT>createPrimitiveOutputInternal(
+                      input.getPipeline(),
+                      WindowingStrategy.globalDefault(),
+                      PCollection.IsBounded.BOUNDED);
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+  @Override
+  public GearpumpPipelineResult run(Pipeline pipeline) {
+    String appName = options.getApplicationName();
+    if (null == appName) {
+      appName = DEFAULT_APPNAME;
+    }
+    Config config = registerSerializers(ClusterConfig.defaultConfig(),
+        options.getSerializers());
+    ClientContext clientContext = getClientContext(options, config);
+    options.setClientContext(clientContext);
+    JavaStreamApp streamApp = new JavaStreamApp(
+        appName, clientContext, UserConfig.empty());
+    TranslationContext translationContext = new TranslationContext(streamApp, options);
+    GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
+    translator.translate(pipeline);
+    streamApp.run();
+
+    return null;
+  }
+
+  private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
+    EmbeddedCluster cluster = options.getEmbeddedCluster();
+    if (cluster != null) {
+      return cluster.newClientContext();
+    } else {
+      return ClientContext.apply(config);
+    }
+  }
+
+  /**
+   * register class with default kryo serializers.
+   */
+  private Config registerSerializers(Config config, Map<String, String> userSerializers) {
+    Map<String, String> serializers = new HashMap<>();
+    serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
+    serializers.put("org.joda.time.Instant", "");
+    serializers.put("org.apache.beam.sdk.values.KV", "");
+    serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
+    serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+    if (userSerializers != null && !userSerializers.isEmpty()) {
+      serializers.putAll(userSerializers);
+    }
+    return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
+  }
+
+
+  /**
+   * copied from DirectPipelineRunner.
+   * used to replace Window.Bound till window function is added to Gearpump Stream DSL
+   */
+  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private final Window.Bound<T> wrapped;
+
+    AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
+      this.wrapped = wrapped;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      WindowingStrategy<?, ?> outputStrategy =
+          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
+
+      WindowFn<T, BoundedWindow> windowFn =
+          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+
+      if (!windowFn.isNonMerging()) {
+        throw new UnsupportedOperationException(
+            "merging window is not supported in Gearpump pipeline");
+      }
+
+      // If the Window.Bound transform only changed parts other than the WindowFn, then
+      // we skip AssignWindows even though it should be harmless in a perfect world.
+      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
+      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
+      // AssignWindows in this case.
+      if (wrapped.getWindowFn() == null) {
+        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        return input
+            .apply("AssignWindows", new AssignWindows<>(windowFn))
+            .setWindowingStrategyInternal(outputStrategy);
+      }
+    }
+  }
+
+  private static class IdentityFn<T> extends OldDoFn<T, T> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
new file mode 100644
index 0000000..b77e1e3
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link GearpumpRunner}.
+ *
+ * {@link AutoService} will register Gearpump's implementations of the {@link PipelineRunner}
+ * and {@link PipelineOptions} as available pipeline runner services.
+ */
+public class GearpumpRunnerRegistrar {
+  private GearpumpRunnerRegistrar() { }
+
+  /**
+   * Registers the {@link GearpumpRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(TestGearpumpRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link GearpumpPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(GearpumpPipelineOptions.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index cedd31f..89d31a6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -33,14 +33,14 @@ import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
  */
 public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
-  private final GearpumpPipelineRunner delegate;
+  private final GearpumpRunner delegate;
   private final EmbeddedCluster cluster;
 
   private TestGearpumpRunner(GearpumpPipelineOptions options) {
     cluster = EmbeddedCluster.apply();
     cluster.start();
     options.setEmbeddedCluster(cluster);
-    delegate = GearpumpPipelineRunner.fromOptions(options);
+    delegate = GearpumpRunner.fromOptions(options);
   }
 
   public static TestGearpumpRunner fromOptions(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94bd47cd/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
index ba50de7..1d85c25 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.gearpump.examples;
 
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.GearpumpPipelineRunner;
+import org.apache.beam.runners.gearpump.GearpumpRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -75,7 +75,7 @@ public class StreamingWordCount {
   public static void main(String[] args) {
     GearpumpPipelineOptions options = PipelineOptionsFactory
             .fromArgs(args).as(GearpumpPipelineOptions.class);
-    options.setRunner(GearpumpPipelineRunner.class);
+    options.setRunner(GearpumpRunner.class);
     options.setApplicationName("StreamingWordCount");
     options.setParallelism(1);
 


Mime
View raw message