beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-551] Fix handling of TextIO.Sink
Date Sat, 10 Dec 2016 19:12:48 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master a834fb0eb -> e841b1a21


[BEAM-551] Fix handling of TextIO.Sink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ebc4653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ebc4653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ebc4653

Branch: refs/heads/master
Commit: 9ebc4653afffff39aa40217635dbe6f22450be78
Parents: a834fb0
Author: Sam McVeety <sgmc@google.com>
Authored: Wed Dec 7 13:27:53 2016 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Sat Dec 10 11:11:38 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  7 ++-
 .../runners/dataflow/DataflowRunnerTest.java    | 20 +++++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 62 ++++++++++++--------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  6 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 19 ++++++
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |  4 +-
 6 files changed, 89 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ebc4653/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 85318e6..d902ccb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2065,8 +2065,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     public PDone expand(PCollection<T> input) {
       if (transform.getSink() instanceof FileBasedSink) {
         FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
-        PathValidator validator = runner.options.getPathValidator();
-        validator.validateOutputFilePrefixSupported(sink.getBaseOutputFilename());
+        if (sink.getBaseOutputFilenameProvider().isAccessible()) {
+          PathValidator validator = runner.options.getPathValidator();
+          validator.validateOutputFilePrefixSupported(
+              sink.getBaseOutputFilenameProvider().get());
+        }
       }
       return transform.expand(input);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ebc4653/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 133ae8a..4159b61 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
 import org.apache.beam.sdk.testing.ExpectedLogs;
@@ -313,6 +314,25 @@ public class DataflowRunnerTest {
     assertValidJob(jobCaptor.getValue());
   }
 
+  /** Options for testing. */
+  public interface RuntimeTestOptions extends PipelineOptions {
+    ValueProvider<String> getInput();
+    void setInput(ValueProvider<String> value);
+
+    ValueProvider<String> getOutput();
+    void setOutput(ValueProvider<String> value);
+  }
+
+  @Test
+  public void testTextIOWithRuntimeParameters() throws IOException {
+    DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
+    RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
+    Pipeline p = buildDataflowPipeline(dataflowOptions);
+    p
+        .apply(TextIO.Read.from(options.getInput()).withoutValidation())
+        .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
+  }
+
   @Test
   public void testRunReturnDifferentRequestId() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ebc4653/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 1396ab6..33296b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -42,7 +42,9 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -201,8 +203,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   /**
    * Returns the base output filename for this file based sink.
    */
-  public String getBaseOutputFilename() {
-    return baseOutputFilename.get();
+  public ValueProvider<String> getBaseOutputFilenameProvider() {
+    return baseOutputFilename;
   }
 
   @Override
@@ -290,7 +292,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected final FileBasedSink<T> sink;
 
     /** Directory for temporary output files. */
-    protected final String tempDirectory;
+    protected final ValueProvider<String> tempDirectory;
 
     /** Constructs a temporary file path given the temporary directory and a filename. */
     protected static String buildTemporaryFilename(String tempDirectory, String filename)
@@ -308,22 +310,31 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * @param sink the FileBasedSink that will be used to configure this write operation.
      */
     public FileBasedWriteOperation(FileBasedSink<T> sink) {
-      this(sink, buildTemporaryDirectoryName(sink.getBaseOutputFilename()));
+      this(sink, NestedValueProvider.of(
+          sink.getBaseOutputFilenameProvider(), new TemporaryDirectoryBuilder()));
     }
 
-    private static String buildTemporaryDirectoryName(String baseOutputFilename) {
-      try {
-        IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
-        Path baseOutputPath = factory.toPath(baseOutputFilename);
-        return baseOutputPath
-            .resolveSibling(
-                "temp-beam-"
-                    + baseOutputPath.getFileName()
-                    + "-"
-                    + Instant.now().toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
-            .toString();
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    private static class TemporaryDirectoryBuilder
+        implements SerializableFunction<String, String> {
+      // The intent of the code is to have a consistent value of tempDirectory across
+      // all workers, which wouldn't happen if now() was called inline.
+      Instant now = Instant.now();
+
+      @Override
+      public String apply(String baseOutputFilename) {
+        try {
+          IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
+          Path baseOutputPath = factory.toPath(baseOutputFilename);
+          return baseOutputPath
+              .resolveSibling(
+                  "temp-beam-"
+                  + baseOutputPath.getFileName()
+                  + "-"
+                  + now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
+              .toString();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
       }
     }
 
@@ -334,6 +345,10 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      * @param tempDirectory the base directory to be used for temporary output files.
      */
     public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
+      this(sink, StaticValueProvider.of(tempDirectory));
+    }
+
+    private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
     }
@@ -452,8 +467,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      */
     protected final void removeTemporaryFiles(List<String> knownFiles, PipelineOptions options)
         throws IOException {
-      LOG.debug("Removing temporary bundle output files in {}.", tempDirectory);
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempDirectory);
+      String tempDir = tempDirectory.get();
+      LOG.debug("Removing temporary bundle output files in {}.", tempDir);
+      IOChannelFactory factory = IOChannelUtils.getFactory(tempDir);
 
       // To partially mitigate the effects of filesystems with eventually-consistent
       // directory matching APIs, we remove not only files that the filesystem says exist
@@ -461,17 +477,17 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       // (produced by successfully completed bundles).
       // This may still fail to remove temporary outputs of some failed bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully addressed.
-      Collection<String> matches = factory.match(factory.resolve(tempDirectory, "*"));
+      Collection<String> matches = factory.match(factory.resolve(tempDir, "*"));
       Set<String> allMatches = new HashSet<>(matches);
       allMatches.addAll(knownFiles);
       LOG.debug(
           "Removing {} temporary files found under {} ({} matched glob, {} known files)",
           allMatches.size(),
-          tempDirectory,
+          tempDir,
           matches.size(),
           allMatches.size() - matches.size());
       factory.remove(allMatches);
-      factory.remove(ImmutableList.of(tempDirectory));
+      factory.remove(ImmutableList.of(tempDir));
     }
 
     /**
@@ -569,7 +585,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     public final void open(String uId) throws Exception {
       this.id = uId;
       filename = FileBasedWriteOperation.buildTemporaryFilename(
-          getWriteOperation().tempDirectory, uId);
+          getWriteOperation().tempDirectory.get(), uId);
       LOG.debug("Opening {}.", filename);
       final WritableByteChannelFactory factory =
           getWriteOperation().getSink().writableByteChannelFactory;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ebc4653/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 4ab3843..930ca29 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -231,7 +231,9 @@ public class FileBasedSinkTest {
       assertFalse(temporaryFiles.get(i).exists());
     }
 
-    assertFalse(new File(writeOp.tempDirectory).exists());
+    assertFalse(new File(writeOp.tempDirectory.get()).exists());
+    // Test that repeated requests of the temp directory return a stable result.
+    assertEquals(writeOp.tempDirectory.get(), writeOp.tempDirectory.get());
   }
 
   /**
@@ -487,7 +489,7 @@ public class FileBasedSinkTest {
     final FileBasedWriter<String> writer =
         writeOp.createWriter(null);
     final String expectedFilename =
-        writeOp.tempDirectory + "/" + testUid;
+        writeOp.tempDirectory.get() + "/" + testUid;
 
     final List<String> expected = new ArrayList<>();
     expected.add("header");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ebc4653/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index d3a5d5e..472399a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -629,6 +630,24 @@ public class TextIOTest {
     pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
   }
 
+  /** Options for testing. */
+  public interface RuntimeTestOptions extends PipelineOptions {
+    ValueProvider<String> getInput();
+    void setInput(ValueProvider<String> value);
+
+    ValueProvider<String> getOutput();
+    void setOutput(ValueProvider<String> value);
+  }
+
+  @Test
+  public void testRuntimeOptionsNotCalledInApply() throws Exception {
+    Pipeline pipeline = TestPipeline.create();
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    pipeline
+        .apply(TextIO.Read.from(options.getInput()).withoutValidation())
+        .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
+  }
+
   @Test
   public void testReadWithoutValidationFlag() throws Exception {
     TextIO.Read.Bound<String> read = TextIO.Read.from("gs://bucket/foo*/baz");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ebc4653/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index f9a9655..96b8c57 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -146,7 +146,7 @@ public class XmlSinkTest {
     assertEquals(testRootElement, writeOp.getSink().rootElementName);
     assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writeOp.tempDirectory).toPath();
+    Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
     assertThat(
         tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
@@ -163,7 +163,7 @@ public class XmlSinkTest {
             .createWriteOperation(options);
     XmlWriter<Bird> writer = writeOp.createWriter(options);
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writer.getWriteOperation().tempDirectory).toPath();
+    Path tempPath = new File(writer.getWriteOperation().tempDirectory.get()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
     assertThat(
         tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));


Mime
View raw message