nemo-commits mailing list archives

From taegeo...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-294] Beam-Runner (#163)
Date Mon, 03 Dec 2018 06:07:29 GMT
This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2a4db  [NEMO-294] Beam-Runner (#163)
6e2a4db is described below

commit 6e2a4db7b49f0789a90d2d4aa9fcc8246d781245
Author: Won Wook SONG <wonook@apache.org>
AuthorDate: Mon Dec 3 15:07:25 2018 +0900

    [NEMO-294] Beam-Runner (#163)
    
    JIRA: [NEMO-294: Support Nemo Runner execution by providing PipelineOptions to the Beam program](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-294)
    
    **Major changes:**
    - Update the launch scripts to launch shaded jars with the current project version, instead of the hard-coded 0.1-SNAPSHOT.
    - Refactor `JobLauncher` so that jobs can be launched from the JobLauncher's main class as well as from a particular Beam application. This change also lets the job ID be supplied by the application itself.
    - Use Google's AutoService to register `NemoRunner` as a Beam runner through the new `NemoRunnerRegistrar`.
    - Rename `NemoPipelineRunner` to `NemoRunner` to follow Beam's runner naming conventions.
    - Replace the default `executor_json` file with a default JSON string, so that no JSON file is required by default.
    - Add the MinimalWordCount example from the Beam homepage (from the quickstart article).
    - A quickstart following the Beam site's MinimalWordCount example can now be created by adding a simple profile to the example's Maven `pom.xml` (see the usage sketch below).
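
    As a usage sketch (not part of this commit), a Beam program can now select NemoRunner either programmatically or, with the new `NemoRunnerRegistrar` on the classpath, through `--runner=NemoRunner` on the command line. The class name `MyNemoBeamApp` and the `fromArgs` parsing below are illustrative assumptions; the committed MinimalWordCount example sets the runner explicitly instead.

    ```java
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
    import org.apache.nemo.compiler.frontend.beam.NemoRunner;

    /** Hypothetical Beam application launched on Nemo; not part of this commit. */
    public final class MyNemoBeamApp {
      private MyNemoBeamApp() {
      }

      public static void main(final String[] args) {
        // Parse command-line options; with NemoRunnerRegistrar registered via AutoService,
        // "--runner=NemoRunner" would also resolve without the explicit setRunner call below.
        final NemoPipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).as(NemoPipelineOptions.class);
        options.setRunner(NemoRunner.class);
        options.setJobName("MyNemoBeamApp"); // the job ID is now taken from the application

        final Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...
        p.run();
      }
    }
    ```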
    
    **Minor changes to note:**
    - Fix minor typos (e.g., confg --> config)
    - Fix minor indentation inconsistencies
    
    **Tests for the changes:**
    - Existing tests confirm that these changes do not break the code
    
    **Other comments:**
    - N/A
---
 bin/run_beam.sh                                    |   4 +-
 bin/run_spark.sh                                   |   4 +-
 .../java/org/apache/nemo/client/JobLauncher.java   | 163 +++++++++++++--------
 compiler/frontend/beam/pom.xml                     |  13 +-
 .../compiler/frontend/beam/NemoPipelineResult.java |   5 +-
 .../{NemoPipelineRunner.java => NemoRunner.java}   |  32 +++-
 .../frontend/beam/NemoRunnerRegistrar.java         |  62 ++++++++
 .../frontend/spark/core/SparkFrontendUtils.java    |   2 +-
 .../compiler/frontend/spark/core/rdd/RDD.scala     |   2 +-
 .../org/apache/nemo/compiler/CompilerTestUtil.java |   3 +-
 .../main/java/org/apache/nemo/conf/JobConf.java    |   2 +-
 .../nemo/examples/beam/AlternatingLeastSquare.java |   4 +-
 .../beam/AlternatingLeastSquareInefficient.java    |   4 +-
 .../org/apache/nemo/examples/beam/Broadcast.java   |   4 +-
 .../nemo/examples/beam/MinimalWordCount.java       |  87 +++++++++++
 .../beam/MultinomialLogisticRegression.java        |   4 +-
 .../nemo/examples/beam/NetworkTraceAnalysis.java   |   4 +-
 .../nemo/examples/beam/PartitionWordsByLength.java |   4 +-
 .../apache/nemo/examples/beam/PerKeyMedian.java    |   4 +-
 .../nemo/examples/beam/PerPercentileAverage.java   |   4 +-
 .../apache/nemo/examples/beam/SimpleSumSQL.java    |   4 +-
 .../nemo/examples/beam/WindowedBroadcast.java      |   4 +-
 .../nemo/examples/beam/WindowedWordCount.java      |   4 +-
 .../org/apache/nemo/examples/beam/WordCount.java   |   4 +-
 pom.xml                                            |   1 +
 25 files changed, 327 insertions(+), 101 deletions(-)

diff --git a/bin/run_beam.sh b/bin/run_beam.sh
index 41c1ef5..cbd082c 100755
--- a/bin/run_beam.sh
+++ b/bin/run_beam.sh
@@ -17,4 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-$(mvn -q \
+  -Dexec.executable=echo -Dexec.args='${project.version}' \
+  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/bin/run_spark.sh b/bin/run_spark.sh
index 057b017..314fd0d 100755
--- a/bin/run_spark.sh
+++ b/bin/run_spark.sh
@@ -17,4 +17,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-0.1-SNAPSHOT-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
+java -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/spark/target/nemo-examples-spark-$(mvn -q \
+  -Dexec.executable=echo -Dexec.args='${project.version}' \
+  --non-recursive exec:exec)-shaded.jar:`yarn classpath` org.apache.nemo.client.JobLauncher "$@"
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 035d719..f95bc73 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -101,6 +101,23 @@ public final class JobLauncher {
    * @throws Exception exception on the way.
    */
   public static void main(final String[] args) throws Exception {
+    try {
+      setup(args);
+      // Launch client main. The shutdown() method is called inside the launchDAG() method.
+      runUserProgramMain(builtJobConf);
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Set up the driver, etc. before the actual execution.
+   * @param args arguments.
+   * @throws InjectionException injection exception from REEF.
+   * @throws ClassNotFoundException class not found exception.
+   * @throws IOException IO exception.
+   */
+  public static void setup(final String[] args) throws InjectionException, ClassNotFoundException, IOException {
     // Get Job and Driver Confs
     builtJobConf = getJobConf(args);
 
@@ -108,77 +125,76 @@ public final class JobLauncher {
     LOG.info("Launching RPC Server");
     driverRPCServer = new DriverRPCServer();
     driverRPCServer
-        .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
-        })
-        .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
-        .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
-        .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
-            SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
-        .run();
+      .registerHandler(ControlMessage.DriverToClientMessageType.DriverStarted, event -> {
+      })
+      .registerHandler(ControlMessage.DriverToClientMessageType.DriverReady, event -> driverReadyLatch.countDown())
+      .registerHandler(ControlMessage.DriverToClientMessageType.ExecutionDone, event -> jobDoneLatch.countDown())
+      .registerHandler(ControlMessage.DriverToClientMessageType.DataCollected, message -> COLLECTED_DATA.addAll(
+        SerializationUtils.deserialize(Base64.getDecoder().decode(message.getDataCollected().getData()))))
+      .run();
 
     final Configuration driverConf = getDriverConf(builtJobConf);
     final Configuration driverNcsConf = getDriverNcsConf();
-    final Configuration driverMessageConfg = getDriverMessageConf();
+    final Configuration driverMessageConfig = getDriverMessageConf();
+    final String defaultExecutorResourceConfig = "[{\"type\":\"Transient\",\"memory_mb\":512,\"capacity\":5},"
+      + "{\"type\":\"Reserved\",\"memory_mb\":512,\"capacity\":5}]";
     final Configuration executorResourceConfig = getJSONConf(builtJobConf, JobConf.ExecutorJSONPath.class,
-        JobConf.ExecutorJSONContents.class);
+      JobConf.ExecutorJSONContents.class, defaultExecutorResourceConfig);
     final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
-        JobConf.BandwidthJSONContents.class);
+      JobConf.BandwidthJSONContents.class, "");
     final Configuration clientConf = getClientConf();
     final Configuration schedulerConf = getSchedulerConf(builtJobConf);
 
     // Merge Job and Driver Confs
-    jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfg,
-        executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
+    jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfig,
+      executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
 
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
 
     // Start Driver and launch user program.
-    try {
-      if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
-        throw new RuntimeException("Configuration for launching driver is not ready");
-      }
-
-
-      // Launch driver
-      LOG.info("Launching driver");
-      driverReadyLatch = new CountDownLatch(1);
-      driverLauncher = DriverLauncher.getLauncher(deployModeConf);
-      driverLauncher.submit(jobAndDriverConf, 500);
-      // When the driver is up and the resource is ready, the DriverReady message is delivered.
+    if (jobAndDriverConf == null || deployModeConf == null || builtJobConf == null) {
+      throw new RuntimeException("Configuration for launching driver is not ready");
+    }
 
-      // Launch client main
-      runUserProgramMain(builtJobConf);
+    // Launch driver
+    LOG.info("Launching driver");
+    driverReadyLatch = new CountDownLatch(1);
+    driverLauncher = DriverLauncher.getLauncher(deployModeConf);
+    driverLauncher.submit(jobAndDriverConf, 500);
+    // When the driver is up and the resource is ready, the DriverReady message is delivered.
+  }
 
-      // Trigger driver shutdown afterwards
-      driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
-          .setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
-      // Wait for driver to naturally finish
-      synchronized (driverLauncher) {
-        while (!driverLauncher.getStatus().isDone()) {
-          try {
-            LOG.info("Wait for the driver to finish");
-            driverLauncher.wait();
-          } catch (final InterruptedException e) {
-            LOG.warn("Interrupted: " + e);
-            // clean up state...
-            Thread.currentThread().interrupt();
-          }
+  /**
+   * Clean up everything.
+   */
+  public static void shutdown() {
+    // Trigger driver shutdown afterwards
+    driverRPCServer.send(ControlMessage.ClientToDriverMessage.newBuilder()
+      .setType(ControlMessage.ClientToDriverMessageType.DriverShutdown).build());
+    // Wait for driver to naturally finish
+    synchronized (driverLauncher) {
+      while (!driverLauncher.getStatus().isDone()) {
+        try {
+          LOG.info("Wait for the driver to finish");
+          driverLauncher.wait();
+        } catch (final InterruptedException e) {
+          LOG.warn("Interrupted: " + e);
+          // clean up state...
+          Thread.currentThread().interrupt();
         }
-        LOG.info("Driver terminated");
-      }
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
-    } finally {
-      // Close everything that's left
-      driverRPCServer.shutdown();
-      driverLauncher.close();
-      final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
-      if (possibleError.isPresent()) {
-        throw new RuntimeException(possibleError.get());
-      } else {
-        LOG.info("Job successfully completed");
       }
+      LOG.info("Driver terminated");
+    }
+
+    // Close everything that's left
+    driverRPCServer.shutdown();
+    driverLauncher.close();
+    final Optional<Throwable> possibleError = driverLauncher.getStatus().getError();
+    if (possibleError.isPresent()) {
+      throw new RuntimeException(possibleError.get());
+    } else {
+      LOG.info("Job successfully completed");
     }
   }
 
@@ -191,14 +207,32 @@ public final class JobLauncher {
    */
  // When modifying the signature of this method, see CompilerTestUtil#compileDAG and make corresponding changes
   public static void launchDAG(final DAG dag) {
-    launchDAG(dag, Collections.emptyMap());
+    launchDAG(dag, Collections.emptyMap(), "");
+  }
+
+  /**
+   * @param dag the application DAG.
+   * @param jobId job ID.
+   */
+  public static void launchDAG(final DAG dag, final String jobId) {
+    launchDAG(dag, Collections.emptyMap(), jobId);
   }
 
   /**
    * @param dag the application DAG.
    * @param broadcastVariables broadcast variables (can be empty).
+   * @param jobId job ID.
    */
-  public static void launchDAG(final DAG dag, final Map<Serializable, Object> broadcastVariables) {
+  public static void launchDAG(final DAG dag, final Map<Serializable, Object> broadcastVariables, final String jobId) {
+    // launch driver if it hasn't been already
+    if (driverReadyLatch == null) {
+      try {
+        setup(new String[]{"-job_id", jobId});
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     // Wait until the driver is ready.
     try {
       LOG.info("Waiting for the driver to be ready");
@@ -229,8 +263,11 @@ public final class JobLauncher {
       // clean up state...
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
+    } finally {
+      LOG.info("DAG execution done");
+      // trigger shutdown.
+      shutdown();
     }
-    LOG.info("DAG execution done");
   }
 
   /**
@@ -267,6 +304,13 @@ public final class JobLauncher {
     return jcb.build();
   }
 
+  /**
+   * Fetch scheduler configuration.
+   * @param jobConf job configuration.
+   * @return the scheduler configuration.
+   * @throws ClassNotFoundException exception while finding the class.
+   * @throws InjectionException exception while injection (REEF Tang).
+   */
   private static Configuration getSchedulerConf(final Configuration jobConf)
     throws ClassNotFoundException, InjectionException {
     final Injector injector = TANG.newInjector(jobConf);
@@ -399,13 +443,14 @@ public final class JobLauncher {
    */
   private static Configuration getJSONConf(final Configuration jobConf,
                                           final Class<? extends Name<String>> pathParameter,
-                                           final Class<? extends Name<String>> contentsParameter)
+                                           final Class<? extends Name<String>> contentsParameter,
+                                           final String defaultContent)
       throws InjectionException {
     final Injector injector = TANG.newInjector(jobConf);
     try {
       final String path = injector.getNamedInstance(pathParameter);
-      final String contents = path.isEmpty() ? ""
-          : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
+      final String contents = path.isEmpty() ? defaultContent
+        : new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
       return TANG.newConfigurationBuilder()
           .bindNamedParameter(contentsParameter, contents)
           .build();
diff --git a/compiler/frontend/beam/pom.xml b/compiler/frontend/beam/pom.xml
index 01000f7..02fab3a 100644
--- a/compiler/frontend/beam/pom.xml
+++ b/compiler/frontend/beam/pom.xml
@@ -29,9 +29,10 @@ under the License.
 
     <artifactId>nemo-compiler-frontend-beam</artifactId>
     <name>Nemo Compiler Frontend: Beam</name>
+    <packaging>jar</packaging>
 
     <dependencies>
-	    <dependency>
+	      <dependency>
             <groupId>org.apache.nemo</groupId>
             <artifactId>nemo-common</artifactId>
             <version>${project.version}</version>
@@ -46,11 +47,11 @@ under the License.
             <artifactId>beam-sdks-java-core</artifactId>
             <version>${beam.version}</version>
         </dependency>
-      <dependency>
+        <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-runners-core-java</artifactId>
             <version>${beam.version}</version>
-      </dependency>
+        </dependency>
         <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
@@ -62,5 +63,11 @@ under the License.
             <version>${hadoop.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <version>${auto-service.version}</version>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
 </project>
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
index 57f1634..1cfbf7a 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineResult.java
@@ -57,10 +57,7 @@ public final class NemoPipelineResult extends ClientEndpoint implements Pipeline
 
   @Override
   public State waitUntilFinish() {
-    throw new UnsupportedOperationException();
-    // TODO #208: NemoPipelineResult#waitUntilFinish hangs
-    // Previous code that hangs the job:
-    // return (State) super.waitUntilJobFinish();
+    return waitUntilFinish(Duration.ZERO);
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
similarity index 67%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
rename to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
index d011d11..9128213 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam;
 
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.nemo.client.JobLauncher;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
@@ -27,25 +28,46 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
 /**
  * Runner class for BEAM programs.
  */
-public final class NemoPipelineRunner extends PipelineRunner<NemoPipelineResult> {
+public final class NemoRunner extends PipelineRunner<NemoPipelineResult> {
   private final NemoPipelineOptions nemoPipelineOptions;
 
   /**
    * BEAM Pipeline Runner.
    * @param nemoPipelineOptions PipelineOptions.
    */
-  private NemoPipelineRunner(final NemoPipelineOptions nemoPipelineOptions) {
+  private NemoRunner(final NemoPipelineOptions nemoPipelineOptions) {
     this.nemoPipelineOptions = nemoPipelineOptions;
   }
 
   /**
+   * Creates and returns a new NemoRunner with default options.
+   *
+   * @return A pipeline runner with default options.
+   */
+  public static NemoRunner create() {
+    NemoPipelineOptions options = PipelineOptionsFactory.as(NemoPipelineOptions.class);
+    options.setRunner(NemoRunner.class);
+    return new NemoRunner(options);
+  }
+
+  /**
+   * Creates and returns a new NemoRunner with specified options.
+   *
+   * @param options The NemoPipelineOptions to use when executing the job.
+   * @return A pipeline runner that will execute with specified options.
+   */
+  public static NemoRunner create(final NemoPipelineOptions options) {
+    return new NemoRunner(options);
+  }
+
+  /**
    * Static initializer for creating PipelineRunner with the given options.
    * @param options given PipelineOptions.
    * @return The created PipelineRunner.
    */
-  public static PipelineRunner<NemoPipelineResult> fromOptions(final PipelineOptions options) {
+  public static NemoRunner fromOptions(final PipelineOptions options) {
     final NemoPipelineOptions nemoOptions = PipelineOptionsValidator.validate(NemoPipelineOptions.class, options);
-    return new NemoPipelineRunner(nemoOptions);
+    return new NemoRunner(nemoOptions);
   }
 
   /**
@@ -57,7 +79,7 @@ public final class NemoPipelineRunner extends PipelineRunner<NemoPipelineResult>
     final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions);
     pipeline.traverseTopologically(pipelineVisitor);
     final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
-    JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline());
+    JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName());
     return nemoPipelineResult;
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
new file mode 100644
index 0000000..aa05519
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoRunnerRegistrar.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the {@link NemoRunner}.
+ *
+ * {@link AutoService} will register Nemo's implementations of the {@link PipelineRunner} and {@link PipelineOptions}
+ * as available pipeline runner services.
+ */
+public final class NemoRunnerRegistrar {
+  /**
+   * Private constructor.
+   */
+  private NemoRunnerRegistrar() {
+  }
+
+  /**
+   * Registers the {@link NemoRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static final class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.of(NemoRunner.class);
+    }
+  }
+
+  /**
+   * Registers the {@link NemoPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static final class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.of(NemoPipelineOptions.class);
+    }
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index 3cb7555..0a5c0e5 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -105,7 +105,7 @@ public final class SparkFrontendUtils {
     builder.connectVertices(newEdge);
 
     // launch DAG
-    JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll());
+    JobLauncher.launchDAG(builder.build(), SparkBroadcastVariables.getAll(), "");
 
     return (List<T>) JobLauncher.getCollectedData();
   }
diff --git a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 3cf516a..234e867 100644
--- a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -230,7 +230,7 @@ final class RDD[T: ClassTag] protected[rdd] (
     newEdge.setProperty(keyExtractorProperty)
 
     builder.connectVertices(newEdge)
-    JobLauncher.launchDAG(builder.build, SparkBroadcastVariables.getAll)
+    JobLauncher.launchDAG(builder.build, SparkBroadcastVariables.getAll, "")
   }
 
   /////////////// CACHING ///////////////
diff --git a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
index 2a4d359..d948138 100644
--- a/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
+++ b/compiler/test/src/main/java/org/apache/nemo/compiler/CompilerTestUtil.java
@@ -86,8 +86,9 @@ public final class CompilerTestUtil {
     final Method userMainMethod = userMainClass.getMethod("main", String[].class);
 
     final ArgumentCaptor<DAG> captor = ArgumentCaptor.forClass(DAG.class);
+    final ArgumentCaptor<String> stringArg = ArgumentCaptor.forClass(String.class);
     PowerMockito.mockStatic(JobLauncher.class);
-    PowerMockito.doNothing().when(JobLauncher.class, "launchDAG", captor.capture());
+    PowerMockito.doNothing().when(JobLauncher.class, "launchDAG", captor.capture(), stringArg.capture());
     userMainMethod.invoke(null, (Object) userMainMethodArgs);
     return captor.getValue();
   }
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 7de0e18..1bdb7c9 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -165,7 +165,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
    * Path to the JSON file that specifies resource layout.
    */
  @NamedParameter(doc = "Path to the JSON file that specifies resources for executors", short_name = "executor_json",
-      default_value = "examples/resources/test_executor_resources.json")
+      default_value = "")
   public final class ExecutorJSONPath implements Name<String> {
   }
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 76bdc03..f3ce915 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -20,7 +20,7 @@ package org.apache.nemo.examples.beam;
 
 import com.github.fommil.netlib.BLAS;
 import com.github.fommil.netlib.LAPACK;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
@@ -371,7 +371,7 @@ public final class AlternatingLeastSquare {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("ALS");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index ab1760f..5af8dd1 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderProviders;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -116,7 +116,7 @@ public final class AlternatingLeastSquareInefficient {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("ALS");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
index da609e4..5ded58b 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/Broadcast.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.examples.beam;
 
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -49,7 +49,7 @@ public final class Broadcast {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
 
     final Pipeline p = Pipeline.create(options);
     final PCollection<String> elemCollection = GenericSourceSink.read(p, inputFilePath);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
new file mode 100644
index 0000000..5549d10
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MinimalWordCount.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Filter;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
+
+import java.util.Arrays;
+/**
+ * MinimalWordCount program from BEAM.
+ */
+public final class MinimalWordCount {
+  /**
+   * Private Constructor.
+   */
+  private MinimalWordCount() {
+  }
+  /**
+   * Main function for the MinimalWordCount Beam program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoRunner.class);
+    options.setJobName("MinimalWordCount");
+    // Create the Pipeline object with the options we defined above
+    final Pipeline p = Pipeline.create(options);
+    // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
+    // of input text files. TextIO.Read returns a PCollection where each element is one line from
+    // the input text (a set of Shakespeare's texts).
+    // This example reads a public data set consisting of the complete works of Shakespeare.
+    p.apply(TextIO.read().from(inputFilePath))
+      // Concept #2: Apply a FlatMapElements transform the PCollection of text lines.
+      // This transform splits the lines in PCollection<String>, where each element is an
+      // individual word in Shakespeare's collected texts.
+      .apply(
+        FlatMapElements.into(TypeDescriptors.strings())
+          .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
+      // We use a Filter transform to avoid empty word
+      .apply(Filter.by((String word) -> !word.isEmpty()))
+      // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
+      // transform returns a new PCollection of key/value pairs, where each key represents a
+      // unique word in the text. The associated value is the occurrence count for that word.
+      .apply(Count.perElement())
+      // Apply a MapElements transform that formats our PCollection of word counts into a
+      // printable string, suitable for writing to an output file.
+      .apply(
+        MapElements.into(TypeDescriptors.strings())
+          .via(
+            (KV<String, Long> wordCount) ->
+              wordCount.getKey() + ": " + wordCount.getValue()))
+      // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
+      // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
+      // formatted strings) to a series of text files.
+      //
+      // By default, it will write to a set of files with names like wordcounts-00001-of-00005
+      .apply(TextIO.write().to(outputFilePath));
+    p.run();
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 921b862..2b37eba 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.nemo.common.Pair;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -420,7 +420,7 @@ public final class MultinomialLogisticRegression {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create();
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("MLR");
     options.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
index e0567c6..a9bbc43 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/NetworkTraceAnalysis.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -59,7 +59,7 @@ public final class NetworkTraceAnalysis {
     final String input1FilePath = args[1];
     final String outputFilePath = args[2];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("NetworkTraceAnalysis");
 
     // Given "4 0.0 192.168.3.1 -> 192.168.0.2 Len=29", this finds "192.168.3.1", "192.168.0.2"
and "29"
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
index b446858..816be62 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PartitionWordsByLength.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -47,7 +47,7 @@ public final class PartitionWordsByLength {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("PartitionWordsByLength");
 
     // {} here is required for preserving type information.
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
index ba2a94e..6255a93 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerKeyMedian.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -53,7 +53,7 @@ public final class PerKeyMedian {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("PerKeyMedian");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
index 39b941a..486442b 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/PerPercentileAverage.java
@@ -20,7 +20,7 @@ package org.apache.nemo.examples.beam;
 
 import com.google.common.collect.Lists;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -52,7 +52,7 @@ public final class PerPercentileAverage {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("PerPercentileAverage");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
index b476460..6fa8f5e 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/SimpleSumSQL.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.*;
 import org.apache.beam.sdk.values.*;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -50,7 +50,7 @@ public final class SimpleSumSQL {
     final String outputFilePath = args[0];
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("SimpleSumSQL");
     final Pipeline p = Pipeline.create(options);
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
index 30ee405..5bff5d8 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -62,7 +62,7 @@ public final class WindowedBroadcast {
       .every(Duration.standardSeconds(1)));
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("WindowedBroadcast");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 0f13dc4..a814165 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 
@@ -106,7 +106,7 @@ public final class WindowedWordCount {
     }
 
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("WindowedWordCount");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
index 9af9d7c..ba3cb80 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WordCount.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.apache.nemo.compiler.frontend.beam.NemoRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,7 +45,7 @@ public final class WordCount {
     final String inputFilePath = args[0];
     final String outputFilePath = args[1];
     final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
-    options.setRunner(NemoPipelineRunner.class);
+    options.setRunner(NemoRunner.class);
     options.setJobName("WordCount");
 
     final Pipeline p = Pipeline.create(options);
diff --git a/pom.xml b/pom.xml
index 7ed51fe..132c5b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@ under the License.
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <auto-service.version>1.0-rc2</auto-service.version>
         <beam.version>2.6.0</beam.version>
         <spark.version>2.2.0</spark.version>
         <scala.version>2.11.8</scala.version>

