beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: TextIO.Read: support ValueProvider
Date Tue, 01 Nov 2016 16:51:17 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master f8e26be7d -> 75bfd781f


TextIO.Read: support ValueProvider

Add test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4ebdf0bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4ebdf0bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4ebdf0bf

Branch: refs/heads/master
Commit: 4ebdf0bfcd17ab5df9b5d7132507c49979392721
Parents: f8e26be
Author: sammcveety <sam.mcveety@gmail.com>
Authored: Sun Oct 16 18:37:51 2016 -0400
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Nov 1 09:43:53 2016 -0700

----------------------------------------------------------------------
 .../dataflow/internal/ReadTranslator.java       | 12 ++--
 .../DataflowPipelineTranslatorTest.java         | 28 ++++++++++
 .../apache/beam/sdk/io/CompressedSource.java    |  2 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java | 59 ++++++++++++++------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 43 +++++++++++---
 5 files changed, 116 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
index 83836c0..b3af165 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationCo
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.values.PValue;
@@ -50,10 +51,13 @@ public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> {
       // TODO: Move this validation out of translation once IOChannelUtils is portable
       // and can be reconstructed on the worker.
       if (source instanceof FileBasedSource) {
-        String filePatternOrSpec = ((FileBasedSource<?>) source).getFileOrPatternSpec();
-        context.getPipelineOptions()
-               .getPathValidator()
-               .validateInputFilePatternSupported(filePatternOrSpec);
+        ValueProvider<String> filePatternOrSpec =
+            ((FileBasedSource<?>) source).getFileOrPatternSpecProvider();
+        if (filePatternOrSpec.isAccessible()) {
+          context.getPipelineOptions()
+              .getPathValidator()
+              .validateInputFilePatternSupported(filePatternOrSpec.get());
+        }
       }
 
       context.addStep(transform, "ParallelRead");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 762844b..c925454 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -67,6 +67,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -732,6 +733,33 @@ public class DataflowPipelineTranslatorTest implements Serializable {
         Collections.<DataflowPackage>emptyList());
   }
 
+  private static class TestValueProvider implements ValueProvider<String>, Serializable {
+    @Override
+    public boolean isAccessible() {
+      return false;
+    }
+
+    @Override
+    public String get() {
+      throw new RuntimeException("Should not be called.");
+    }
+  }
+
+  @Test
+  public void testInaccessibleProvider() throws Exception {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    Pipeline pipeline = Pipeline.create(options);
+    DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
+
+    pipeline.apply(TextIO.Read.from(new TestValueProvider()).withoutValidation());
+
+    // Check that translation does not fail.
+    t.translate(
+        pipeline,
+        (DataflowRunner) pipeline.getRunner(),
+        Collections.<DataflowPackage>emptyList());
+  }
+
   @Test
   public void testToSingletonTranslation() throws Exception {
     // A "change detector" test that makes sure the translation

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index e0f1b59..f33b9bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -301,7 +301,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
    */
   private CompressedSource(
      FileBasedSource<T> sourceDelegate, DecompressingChannelFactory channelFactory) {
-    super(sourceDelegate.getFileOrPatternSpec(), Long.MAX_VALUE);
+    super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
     this.sourceDelegate = sourceDelegate;
     this.channelFactory = channelFactory;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index b073236..e0fc6b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -77,7 +79,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   // Package-private for testing.
   static final int THREAD_POOL_SIZE = 128;
 
-  private final String fileOrPatternSpec;
+  private final ValueProvider<String> fileOrPatternSpec;
   private final Mode mode;
 
   /**
@@ -99,6 +101,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
    * @param minBundleSize minimum bundle size in bytes.
    */
   public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
+    this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
+  }
+
+  /**
+   * Create a {@code FileBasedSource} based on a file or a file pattern specification.
+   * Same as the {@code String} constructor, but accepting a {@link ValueProvider}
+   * to allow for runtime configuration of the source.
+   */
+  public FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
     super(0, Long.MAX_VALUE, minBundleSize);
     mode = Mode.FILEPATTERN;
     this.fileOrPatternSpec = fileOrPatternSpec;
@@ -124,10 +135,14 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       long startOffset, long endOffset) {
     super(startOffset, endOffset, minBundleSize);
     mode = Mode.SINGLE_FILE_OR_SUBRANGE;
-    this.fileOrPatternSpec = fileName;
+    this.fileOrPatternSpec = StaticValueProvider.of(fileName);
   }
 
   public final String getFileOrPatternSpec() {
+    return fileOrPatternSpec.get();
+  }
+
+  public final ValueProvider<String> getFileOrPatternSpecProvider() {
     return fileOrPatternSpec;
   }
 
@@ -149,7 +164,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
         end,
         getEndOffset());
 
-    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
+    checkState(fileOrPatternSpec.isAccessible(),
+               "Subrange creation should only happen at execution time.");
+    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec.get(), start, end);
     if (start > 0 || end != Long.MAX_VALUE) {
       checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
           "Source created for the range [%s,%s) must be a subrange source", start, end);
@@ -186,12 +203,14 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // we perform the size estimation of files and file patterns using the interface provided by
     // IOChannelFactory.
 
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
     if (mode == Mode.FILEPATTERN) {
+      checkState(fileOrPatternSpec.isAccessible(),
+                 "Size estimation should be done at execution time.");
+      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
       // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
       long startTime = System.currentTimeMillis();
       long totalSize = 0;
-      Collection<String> inputs = factory.match(fileOrPatternSpec);
+      Collection<String> inputs = factory.match(fileOrPatternSpec.get());
       if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
         totalSize = getExactTotalSizeOfFiles(inputs, factory);
         LOG.debug("Size estimation of all files of pattern {} took {} ms",
@@ -274,7 +293,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-    builder.add(DisplayData.item("filePattern", getFileOrPatternSpec())
+    String patternDisplay = getFileOrPatternSpecProvider().isAccessible()
+      ? getFileOrPatternSpecProvider().get()
+      : getFileOrPatternSpecProvider().toString();
+    builder.add(DisplayData.item("filePattern", patternDisplay)
       .withLabel("File Pattern"));
   }
 
@@ -307,7 +329,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       ListeningExecutorService service =
           MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
       try {
-        for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
+        checkState(fileOrPatternSpec.isAccessible(),
+                   "Bundle splitting should only happen at execution time.");
+        for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec.get())) {
           futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
         }
         List<? extends FileBasedSource<T>> splitResults =
@@ -346,8 +370,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     // We split a file-based source into subranges only if the file is efficiently seekable.
     // If a file is not efficiently seekable it would be highly inefficient to create and read a
     // source based on a subrange of that file.
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    return factory.isReadSeekEfficient(fileOrPatternSpec);
+    checkState(fileOrPatternSpec.isAccessible(),
+        "isSplittable should only be called at runtime.");
+    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
+    return factory.isReadSeekEfficient(fileOrPatternSpec.get());
   }
 
   @Override
@@ -357,7 +383,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
+      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec.get());
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (String fileName : files) {
         long endOffset;
@@ -387,9 +413,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   public String toString() {
     switch (mode) {
       case FILEPATTERN:
-        return fileOrPatternSpec;
+        return fileOrPatternSpec.toString();
       case SINGLE_FILE_OR_SUBRANGE:
-        return fileOrPatternSpec + " range " + super.toString();
+        return fileOrPatternSpec.toString() + " range " + super.toString();
       default:
         throw new IllegalStateException("Unexpected mode: " + mode);
     }
@@ -420,8 +446,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     checkArgument(
             mode != Mode.FILEPATTERN, "Cannot determine the exact end offset of a file pattern");
     if (getEndOffset() == Long.MAX_VALUE) {
-      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-      return factory.getSizeBytes(fileOrPatternSpec);
+      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec.get());
+      return factory.getSizeBytes(fileOrPatternSpec.get());
     } else {
       return getEndOffset();
     }
@@ -493,8 +519,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     @Override
     protected final boolean startImpl() throws IOException {
       FileBasedSource<T> source = getCurrentSource();
-      IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
-      this.channel = factory.open(source.getFileOrPatternSpec());
+      IOChannelFactory factory = IOChannelUtils.getFactory(
+        source.getFileOrPatternSpecProvider().get());
+      this.channel = factory.open(source.getFileOrPatternSpecProvider().get());
 
       if (channel instanceof SeekableByteChannel) {
         SeekableByteChannel seekChannel = (SeekableByteChannel) channel;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4ebdf0bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 2dbcda7..84c24ea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -44,6 +45,8 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -142,6 +145,13 @@ public class TextIO {
     }
 
     /**
+     * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
+     */
+    public static Bound<String> from(ValueProvider<String> filepattern) {
+      return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern);
+    }
+
+    /**
      * Returns a transform for reading text files that uses the given
      * {@code Coder<T>} to decode each of the lines of the file into a
      * value of type {@code T}.
@@ -194,7 +204,7 @@ public class TextIO {
      */
     public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
       /** The filepattern to read from. */
-      @Nullable private final String filepattern;
+      @Nullable private final ValueProvider<String> filepattern;
 
       /** The Coder to use to decode each line. */
       private final Coder<T> coder;
@@ -209,8 +219,8 @@ public class TextIO {
         this(null, null, coder, true, TextIO.CompressionType.AUTO);
       }
 
-      private Bound(String name, String filepattern, Coder<T> coder, boolean validate,
-          TextIO.CompressionType compressionType) {
+      private Bound(@Nullable String name, @Nullable ValueProvider<String> filepattern,
+          Coder<T> coder, boolean validate, TextIO.CompressionType compressionType) {
         super(name);
         this.coder = coder;
         this.filepattern = filepattern;
@@ -227,6 +237,16 @@ public class TextIO {
 
        */
       public Bound<T> from(String filepattern) {
+        checkNotNull(filepattern, "Filepattern cannot be empty.");
+        return new Bound<>(name, StaticValueProvider.of(filepattern), coder, validate,
+                           compressionType);
+      }
+
+      /**
+       * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
+       */
+      public Bound<T> from(ValueProvider<String> filepattern) {
+        checkNotNull(filepattern, "Filepattern cannot be empty.");
         return new Bound<>(name, filepattern, coder, validate, compressionType);
       }
 
@@ -278,14 +298,15 @@ public class TextIO {
         }
 
         if (validate) {
+          checkState(filepattern.isAccessible(), "Cannot validate with a RVP.");
           try {
             checkState(
-                !IOChannelUtils.getFactory(filepattern).match(filepattern).isEmpty(),
+              !IOChannelUtils.getFactory(filepattern.get()).match(filepattern.get()).isEmpty(),
                 "Unable to find any files matching %s",
                 filepattern);
           } catch (IOException e) {
             throw new IllegalStateException(
-                String.format("Failed to validate %s", filepattern), e);
+              String.format("Failed to validate %s", filepattern.get()), e);
           }
         }
 
@@ -324,12 +345,14 @@ public class TextIO {
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
 
+        String filepatternDisplay = filepattern.isAccessible()
+          ? filepattern.get() : filepattern.toString();
         builder
             .add(DisplayData.item("compressionType", compressionType.toString())
               .withLabel("Compression Type"))
             .addIfNotDefault(DisplayData.item("validation", validate)
               .withLabel("Validation Enabled"), true)
-            .addIfNotNull(DisplayData.item("filePattern", filepattern)
+            .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
               .withLabel("File Pattern"));
       }
 
@@ -339,7 +362,7 @@ public class TextIO {
       }
 
       public String getFilepattern() {
-        return filepattern;
+        return filepattern.get();
       }
 
       public boolean needsValidation() {
@@ -870,6 +893,12 @@ public class TextIO {
       this.coder = coder;
     }
 
+    @VisibleForTesting
+    TextSource(ValueProvider<String> fileSpec, Coder<T> coder) {
+      super(fileSpec, 1L);
+      this.coder = coder;
+    }
+
     private TextSource(String fileName, long start, long end, Coder<T> coder) {
       super(fileName, 1L, start, end);
       this.coder = coder;


Mime
View raw message