beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: [BEAM-59] Register standard FileSystems wherever we register IOChannelFactories
Date Tue, 18 Apr 2017 09:12:44 GMT
Repository: beam
Updated Branches:
  refs/heads/master c52ce7c4b -> e5568589c


[BEAM-59] Register standard FileSystems wherever we register IOChannelFactories

Additionally, drop an unnecessary use of `GcsOptions` in
`PipelineRunner`.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b43c92f2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b43c92f2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b43c92f2

Branch: refs/heads/master
Commit: b43c92f208304cfc10d79b140682dfbe6580d7c4
Parents: c52ce7c
Author: Dan Halperin <dhalperi@google.com>
Authored: Mon Apr 17 20:39:48 2017 -0700
Committer: Jean-Baptiste Onofré <jbonofre@apache.org>
Committed: Tue Apr 18 10:33:10 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/utils/SerializedPipelineOptions.java    | 2 ++
 .../beam/runners/spark/translation/SparkRuntimeContext.java   | 2 ++
 .../main/java/org/apache/beam/sdk/runners/PipelineRunner.java | 7 +++----
 .../main/java/org/apache/beam/sdk/testing/TestPipeline.java   | 2 ++
 4 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
index 390e6da..2256bb1 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.IOChannelUtils;
 
@@ -55,6 +56,7 @@ public class SerializedPipelineOptions implements Serializable {
         pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
 
         IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
       } catch (IOException e) {
         throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 4ccfead..9d0f576 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
@@ -130,6 +131,7 @@ public class SparkRuntimeContext implements Serializable {
         }
         // register IO factories.
         IOChannelUtils.registerIOFactoriesAllowOverride(pipelineOptions);
+        FileSystems.setDefaultConfigInWorkers(pipelineOptions);
       }
       return pipelineOptions;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
index 80bb90f..7b2fba3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PipelineRunner.java
@@ -21,9 +21,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 
@@ -41,11 +40,11 @@ public abstract class PipelineRunner<ResultT extends PipelineResult>
{
    * @return The newly created runner.
    */
   public static PipelineRunner<? extends PipelineResult> fromOptions(PipelineOptions
options) {
-    GcsOptions gcsOptions = PipelineOptionsValidator.validate(GcsOptions.class, options);
     checkNotNull(options);
 
     // (Re-)register standard IO factories. Clobbers any prior credentials.
-    IOChannelUtils.registerIOFactoriesAllowOverride(gcsOptions);
+    IOChannelUtils.registerIOFactoriesAllowOverride(options);
+    FileSystems.setDefaultConfigInWorkers(options);
 
     @SuppressWarnings("unchecked")
     PipelineRunner<? extends PipelineResult> result =

http://git-wip-us.apache.org/repos/asf/beam/blob/b43c92f2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index a4ab196..3d3de51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -40,6 +40,7 @@ import java.util.Map.Entry;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -404,6 +405,7 @@ public class TestPipeline extends Pipeline implements TestRule {
       options.setStableUniqueNames(CheckEnabled.ERROR);
 
       IOChannelUtils.registerIOFactoriesAllowOverride(options);
+      FileSystems.setDefaultConfigInWorkers(options);
       return options;
     } catch (IOException e) {
       throw new RuntimeException(


Mime
View raw message