beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/7] incubator-beam git commit: Move tempLocation to PipelineOptions.
Date Thu, 24 Mar 2016 20:42:45 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 1c21aa2d5 -> c1de175bd


Move tempLocation to PipelineOptions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bc0659a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bc0659a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bc0659a

Branch: refs/heads/master
Commit: 8bc0659af754786677446f2f9941702f9e9ee5be
Parents: 45309ca
Author: Pei He <peihe0@gmail.com>
Authored: Mon Mar 14 16:02:32 2016 -0700
Committer: Pei He <peihe0@gmail.com>
Committed: Mon Mar 14 16:53:49 2016 -0700

----------------------------------------------------------------------
 .../FlinkGroupAlsoByWindowWrapper.java          | 11 ++++++++-
 .../sdk/options/DataflowPipelineOptions.java    | 26 +++-----------------
 .../dataflow/sdk/options/PipelineOptions.java   | 14 +++++++++++
 .../sdk/runners/DataflowPipelineRunner.java     |  4 +++
 .../sdk/runners/DataflowPipelineRunnerTest.java |  6 ++---
 5 files changed, 33 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index e115a15..b413d7a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -476,6 +476,15 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
           @Override
           public void setStableUniqueNames(CheckEnabled enabled) {
           }
+
+          @Override
+          public String getTempLocation() {
+            return null;
+          }
+
+          @Override
+          public void setTempLocation(String tempLocation) {
+          }
         };
       }
       return options;
@@ -628,4 +637,4 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
     // restore the timerInternals.
     this.timerInternals.restoreTimerInternals(reader, inputKvCoder, windowCoder);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
index 1aa4342..6794032 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineOptions.java
@@ -34,8 +34,6 @@ public interface DataflowPipelineOptions extends
     GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
     DataflowProfilingOptions {
 
-  static final String DATAFLOW_STORAGE_LOCATION = "Dataflow Storage Location";
-
   @Description("Project id. Required when running a Dataflow in the cloud. "
       + "See https://cloud.google.com/storage/docs/projects for further details.")
   @Override
@@ -46,36 +44,18 @@ public interface DataflowPipelineOptions extends
   void setProject(String value);
 
   /**
-   * GCS path for temporary files, e.g. gs://bucket/object
-   *
-   * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
-   *
-   * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
-   * {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
-   * {@link #getStagingLocation()}.
-   */
-  @Description("GCS path for temporary files, eg \"gs://bucket/object\". "
-      + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
-      + "At least one of tempLocation or stagingLocation must be set. If tempLocation is unset, "
-      + "defaults to using stagingLocation.")
-  @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
-  String getTempLocation();
-  void setTempLocation(String value);
-
-  /**
    * GCS path for staging local files, e.g. gs://bucket/object
    *
    * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://"
    *
-   * <p>At least one of {@link #getTempLocation()} or {@link #getStagingLocation()} must be set. If
-   * {@link #getTempLocation()} is not set, then the Dataflow pipeline defaults to using
-   * {@link #getStagingLocation()}.
+   * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()}
+   * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow
+   * pipeline defaults to using {@link PipelineOptions#getTempLocation()}.
    */
   @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". "
       + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". "
       + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, "
       + "defaults to using tempLocation.")
-  @Validation.Required(groups = {DATAFLOW_STORAGE_LOCATION})
   String getStagingLocation();
   void setStagingLocation(String value);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java
index 923033d..4c33a22 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/PipelineOptions.java
@@ -245,4 +245,18 @@ public interface PipelineOptions {
   @Default.Enum("WARNING")
   CheckEnabled getStableUniqueNames();
   void setStableUniqueNames(CheckEnabled enabled);
+
+  /**
+   * A pipeline level default location for storing temporary files.
+   *
+   * <p>This can be a path of any file system.
+   *
+   * <p>{@link #getTempLocation()} can be used as a default location in other
+   * {@link PipelineOptions}.
+   *
+   * <p>If it is unset, {@link PipelineRunner} can override it.
+   */
+  @Description("A pipeline level default location for storing temporary files.")
+  String getTempLocation();
+  void setTempLocation(String value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index c90b904..d716b95 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -251,6 +251,10 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     }
 
     PathValidator validator = dataflowOptions.getPathValidator();
+    Preconditions.checkArgument(!(Strings.isNullOrEmpty(dataflowOptions.getTempLocation())
+        && Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())),
+        "Missing required value: at least one of tempLocation or stagingLocation must be set.");
+
     if (dataflowOptions.getStagingLocation() != null) {
       validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bc0659a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
index c5f2d3f..300d5d5 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java
@@ -649,10 +649,8 @@ public class DataflowPipelineRunnerTest {
     options.setProject("foo-project");
 
     thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Missing required value for group");
-    thrown.expectMessage(DataflowPipelineOptions.DATAFLOW_STORAGE_LOCATION);
-    thrown.expectMessage("getStagingLocation");
-    thrown.expectMessage("getTempLocation");
+    thrown.expectMessage(
+        "Missing required value: at least one of tempLocation or stagingLocation must be set.");
 
     DataflowPipelineRunner.fromOptions(options);
   }


Mime
View raw message