beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [33/36] beam git commit: mr-runner: support SourceMetrics; this fixes MetricsTest.testBoundedSourceMetrics().
Date Thu, 07 Sep 2017 18:39:42 GMT
mr-runner: support SourceMetrics; this fixes MetricsTest.testBoundedSourceMetrics().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c62b3ad4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c62b3ad4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c62b3ad4

Branch: refs/heads/mr-runner
Commit: c62b3ad462c2c07ce36cce025dc52204e7eb87d2
Parents: 5248ce4
Author: Pei He <pei@apache.org>
Authored: Fri Sep 1 16:55:19 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Fri Sep 1 17:15:08 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/BeamInputFormat.java  | 39 +++++++++++++++-----
 .../translation/FileReadOperation.java          |  4 ++
 .../translation/FlattenTranslator.java          | 12 +++---
 .../mapreduce/translation/GraphPlanner.java     |  3 +-
 .../mapreduce/translation/JobPrototype.java     |  4 +-
 .../translation/ReadBoundedTranslator.java      |  7 ++--
 .../mapreduce/translation/ReadOperation.java    |  5 ++-
 .../translation/SourceReadOperation.java        |  6 ++-
 8 files changed, 56 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 10d9ada..9dc3396 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -26,12 +26,15 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -87,7 +90,8 @@ public class BeamInputFormat<T> extends InputFormat {
                         .transform(new Function<BoundedSource<?>, ReadOperation.TaggedSource>()
{
                           @Override
                           public ReadOperation.TaggedSource apply(BoundedSource<?>
input) {
-                            return ReadOperation.TaggedSource.of(input, taggedSource.getTag());
+                            return ReadOperation.TaggedSource.of(
+                                taggedSource.getStepName(), input, taggedSource.getTag());
                           }});
                   } catch (Exception e) {
                     Throwables.throwIfUnchecked(e);
@@ -98,7 +102,8 @@ public class BeamInputFormat<T> extends InputFormat {
           .transform(new Function<ReadOperation.TaggedSource, InputSplit>() {
             @Override
             public InputSplit apply(ReadOperation.TaggedSource taggedSource) {
-              return new BeamInputSplit(taggedSource.getSource(), options, taggedSource.getTag());
+              return new BeamInputSplit(taggedSource.getStepName(), taggedSource.getSource(),
+                  options, taggedSource.getTag());
             }})
           .toList();
     } catch (Exception e) {
@@ -113,6 +118,7 @@ public class BeamInputFormat<T> extends InputFormat {
   }
 
   public static class BeamInputSplit<T> extends InputSplit implements Writable {
+    private String stepName;
     private BoundedSource<T> boundedSource;
     private SerializedPipelineOptions options;
     private TupleTag<?> tupleTag;
@@ -121,9 +127,11 @@ public class BeamInputFormat<T> extends InputFormat {
     }
 
     public BeamInputSplit(
+        String stepName,
         BoundedSource<T> boundedSource,
         SerializedPipelineOptions options,
         TupleTag<?> tupleTag) {
+      this.stepName = checkNotNull(stepName, "stepName");
       this.boundedSource = checkNotNull(boundedSource, "boundedSources");
       this.options = checkNotNull(options, "options");
       this.tupleTag = checkNotNull(tupleTag, "tupleTag");
@@ -131,7 +139,7 @@ public class BeamInputFormat<T> extends InputFormat {
 
     public BeamRecordReader<T> createReader() throws IOException {
       return new BeamRecordReader<>(
-          boundedSource.createReader(options.getPipelineOptions()), tupleTag);
+          stepName, boundedSource.createReader(options.getPipelineOptions()), tupleTag);
     }
 
     @Override
@@ -154,6 +162,7 @@ public class BeamInputFormat<T> extends InputFormat {
     @Override
     public void write(DataOutput out) throws IOException {
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
+      StringUtf8Coder.of().encode(stepName, stream);
       SerializableCoder.of(BoundedSource.class).encode(boundedSource, stream);
       SerializableCoder.of(SerializedPipelineOptions.class).encode(options, stream);
       SerializableCoder.of(TupleTag.class).encode(tupleTag, stream);
@@ -170,6 +179,7 @@ public class BeamInputFormat<T> extends InputFormat {
       in.readFully(bytes);
 
       ByteArrayInputStream inStream = new ByteArrayInputStream(bytes);
+      stepName = StringUtf8Coder.of().decode(inStream);
       boundedSource = SerializableCoder.of(BoundedSource.class).decode(inStream);
       options = SerializableCoder.of(SerializedPipelineOptions.class).decode(inStream);
       tupleTag = SerializableCoder.of(TupleTag.class).decode(inStream);
@@ -178,11 +188,15 @@ public class BeamInputFormat<T> extends InputFormat {
 
   private static class BeamRecordReader<T> extends RecordReader {
 
+    private final String stepName;
     private final BoundedSource.BoundedReader<T> reader;
-    private TupleTag<?> tupleTag;
+    private final TupleTag<?> tupleTag;
+    private MetricsReporter metricsReporter;
     private boolean started;
 
-    public BeamRecordReader(BoundedSource.BoundedReader<T> reader, TupleTag<?>
tupleTag) {
+    public BeamRecordReader(
+        String stepName, BoundedSource.BoundedReader<T> reader, TupleTag<?> tupleTag)
{
+      this.stepName = checkNotNull(stepName, "stepName");
       this.reader = checkNotNull(reader, "reader");
       this.tupleTag = checkNotNull(tupleTag, "tupleTag");
       this.started = false;
@@ -191,15 +205,19 @@ public class BeamInputFormat<T> extends InputFormat {
     @Override
     public void initialize(
         InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
{
+      this.metricsReporter = new MetricsReporter(context);
     }
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (!started) {
-        started = true;
-        return reader.start();
-      } else {
-        return reader.advance();
+      try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+          metricsReporter.getMetricsContainer(stepName))) {
+        if (!started) {
+          started = true;
+          return reader.start();
+        } else {
+          return reader.advance();
+        }
       }
     }
 
@@ -233,6 +251,7 @@ public class BeamInputFormat<T> extends InputFormat {
     @Override
     public void close() throws IOException {
       reader.close();
+      metricsReporter.updateMetrics();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index cbbfbd2..f212252 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -46,15 +46,18 @@ import org.apache.hadoop.io.SequenceFile;
  */
 public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>>
{
 
+  private final String stepName;
   private final String fileName;
   private final Coder<WindowedValue<T>> coder;
   private final TupleTag<?> tupleTag;
 
   public FileReadOperation(
+      String stepName,
       String fileName,
       Coder<WindowedValue<T>> coder,
       TupleTag<?> tupleTag) {
     super();
+    this.stepName = checkNotNull(stepName, "stepName");
     this.fileName = checkNotNull(fileName, "fileName");
     this.coder = checkNotNull(coder, "coder");
     this.tupleTag = checkNotNull(tupleTag, "tupleTag");
@@ -63,6 +66,7 @@ public class FileReadOperation<T> extends ReadOperation<WindowedValue<T>>
{
   @Override
   TaggedSource getTaggedSource(Configuration conf) {
     return TaggedSource.of(
+        stepName,
         new FileBoundedSource<>(fileName, coder, new SerializableConfiguration(conf)),
         tupleTag);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
index 817f2bf..06ad367 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -55,18 +55,20 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC
       }
     }
 
+    String stepName = userGraphContext.getStepName();
     if (inputTagToCount.isEmpty()) {
       // Create a empty source
       Operation<?> operation =
-          new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag());
+          new SourceReadOperation(
+              stepName, new EmptySource(), userGraphContext.getOnlyOutputTag());
       context.addInitStep(
-          Graphs.Step.of(userGraphContext.getStepName(), operation),
+          Graphs.Step.of(stepName, operation),
           userGraphContext.getInputTags(),
           userGraphContext.getOutputTags());
     } else if (!containsDuplicates) {
       Operation<?> operation = new FlattenOperation(1);
       context.addInitStep(
-          Graphs.Step.of(userGraphContext.getStepName(), operation),
+          Graphs.Step.of(stepName, operation),
           userGraphContext.getInputTags(),
           userGraphContext.getOutputTags());
     } else {
@@ -79,7 +81,7 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC
         if (dupFactor == 1) {
           intermediateTags.add(inTag);
         } else {
-          String dupStepName = userGraphContext.getStepName() + "/Dup-" + dupFactor;
+          String dupStepName = stepName + "/Dup-" + dupFactor;
           Graphs.Tag outTag = Graphs.Tag.of(
               dupStepName + ".out",
               new TupleTag<T>(),
@@ -93,7 +95,7 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC
         }
       }
       context.addInitStep(
-          Graphs.Step.of(userGraphContext.getStepName(), new FlattenOperation(1)),
+          Graphs.Step.of(stepName, new FlattenOperation(1)),
           intermediateTags,
           userGraphContext.getOutputTags());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 6c79277..09998ea 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -83,7 +83,8 @@ public class GraphPlanner {
           consumer.addStep(
               Graphs.Step.of(
                   readStepName,
-                  new FileReadOperation(filePath, writeValueCoder, tag.getTupleTag())),
+                  new FileReadOperation(
+                      readStepName, filePath, writeValueCoder, tag.getTupleTag())),
               ImmutableList.<Graphs.Tag>of(),
               ImmutableList.of(readOutput));
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 44f279b..e8e6eab 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.counters.Limits;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -77,8 +78,9 @@ public class JobPrototype {
         "io.serializations",
         "org.apache.hadoop.io.serializer.WritableSerialization,"
             + "org.apache.hadoop.io.serializer.JavaSerialization");
+    conf.set("mapreduce.job.counters.group.name.max", "512");
+    Limits.init(conf);
 
-    //TODO: config out dir with PipelineOptions.
     conf.set(
         FileOutputFormat.OUTDIR,
         configUtils.getFileOutputDir(fusedStep.getStageId()));

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
index 138c00e..5e5c99b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadBoundedTranslator.java
@@ -27,10 +27,11 @@ class ReadBoundedTranslator<T> extends TransformTranslator.Default<Read.Bounded<
   public void translateNode(Read.Bounded transform, TranslationContext context) {
     TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
 
-    ReadOperation operation =
-        new SourceReadOperation(transform.getSource(), userGraphContext.getOnlyOutputTag());
+    String stepName = userGraphContext.getStepName();
+    ReadOperation operation = new SourceReadOperation(
+        stepName, transform.getSource(), userGraphContext.getOnlyOutputTag());
     context.addInitStep(
-        Graphs.Step.of(userGraphContext.getStepName(), operation),
+        Graphs.Step.of(stepName, operation),
         userGraphContext.getInputTags(),
         userGraphContext.getOutputTags());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
index cb8b00e..a3e1d77 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReadOperation.java
@@ -46,12 +46,13 @@ abstract class ReadOperation<T> extends Operation<T> {
 
   @AutoValue
   abstract static class TaggedSource implements Serializable {
+    abstract String getStepName();
     abstract BoundedSource<?> getSource();
     abstract TupleTag<?> getTag();
 
-    static TaggedSource of(BoundedSource<?> boundedSource, TupleTag<?> tupleTag)
{
+    static TaggedSource of(String stepName, BoundedSource<?> boundedSource, TupleTag<?>
tupleTag) {
       return new org.apache.beam.runners.mapreduce.translation
-          .AutoValue_ReadOperation_TaggedSource(boundedSource, tupleTag);
+          .AutoValue_ReadOperation_TaggedSource(stepName, boundedSource, tupleTag);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
index 19b0320..55b46a4 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/SourceReadOperation.java
@@ -27,12 +27,14 @@ import org.apache.hadoop.conf.Configuration;
  * Operation that reads from {@link BoundedSource}.
  */
 public class SourceReadOperation extends ReadOperation {
+  private final String stepName;
   private final TaggedSource source;
 
-  SourceReadOperation(BoundedSource<?> boundedSource, TupleTag<?> tupleTag) {
+  SourceReadOperation(String stepName, BoundedSource<?> boundedSource, TupleTag<?>
tupleTag) {
+    this.stepName = checkNotNull(stepName, "stepName");
     checkNotNull(boundedSource, "boundedSource");
     checkNotNull(tupleTag, "tupleTag");
-    this.source = TaggedSource.of(boundedSource, tupleTag);
+    this.source = TaggedSource.of(stepName, boundedSource, tupleTag);
   }
 
   @Override


Mime
View raw message