beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-59] DataflowRunner: Sink is always a FileBasedSink now
Date Mon, 01 May 2017 05:14:02 GMT
Repository: beam
Updated Branches:
  refs/heads/master 9f2733ac4 -> 1197bef19


[BEAM-59] DataflowRunner: Sink is always a FileBasedSink now


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b51e860c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b51e860c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b51e860c

Branch: refs/heads/master
Commit: b51e860c7830202d0f332a52adae43634e6733fb
Parents: 9f2733a
Author: Dan Halperin <dhalperi@google.com>
Authored: Sun Apr 30 10:47:37 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Sun Apr 30 22:13:45 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/dataflow/DataflowRunner.java    | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b51e860c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a61fe49..97a4ded 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -837,13 +837,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (transform.getSink() instanceof FileBasedSink) {
-        FileBasedSink<?> sink = (FileBasedSink<?>) transform.getSink();
-        if (sink.getBaseOutputFilenameProvider().isAccessible()) {
-          PathValidator validator = runner.options.getPathValidator();
-          validator.validateOutputFilePrefixSupported(
-              sink.getBaseOutputFilenameProvider().get());
-        }
+      FileBasedSink<T> sink = transform.getSink();
+      if (sink.getBaseOutputFilenameProvider().isAccessible()) {
+        PathValidator validator = runner.options.getPathValidator();
+        validator.validateOutputFilePrefixSupported(
+            sink.getBaseOutputFilenameProvider().get());
       }
       return transform.expand(input);
     }


Mime
View raw message