beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [21/36] beam git commit: mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.
Date Thu, 07 Sep 2017 18:39:30 GMT
mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d3386d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d3386d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d3386d4

Branch: refs/heads/mr-runner
Commit: 8d3386d479b5704fa9448c7a9b1eab9c66e75549
Parents: 4e7062c
Author: Pei He <pei@apache.org>
Authored: Wed Aug 30 19:40:24 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Fri Sep 1 17:13:39 2017 +0800

----------------------------------------------------------------------
 runners/map-reduce/pom.xml                      | 14 ++++++++++++
 .../beam/runners/mapreduce/MapReduceRunner.java |  5 +++++
 .../translation/ConfigurationUtils.java         | 23 +++++++++++++++-----
 .../mapreduce/translation/GraphPlanner.java     | 11 +++++-----
 .../mapreduce/translation/JobPrototype.java     |  4 +++-
 5 files changed, 44 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/pom.xml
----------------------------------------------------------------------
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index e858031..d65bb34 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -56,6 +56,20 @@
                   <groups>
                     org.apache.beam.sdk.testing.ValidatesRunner
                   </groups>
+                  <excludes>
+                    <exclude>org.apache.beam.sdk.testing.PAssertTest.java</exclude>
+                  </excludes>
+                  <excludedGroups>
+                    org.apache.beam.sdk.testing.UsesSetState,
+                    org.apache.beam.sdk.testing.UsesSplittableParDo,
+                    org.apache.beam.sdk.testing.UsesDistributionMetrics,
+                    org.apache.beam.sdk.testing.UsesGaugeMetrics,
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                    org.apache.beam.sdk.testing.LargeKeys$Above10MB,
+                    org.apache.beam.sdk.testing.UsesTimersInParDo,
+                    org.apache.beam.sdk.testing.UsesStatefulParDo,
+                    org.apache.beam.sdk.testing.UsesTestStream
+                  </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
                   <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 88ed01e..71edf1a 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -31,9 +31,11 @@ import org.apache.beam.runners.mapreduce.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.BasicConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +64,9 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> {
 
   @Override
   public PipelineResult run(Pipeline pipeline) {
+    BasicConfigurator.configure();
+    MetricsEnvironment.setMetricsSupported(true);
+
     TranslationContext context = new TranslationContext(options);
     GraphConverter graphConverter = new GraphConverter(context);
     pipeline.traverseTopologically(graphConverter);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
index 6d7a81a..4ec50bd 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
@@ -17,9 +17,13 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
@@ -28,21 +32,28 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  */
 public class ConfigurationUtils {
 
+  private final MapReducePipelineOptions options;
+
+  public ConfigurationUtils(MapReducePipelineOptions options) {
+    this.options = checkNotNull(options, "options");
+  }
+
   public static ResourceId getResourceIdForOutput(String fileName, Configuration conf) {
     ResourceId outDir = FileSystems.matchNewResource(conf.get(FileOutputFormat.OUTDIR), true);
     return outDir.resolve(fileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
   }
 
-  public static String getFileOutputDir(String baseFileOutputDir, int stageId) {
-    if (baseFileOutputDir.endsWith("/")) {
-      return String.format("%sstage-%d", baseFileOutputDir, stageId);
+  public String getFileOutputDir(int stageId) {
+    String fileOutputDir = options.getFileOutputDir();
+    if (fileOutputDir.endsWith("/")) {
+      return String.format("%s%s/stage-%d", fileOutputDir, options.getJobName(), stageId);
     } else {
-      return String.format("%s/stage-%d", baseFileOutputDir, stageId);
+      return String.format("%s/%s/stage-%d", fileOutputDir, options.getJobName(), stageId);
     }
   }
 
-  public static String getFileOutputPath(String baseFileOutputDir, int stageId, String fileName) {
-    return String.format("%s/%s", getFileOutputDir(baseFileOutputDir, stageId), fileName);
+  public String getFileOutputPath(int stageId, String fileName) {
+    return String.format("%s/%s", getFileOutputDir(stageId), fileName);
   }
 
   public static String toFileName(String tagName) {

http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index b6e134e..608b304 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -36,10 +36,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
  */
 public class GraphPlanner {
 
-  private final MapReducePipelineOptions options;
+  private final ConfigurationUtils configUtils;
 
   public GraphPlanner(MapReducePipelineOptions options) {
-    this.options = checkNotNull(options, "options");
+    checkNotNull(options, "options");
+    this.configUtils = new ConfigurationUtils(options);
   }
 
   public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) {
@@ -79,8 +80,7 @@ public class GraphPlanner {
           }
           consumer.removeTag(tag);
 
-          String filePath = ConfigurationUtils.getFileOutputPath(
-              options.getFileOutputDir(), fusedStep.getStageId(), fileName);
+          String filePath = configUtils.getFileOutputPath(fusedStep.getStageId(), fileName);
           consumer.addStep(
               Graphs.Step.of(
                   readStepName,
@@ -133,8 +133,7 @@ public class GraphPlanner {
         for (Graphs.Tag sideInTag : sideInputTags) {
           tupleTagToFilePath.put(
               sideInTag.getTupleTag(),
-              ConfigurationUtils.getFileOutputPath(
-                  options.getFileOutputDir(),
+              configUtils.getFileOutputPath(
                   fusedGraph.getProducer(sideInTag).getStageId(),
                   ConfigurationUtils.toFileName(sideInTag.getName())));
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index a0c6626..93ae33a 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -60,11 +60,13 @@ public class JobPrototype {
   private final int stageId;
   private final Graphs.FusedStep fusedStep;
   private final MapReducePipelineOptions options;
+  private final ConfigurationUtils configUtils;
 
   private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) {
     this.stageId = stageId;
     this.fusedStep = checkNotNull(fusedStep, "fusedStep");
     this.options = checkNotNull(options, "options");
+    this.configUtils = new ConfigurationUtils(options);
   }
 
   public Job build(Class<?> jarClass, Configuration initConf) throws IOException {
@@ -79,7 +81,7 @@ public class JobPrototype {
     //TODO: config out dir with PipelineOptions.
     conf.set(
         FileOutputFormat.OUTDIR,
-        ConfigurationUtils.getFileOutputDir(options.getFileOutputDir(), fusedStep.getStageId()));
+        configUtils.getFileOutputDir(fusedStep.getStageId()));
 
     // Setup BoundedSources in BeamInputFormat.
     Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps());


Mime
View raw message