Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 72E3B200CF0 for ; Thu, 7 Sep 2017 20:39:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 71D391616B3; Thu, 7 Sep 2017 18:39:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 931F61610C8 for ; Thu, 7 Sep 2017 20:39:16 +0200 (CEST) Received: (qmail 51079 invoked by uid 500); 7 Sep 2017 18:39:15 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 48390 invoked by uid 99); 7 Sep 2017 18:39:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Sep 2017 18:39:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 751B6F56F2; Thu, 7 Sep 2017 18:39:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Thu, 07 Sep 2017 18:39:30 -0000 Message-Id: <89c708059cd94ccba05d8f3f52a2713f@git.apache.org> In-Reply-To: <8519f8d6f8f5492d9530d6ca81ad7f65@git.apache.org> References: <8519f8d6f8f5492d9530d6ca81ad7f65@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/36] beam git commit: mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline. archived-at: Thu, 07 Sep 2017 18:39:17 -0000 mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d3386d4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d3386d4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d3386d4 Branch: refs/heads/mr-runner Commit: 8d3386d479b5704fa9448c7a9b1eab9c66e75549 Parents: 4e7062c Author: Pei He Authored: Wed Aug 30 19:40:24 2017 +0800 Committer: Pei He Committed: Fri Sep 1 17:13:39 2017 +0800 ---------------------------------------------------------------------- runners/map-reduce/pom.xml | 14 ++++++++++++ .../beam/runners/mapreduce/MapReduceRunner.java | 5 +++++ .../translation/ConfigurationUtils.java | 23 +++++++++++++++----- .../mapreduce/translation/GraphPlanner.java | 11 +++++----- .../mapreduce/translation/JobPrototype.java | 4 +++- 5 files changed, 44 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/pom.xml ---------------------------------------------------------------------- diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml index e858031..d65bb34 100644 --- a/runners/map-reduce/pom.xml +++ b/runners/map-reduce/pom.xml @@ -56,6 +56,20 @@ org.apache.beam.sdk.testing.ValidatesRunner + + org.apache.beam.sdk.testing.PAssertTest.java + + + org.apache.beam.sdk.testing.UsesSetState, + org.apache.beam.sdk.testing.UsesSplittableParDo, + org.apache.beam.sdk.testing.UsesDistributionMetrics, + org.apache.beam.sdk.testing.UsesGaugeMetrics, + org.apache.beam.sdk.testing.UsesCommittedMetrics, + org.apache.beam.sdk.testing.LargeKeys$Above10MB, + org.apache.beam.sdk.testing.UsesTimersInParDo, + org.apache.beam.sdk.testing.UsesStatefulParDo, + org.apache.beam.sdk.testing.UsesTestStream + none true http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java index 88ed01e..71edf1a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java @@ -31,9 +31,11 @@ import org.apache.beam.runners.mapreduce.translation.TranslationContext; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; +import org.apache.log4j.BasicConfigurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +64,9 @@ public class MapReduceRunner extends PipelineRunner { @Override public PipelineResult run(Pipeline pipeline) { + BasicConfigurator.configure(); + MetricsEnvironment.setMetricsSupported(true); + TranslationContext context = new TranslationContext(options); GraphConverter graphConverter = new GraphConverter(context); pipeline.traverseTopologically(graphConverter); http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java index 6d7a81a..4ec50bd 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java @@ -17,9 +17,13 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -28,21 +32,28 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; */ public class ConfigurationUtils { + private final MapReducePipelineOptions options; + + public ConfigurationUtils(MapReducePipelineOptions options) { + this.options = checkNotNull(options, "options"); + } + public static ResourceId getResourceIdForOutput(String fileName, Configuration conf) { ResourceId outDir = FileSystems.matchNewResource(conf.get(FileOutputFormat.OUTDIR), true); return outDir.resolve(fileName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); } - public static String getFileOutputDir(String baseFileOutputDir, int stageId) { - if (baseFileOutputDir.endsWith("/")) { - return String.format("%sstage-%d", baseFileOutputDir, stageId); + public String getFileOutputDir(int stageId) { + String fileOutputDir = options.getFileOutputDir(); + if (fileOutputDir.endsWith("/")) { + return String.format("%s%s/stage-%d", fileOutputDir, options.getJobName(), stageId); } else { - return String.format("%s/stage-%d", baseFileOutputDir, stageId); + return String.format("%s/%s/stage-%d", fileOutputDir, options.getJobName(), stageId); } } - public static String getFileOutputPath(String baseFileOutputDir, int stageId, String fileName) { - return String.format("%s/%s", getFileOutputDir(baseFileOutputDir, stageId), fileName); + public String getFileOutputPath(int stageId, String fileName) { + return String.format("%s/%s", getFileOutputDir(stageId), fileName); } public static String toFileName(String tagName) { http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java index b6e134e..608b304 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java @@ -36,10 +36,11 @@ import org.apache.beam.sdk.values.WindowingStrategy; */ public class GraphPlanner { - private final MapReducePipelineOptions options; + private final ConfigurationUtils configUtils; public GraphPlanner(MapReducePipelineOptions options) { - this.options = checkNotNull(options, "options"); + checkNotNull(options, "options"); + this.configUtils = new ConfigurationUtils(options); } public Graphs.FusedGraph plan(Graphs.FusedGraph fusedGraph) { @@ -79,8 +80,7 @@ public class GraphPlanner { } consumer.removeTag(tag); - String filePath = ConfigurationUtils.getFileOutputPath( - options.getFileOutputDir(), fusedStep.getStageId(), fileName); + String filePath = configUtils.getFileOutputPath(fusedStep.getStageId(), fileName); consumer.addStep( Graphs.Step.of( readStepName, @@ -133,8 +133,7 @@ public class GraphPlanner { for (Graphs.Tag sideInTag : sideInputTags) { tupleTagToFilePath.put( sideInTag.getTupleTag(), - ConfigurationUtils.getFileOutputPath( - options.getFileOutputDir(), + configUtils.getFileOutputPath( fusedGraph.getProducer(sideInTag).getStageId(), ConfigurationUtils.toFileName(sideInTag.getName()))); } http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index a0c6626..93ae33a 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -60,11 +60,13 @@ public class JobPrototype { private final int stageId; private final Graphs.FusedStep fusedStep; private final MapReducePipelineOptions options; + private final ConfigurationUtils configUtils; private JobPrototype(int stageId, Graphs.FusedStep fusedStep, MapReducePipelineOptions options) { this.stageId = stageId; this.fusedStep = checkNotNull(fusedStep, "fusedStep"); this.options = checkNotNull(options, "options"); + this.configUtils = new ConfigurationUtils(options); } public Job build(Class jarClass, Configuration initConf) throws IOException { @@ -79,7 +81,7 @@ public class JobPrototype { //TODO: config out dir with PipelineOptions. conf.set( FileOutputFormat.OUTDIR, - ConfigurationUtils.getFileOutputDir(options.getFileOutputDir(), fusedStep.getStageId())); + configUtils.getFileOutputDir(fusedStep.getStageId())); // Setup BoundedSources in BeamInputFormat. Graphs.Step startStep = Iterables.getOnlyElement(fusedStep.getStartSteps());