beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/4] beam git commit: Extracts common logic from TextIO.ReadAll into a utility transform
Date Wed, 26 Jul 2017 00:51:51 GMT
Extracts common logic from TextIO.ReadAll into a utility transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf0b363
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf0b363
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf0b363

Branch: refs/heads/master
Commit: eaf0b36313fcd59963b2efbf16f50dd913da7de2
Parents: e80c83b
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Jul 21 14:09:13 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  | 152 +++++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIO.java     | 135 ++++------------
 2 files changed, 179 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
new file mode 100644
index 0000000..66aa41e
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Reads each filepattern in the input {@link PCollection} using given parameters for splitting
+ * files into offset ranges and for creating a {@link FileBasedSource} for a file.
+ */
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> {
+  private final SerializableFunction<String, Boolean> isSplittable;
+  private final long desiredBundleSizeBytes;
+  private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+  public ReadAllViaFileBasedSource(
+      SerializableFunction<String, Boolean> isSplittable,
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, FileBasedSource<T>> createSource) {
+    this.isSplittable = isSplittable;
+    this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+    this.createSource = createSource;
+  }
+
+  @Override
+  public PCollection<T> expand(PCollection<String> input) {
+    return input
+        .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
+        .apply(
+            "Split into ranges",
+            ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes)))
+        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
+  }
+
+  private static class ReshuffleWithUniqueKey<T>
+      extends PTransform<PCollection<T>, PCollection<T>> {
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input
+          .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
+          .apply("Reshuffle", Reshuffle.<Integer, T>of())
+          .apply("Values", Values.<T>create());
+    }
+  }
+
+  private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
+    private int index;
+
+    @Setup
+    public void setup() {
+      this.index = ThreadLocalRandom.current().nextInt();
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(KV.of(++index, c.element()));
+    }
+  }
+
+  private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> {
+    @ProcessElement
+    public void process(ProcessContext c) throws Exception {
+      MatchResult match = FileSystems.match(c.element());
+      checkArgument(
+          match.status().equals(MatchResult.Status.OK),
+          "Failed to match filepattern %s: %s",
+          c.element(),
+          match.status());
+      for (MatchResult.Metadata metadata : match.metadata()) {
+        c.output(metadata);
+      }
+    }
+  }
+
+  private static class SplitIntoRangesFn
+      extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> {
+    private final SerializableFunction<String, Boolean> isSplittable;
+    private final long desiredBundleSizeBytes;
+
+    private SplitIntoRangesFn(
+        SerializableFunction<String, Boolean> isSplittable, long desiredBundleSizeBytes) {
+      this.isSplittable = isSplittable;
+      this.desiredBundleSizeBytes = desiredBundleSizeBytes;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) {
+      MatchResult.Metadata metadata = c.element();
+      if (!metadata.isReadSeekEfficient()
+          || !isSplittable.apply(metadata.resourceId().toString())) {
+        c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
+        return;
+      }
+      for (OffsetRange range :
+          new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSizeBytes, 0)) {
+        c.output(KV.of(metadata, range));
+      }
+    }
+  }
+
+  private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+    private final SerializableFunction<String, FileBasedSource<T>> createSource;
+
+    private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) {
+      this.createSource = createSource;
+    }
+
+    @ProcessElement
+    public void process(ProcessContext c) throws IOException {
+      MatchResult.Metadata metadata = c.element().getKey();
+      OffsetRange range = c.element().getValue();
+      FileBasedSource<T> source = createSource.apply(metadata.toString());
+      try (BoundedSource.BoundedReader<T> reader =
+          source
+              .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
+              .createReader(c.getPipelineOptions())) {
+        for (boolean more = reader.start(); more; more = reader.advance()) {
+          c.output(reader.getCurrent());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf0b363/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 7b4c483..73040da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,8 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -37,23 +35,14 @@ import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -356,120 +345,50 @@ public class TextIO {
     @Override
     public PCollection<String> expand(PCollection<String> input) {
       return input
-          .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
           .apply(
-              "Split into ranges",
-              ParDo.of(new SplitIntoRangesFn(getCompressionType(), getDesiredBundleSizeBytes())))
-          .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
-          .apply("Read", ParDo.of(new ReadTextFn(this)));
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  new IsSplittableFn(getCompressionType()),
+                  getDesiredBundleSizeBytes(),
+                  new CreateTextSourceFn(getCompressionType())))
+          .setCoder(StringUtf8Coder.of());
     }
 
-    private static class ReshuffleWithUniqueKey<T>
-        extends PTransform<PCollection<T>, PCollection<T>> {
-      @Override
-      public PCollection<T> expand(PCollection<T> input) {
-        return input
-            .apply("Unique key", ParDo.of(new AssignUniqueKeyFn<T>()))
-            .apply("Reshuffle", Reshuffle.<Integer, T>of())
-            .apply("Values", Values.<T>create());
-      }
-    }
-
-    private static class AssignUniqueKeyFn<T> extends DoFn<T, KV<Integer, T>> {
-      private int index;
-
-      @Setup
-      public void setup() {
-        this.index = ThreadLocalRandom.current().nextInt();
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-      @ProcessElement
-      public void process(ProcessContext c) {
-        c.output(KV.of(++index, c.element()));
-      }
+      builder.add(
+          DisplayData.item("compressionType", getCompressionType().toString())
+              .withLabel("Compression Type"));
     }
 
-    private static class ExpandGlobFn extends DoFn<String, Metadata> {
-      @ProcessElement
-      public void process(ProcessContext c) throws Exception {
-        MatchResult match = FileSystems.match(c.element());
-        checkArgument(
-            match.status().equals(Status.OK),
-            "Failed to match filepattern %s: %s",
-            c.element(),
-            match.status());
-        for (Metadata metadata : match.metadata()) {
-          c.output(metadata);
-        }
-      }
-    }
-
-    private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
+    private static class CreateTextSourceFn
+        implements SerializableFunction<String, FileBasedSource<String>> {
       private final CompressionType compressionType;
-      private final long desiredBundleSize;
 
-      private SplitIntoRangesFn(CompressionType compressionType, long desiredBundleSize) {
+      private CreateTextSourceFn(CompressionType compressionType) {
         this.compressionType = compressionType;
-        this.desiredBundleSize = desiredBundleSize;
-      }
-
-      @ProcessElement
-      public void process(ProcessContext c) {
-        Metadata metadata = c.element();
-        final boolean isSplittable = isSplittable(metadata, compressionType);
-        if (!isSplittable) {
-          c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
-          return;
-        }
-        for (OffsetRange range :
-            new OffsetRange(0, metadata.sizeBytes()).split(desiredBundleSize, 0)) {
-          c.output(KV.of(metadata, range));
-        }
       }
 
-      static boolean isSplittable(Metadata metadata, CompressionType compressionType) {
-        if (!metadata.isReadSeekEfficient()) {
-          return false;
-        }
-        switch (compressionType) {
-          case AUTO:
-            return !CompressionMode.isCompressed(metadata.resourceId().toString());
-          case UNCOMPRESSED:
-            return true;
-          case GZIP:
-          case BZIP2:
-          case ZIP:
-          case DEFLATE:
-            return false;
-          default:
-            throw new UnsupportedOperationException("Unknown compression type: " + compressionType);
-        }
+      @Override
+      public FileBasedSource<String> apply(String input) {
+        return Read.wrapWithCompression(
+            new TextSource(StaticValueProvider.of(input)), compressionType);
       }
     }
 
-    private static class ReadTextFn extends DoFn<KV<Metadata, OffsetRange>, String> {
-      private final TextIO.ReadAll spec;
+    private static class IsSplittableFn implements SerializableFunction<String, Boolean> {
+      private final CompressionType compressionType;
 
-      private ReadTextFn(ReadAll spec) {
-        this.spec = spec;
+      private IsSplittableFn(CompressionType compressionType) {
+        this.compressionType = compressionType;
       }
 
-      @ProcessElement
-      public void process(ProcessContext c) throws IOException {
-        Metadata metadata = c.element().getKey();
-        OffsetRange range = c.element().getValue();
-        FileBasedSource<String> source =
-            TextIO.Read.wrapWithCompression(
-                new TextSource(StaticValueProvider.of(metadata.toString())),
-                spec.getCompressionType());
-        try (BoundedSource.BoundedReader<String> reader =
-            source
-                .createForSubrangeOfFile(metadata, range.getFrom(), range.getTo())
-                .createReader(c.getPipelineOptions())) {
-          for (boolean more = reader.start(); more; more = reader.advance()) {
-            c.output(reader.getCurrent());
-          }
-        }
+      @Override
+      public Boolean apply(String filename) {
+        return compressionType == CompressionType.UNCOMPRESSED
+            || (compressionType == CompressionType.AUTO && !CompressionMode.isCompressed(filename));
       }
     }
   }


Mime
View raw message