beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-93] Add subnetwork support and increment Dataflow API dependency
Date Sun, 06 Mar 2016 05:19:13 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master bf15d2f3c -> 22ff05c49


[BEAM-93] Add subnetwork support and increment Dataflow API dependency


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab10ac35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab10ac35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab10ac35

Branch: refs/heads/master
Commit: ab10ac3560ee38398ff222f552e372e91f1ca4af
Parents: bf15d2f
Author: sammcveety <sam.mcveety@gmail.com>
Authored: Wed Mar 2 21:27:08 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Sat Mar 5 21:16:12 2016 -0800

----------------------------------------------------------------------
 pom.xml                                         |  2 +-
 .../DataflowPipelineWorkerPoolOptions.java      | 12 +++++++
 .../sdk/runners/DataflowPipelineTranslator.java |  3 ++
 .../runners/DataflowPipelineTranslatorTest.java | 34 ++++++++++++++++++++
 4 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index de47ff5..f9dbab7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
     <avro.version>1.7.7</avro.version>
     <bigquery.version>v2-rev248-1.21.0</bigquery.version>
     <bigtable.version>0.2.3</bigtable.version>
-    <dataflow.version>v1b3-rev19-1.21.0</dataflow.version>
+    <dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
     <dataflow.proto.version>0.5.160222</dataflow.proto.version>
     <datastore.version>v1beta2-rev1-4.0.0</datastore.version>
     <google-clients.version>1.21.0</google-clients.version>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
index 25d1589..be5cfdc 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
@@ -145,6 +145,18 @@ public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions
{
   void setNetwork(String value);
 
   /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a>
for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service.  Expected format is zones/ZONE/subnetworks/SUBNETWORK.
+   */
+  @Description("GCE subnetwork for launching workers. For more information, see the reference
"
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getSubnetwork();
+  void setSubnetwork(String value);
+
+  /**
    * GCE <a href="https://developers.google.com/compute/docs/zones"
    * >availability zone</a> for launching workers.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index ae3a403..d0cc4e5 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -442,6 +442,9 @@ public class DataflowPipelineTranslator {
       if (!Strings.isNullOrEmpty(options.getNetwork())) {
         workerPool.setNetwork(options.getNetwork());
       }
+      if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
+        workerPool.setSubnetwork(options.getSubnetwork());
+      }
       if (options.getDiskSizeGb() > 0) {
         workerPool.setDiskSizeGb(options.getDiskSizeGb());
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab10ac35/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index 72090a0..497552f 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -231,6 +231,40 @@ public class DataflowPipelineTranslatorTest {
   }
 
   @Test
+  public void testSubnetworkConfig() throws IOException {
+    final String testSubnetwork = "zones/ZONE/subnetworks/SUBNETWORK";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSubnetwork(testSubnetwork);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testSubnetwork,
+        job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
+  public void testSubnetworkConfigMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
   public void testScalingAlgorithmMissing() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
 


Mime
View raw message