beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [16/36] beam git commit: mr-runner: support side inputs by reading in all views contents.
Date Thu, 07 Sep 2017 18:39:25 GMT
mr-runner: support side inputs by reading in all views contents.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ebd14c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ebd14c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ebd14c4

Branch: refs/heads/mr-runner
Commit: 0ebd14c446421bdb29a95ae231975875b4532031
Parents: e562a44
Author: Pei He <pei@apache.org>
Authored: Tue Aug 8 17:38:58 2017 +0800
Committer: Pei He <pei@apache.org>
Committed: Thu Aug 31 14:13:49 2017 +0800

----------------------------------------------------------------------
 .../translation/FileReadOperation.java          |   2 +-
 .../translation/FileSideInputReader.java        | 128 +++++++++++++++++++
 .../runners/mapreduce/translation/Graphs.java   |   3 +-
 .../GroupAlsoByWindowsParDoOperation.java       |   3 +-
 .../translation/NormalParDoOperation.java       |   3 +-
 .../mapreduce/translation/ParDoOperation.java   |   6 +-
 .../mapreduce/translation/ParDoTranslator.java  |   1 +
 .../ReifyTimestampAndWindowsParDoOperation.java |   4 +-
 .../translation/TranslationContext.java         |  25 ++++
 9 files changed, 168 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index 6bd893a..70263c3 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -95,7 +95,7 @@ public class FileReadOperation<T> extends SourceOperation<WindowedValue<T>> {
       FileStatus[] files = fs.globStatus(pattern);
       Queue<SequenceFile.Reader> readers = new LinkedList<>();
       for (FileStatus f : files) {
-        readers.add(new SequenceFile.Reader(fs, files[0].getPath(), conf));
+        readers.add(new SequenceFile.Reader(fs, f.getPath(), conf));
       }
       return new Reader<>(this, readers, coder);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
new file mode 100644
index 0000000..18bff2a
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Files based {@link SideInputReader}.
+ */
+public class FileSideInputReader implements SideInputReader {
+
+  private final Map<TupleTag<?>, String> tupleTagToFileName;
+  private final Map<TupleTag<?>, Coder<?>> tupleTagToCoder;
+
+  public FileSideInputReader(List<Graphs.Tag> sideInputTags) {
+    this.tupleTagToFileName = Maps.newHashMap();
+    this.tupleTagToCoder = Maps.newHashMap();
+    for (Graphs.Tag tag : sideInputTags) {
+      tupleTagToFileName.put(tag.getTupleTag(), toFileName(tag.getName()));
+      tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
+    }
+  }
+
+  @Nullable
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    String fileName = tupleTagToFileName.get(view.getTagInternal());
+    IterableCoder<WindowedValue<?>> coder =
+        (IterableCoder<WindowedValue<?>>) tupleTagToCoder.get(view.getTagInternal());
+    Coder<WindowedValue<?>> elemCoder = coder.getElemCoder();
+
+    final BoundedWindow sideInputWindow =
+        view.getWindowMappingFn().getSideInputWindow(window);
+
+    Path pattern = new Path(String.format("/tmp/mapreduce/stage-1/%s*", fileName));
+    Configuration conf = new Configuration();
+    conf.set(
+        "io.serializations",
+        "org.apache.hadoop.io.serializer.WritableSerialization,"
+            + "org.apache.hadoop.io.serializer.JavaSerialization");
+    try {
+      FileSystem fs;
+      fs = pattern.getFileSystem(conf);
+      FileStatus[] files = fs.globStatus(pattern);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf);
+
+      List<WindowedValue<?>> availableSideInputs = new ArrayList<>();
+      BytesWritable value = new BytesWritable();
+      while (reader.next(NullWritable.get(), value)) {
+        ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes());
+        availableSideInputs.add(elemCoder.decode(inStream));
+      }
+      Iterable<WindowedValue<?>> sideInputForWindow =
+          Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {
+            @Override
+            public boolean apply(@Nullable WindowedValue<?> sideInputCandidate) {
+              if (sideInputCandidate == null) {
+                return false;
+              }
+              // first match of a sideInputWindow to the elementWindow is good enough.
+              for (BoundedWindow sideInputCandidateWindow: sideInputCandidate.getWindows()) {
+                if (sideInputCandidateWindow.equals(sideInputWindow)) {
+                  return true;
+                }
+              }
+              // no match found.
+              return false;
+            }
+          });
+      return view.getViewFn().apply(sideInputForWindow);
+    } catch (IOException e) {
+      Throwables.throwIfUnchecked(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    return tupleTagToFileName.containsKey(view.getTagInternal());
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return tupleTagToFileName.isEmpty();
+  }
+
+  private String toFileName(String tagName) {
+    return tagName.replaceAll("[^A-Za-z0-9]", "0");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
index 9743d09..b2f793a 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graphs.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.mapreduce.translation;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
@@ -229,7 +230,7 @@ public class Graphs {
   }
 
   @AutoValue
-  public abstract static class Tag extends Graph.AbstractTag {
+  public abstract static class Tag extends Graph.AbstractTag implements Serializable {
     abstract String getName();
     abstract TupleTag<?> getTupleTag();
     abstract Coder<?> getCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
index 1ae38da..471c7f5 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GroupAlsoByWindowsParDoOperation.java
@@ -40,7 +40,8 @@ public class GroupAlsoByWindowsParDoOperation extends ParDoOperation {
       WindowingStrategy<?, ?> windowingStrategy,
       Coder<?> inputCoder,
       Graphs.Tag outTag) {
-    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), windowingStrategy);
+    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+        ImmutableList.<Graphs.Tag>of(), windowingStrategy);
     this.inputCoder = checkNotNull(inputCoder, "inputCoder");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
index fd1b528..58a7d6d 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/NormalParDoOperation.java
@@ -37,8 +37,9 @@ public class NormalParDoOperation<InputT, OutputT> extends ParDoOperation<InputT
       PipelineOptions options,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
+      List<Graphs.Tag> sideInputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
-    super(options, mainOutputTag, sideOutputTags, windowingStrategy);
+    super(options, mainOutputTag, sideOutputTags, sideInputTags, windowingStrategy);
     this.doFn = checkNotNull(doFn, "doFn");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index c6bf49c..020bfbe 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -24,7 +24,6 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -41,6 +40,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
   protected final SerializedPipelineOptions options;
   protected final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
+  private final List<Graphs.Tag> sideInputTags;
   protected final WindowingStrategy<?, ?> windowingStrategy;
 
   protected DoFnInvoker<InputT, OutputT> doFnInvoker;
@@ -50,11 +50,13 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
       PipelineOptions options,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
+      List<Graphs.Tag> sideInputTags,
       WindowingStrategy<?, ?> windowingStrategy) {
     super(1 + sideOutputTags.size());
     this.options = new SerializedPipelineOptions(checkNotNull(options, "options"));
     this.mainOutputTag = checkNotNull(mainOutputTag, "mainOutputTag");
     this.sideOutputTags = checkNotNull(sideOutputTags, "sideOutputTags");
+    this.sideInputTags = checkNotNull(sideInputTags, "sideInputTags");
     this.windowingStrategy = checkNotNull(windowingStrategy, "windowingStrategy");
   }
 
@@ -74,7 +76,7 @@ public abstract class ParDoOperation<InputT, OutputT> extends Operation<InputT>
     fnRunner = DoFnRunners.simpleRunner(
         options.getPipelineOptions(),
         getDoFn(),
-        NullSideInputReader.empty(),
+        new FileSideInputReader(sideInputTags),
         createOutputManager(),
         mainOutputTag,
         sideOutputTags,

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
index 9bd89fd..ae23f71 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -35,6 +35,7 @@ class ParDoTranslator<InputT, OutputT>
         userGraphContext.getOptions(),
         transform.getMainOutputTag(),
         transform.getAdditionalOutputTags().getAll(),
+        userGraphContext.getSideInputTags(),
         ((PCollection) userGraphContext.getInput()).getWindowingStrategy());
     context.addInitStep(
         Graphs.Step.of(userGraphContext.getStepName(), operation),

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
index 251828e..459e93b 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ReifyTimestampAndWindowsParDoOperation.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.mapreduce.translation;
 
 import com.google.common.collect.ImmutableList;
+import java.util.Collections;
 import java.util.List;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -36,7 +37,8 @@ public class ReifyTimestampAndWindowsParDoOperation extends ParDoOperation {
       PipelineOptions options,
       WindowingStrategy<?, ?> windowingStrategy,
       Graphs.Tag outTag) {
-    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(), windowingStrategy);
+    super(options, outTag.getTupleTag(), ImmutableList.<TupleTag<?>>of(),
+        ImmutableList.<Graphs.Tag>of(), windowingStrategy);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index da8ebff..fd6c0ba 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -136,6 +136,31 @@ public class TranslationContext {
           .toList();
     }
 
+    public List<Graphs.Tag> getSideInputTags() {
+      if (!(currentNode.getTransform() instanceof ParDo.MultiOutput)) {
+        return ImmutableList.of();
+      }
+      return FluentIterable.from(((ParDo.MultiOutput) currentNode.getTransform()).getSideInputs())
+          .transform(new Function<PValue, Graphs.Tag>() {
+            @Override
+            public Graphs.Tag apply(PValue pValue) {
+              checkState(
+                  pValueToTupleTag.containsKey(pValue),
+                  String.format("Failed to find TupleTag for pValue: %s.", pValue));
+              if (pValue instanceof PCollection) {
+                PCollection<?> pc = (PCollection<?>) pValue;
+                return Graphs.Tag.of(
+                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+              } else {
+                return Graphs.Tag.of(
+                    pValue.getName(),
+                    pValueToTupleTag.get(pValue),
+                    ((PCollectionView) pValue).getCoderInternal());
+              }
+            }})
+          .toList();
+    }
+
     public List<Graphs.Tag> getOutputTags() {
       if (currentNode.getTransform() instanceof View.CreatePCollectionView) {
         PCollectionView view = ((View.CreatePCollectionView) currentNode.getTransform()).getView();


Mime
View raw message