beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Add getRegion to the DataflowPipelineOptions
Date Wed, 22 Feb 2017 22:48:02 GMT
Repository: beam
Updated Branches:
  refs/heads/master 0f9fefe74 -> 17ab371f1


Add getRegion to the DataflowPipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/26c42b98
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/26c42b98
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/26c42b98

Branch: refs/heads/master
Commit: 26c42b9802ac88d2710071ff30fae2575b84c0ed
Parents: 0f9fefe
Author: Dan Halperin <dhalperi@google.com>
Authored: Wed Feb 22 10:00:11 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Feb 22 14:47:44 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowClient.java   |  44 +++---
 .../runners/dataflow/DataflowPipelineJob.java   |   2 +-
 .../options/DataflowPipelineOptions.java        |  17 +++
 .../dataflow/DataflowPipelineJobTest.java       | 153 +++++++++++--------
 .../runners/dataflow/DataflowRunnerTest.java    |  64 ++++----
 5 files changed, 168 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/26c42b98/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
index 3536d72..dfd1c2b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowClient.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
@@ -40,15 +40,15 @@ import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 public class DataflowClient {
 
   public static DataflowClient create(DataflowPipelineOptions options) {
-    return new DataflowClient(options.getDataflowClient(), options.getProject());
+    return new DataflowClient(options.getDataflowClient(), options);
   }
 
   private final Dataflow dataflow;
-  private final String projectId;
+  private final DataflowPipelineOptions options;
 
-  private DataflowClient(Dataflow dataflow, String projectId) {
+  private DataflowClient(Dataflow dataflow, DataflowPipelineOptions options) {
     this.dataflow = checkNotNull(dataflow, "dataflow");
-    this.projectId = checkNotNull(projectId, "options");
+    this.options = checkNotNull(options, "options");
   }
 
   /**
@@ -56,7 +56,8 @@ public class DataflowClient {
    */
   public Job createJob(@Nonnull Job job) throws IOException {
     checkNotNull(job, "job");
-    Jobs.Create jobsCreate = dataflow.projects().jobs().create(projectId, job);
+    Jobs.Create jobsCreate = dataflow.projects().locations().jobs()
+        .create(options.getProject(), options.getRegion(), job);
     return jobsCreate.execute();
   }
 
@@ -65,8 +66,8 @@ public class DataflowClient {
    * the {@link DataflowPipelineOptions}.
    */
   public ListJobsResponse listJobs(@Nullable String pageToken) throws IOException {
-    Jobs.List jobsList = dataflow.projects().jobs()
-        .list(projectId)
+    Jobs.List jobsList = dataflow.projects().locations().jobs()
+        .list(options.getProject(), options.getRegion())
         .setPageToken(pageToken);
     return jobsList.execute();
   }
@@ -77,8 +78,8 @@ public class DataflowClient {
   public Job updateJob(@Nonnull String jobId, @Nonnull Job content) throws IOException {
     checkNotNull(jobId, "jobId");
     checkNotNull(content, "content");
-    Jobs.Update jobsUpdate = dataflow.projects().jobs()
-        .update(projectId, jobId, content);
+    Jobs.Update jobsUpdate = dataflow.projects().locations().jobs()
+        .update(options.getProject(), options.getRegion(), jobId, content);
     return jobsUpdate.execute();
   }
 
@@ -87,8 +88,8 @@ public class DataflowClient {
    */
   public Job getJob(@Nonnull String jobId) throws IOException {
     checkNotNull(jobId, "jobId");
-    Jobs.Get jobsGet = dataflow.projects().jobs()
-        .get(projectId, jobId);
+    Jobs.Get jobsGet = dataflow.projects().locations().jobs()
+        .get(options.getProject(), options.getRegion(), jobId);
     return jobsGet.execute();
   }
 
@@ -97,8 +98,8 @@ public class DataflowClient {
    */
   public JobMetrics getJobMetrics(@Nonnull String jobId) throws IOException {
     checkNotNull(jobId, "jobId");
-    Jobs.GetMetrics jobsGetMetrics = dataflow.projects().jobs()
-        .getMetrics(projectId, jobId);
+    Jobs.GetMetrics jobsGetMetrics = dataflow.projects().locations().jobs()
+        .getMetrics(options.getProject(), options.getRegion(), jobId);
     return jobsGetMetrics.execute();
   }
 
@@ -108,8 +109,8 @@ public class DataflowClient {
   public ListJobMessagesResponse listJobMessages(
       @Nonnull String jobId, @Nullable String pageToken) throws IOException {
     checkNotNull(jobId, "jobId");
-    Jobs.Messages.List jobMessagesList = dataflow.projects().jobs().messages()
-        .list(projectId, jobId)
+    Jobs.Messages.List jobMessagesList = dataflow.projects().locations().jobs().messages()
+        .list(options.getProject(), options.getRegion(), jobId)
         .setPageToken(pageToken);
     return jobMessagesList.execute();
   }
@@ -117,24 +118,27 @@ public class DataflowClient {
   /**
    * Leases the work item for {@code jobId}.
    */
+  @SuppressWarnings("unused")  // used internally in the Cloud Dataflow execution environment.
   public LeaseWorkItemResponse leaseWorkItem(
       @Nonnull String jobId, @Nonnull LeaseWorkItemRequest request) throws IOException {
     checkNotNull(jobId, "jobId");
     checkNotNull(request, "request");
-    Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().jobs().workItems()
-        .lease(projectId, jobId, request);
+    Jobs.WorkItems.Lease jobWorkItemsLease = dataflow.projects().locations().jobs().workItems()
+        .lease(options.getProject(), options.getRegion(), jobId, request);
     return jobWorkItemsLease.execute();
   }
 
   /**
    * Reports the status of the work item for {@code jobId}.
    */
+  @SuppressWarnings("unused")  // used internally in the Cloud Dataflow execution environment.
   public ReportWorkItemStatusResponse reportWorkItemStatus(
      @Nonnull String jobId, @Nonnull ReportWorkItemStatusRequest request) throws IOException {
     checkNotNull(jobId, "jobId");
     checkNotNull(request, "request");
-    Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus = dataflow.projects().jobs().workItems()
-        .reportStatus(projectId, jobId, request);
+    Jobs.WorkItems.ReportStatus jobWorkItemsReportStatus =
+        dataflow.projects().locations().jobs().workItems()
+            .reportStatus(options.getProject(), options.getRegion(), jobId, request);
     return jobWorkItemsReportStatus.execute();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/26c42b98/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
index 0da7137..950a9d3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java
@@ -242,7 +242,7 @@ public class DataflowPipelineJob implements PipelineResult {
   @VisibleForTesting
   State waitUntilFinish(
       Duration duration,
-      MonitoringUtil.JobMessagesHandler messageHandler,
+      @Nullable MonitoringUtil.JobMessagesHandler messageHandler,
       Sleeper sleeper,
       NanoClock nanoClock) throws IOException, InterruptedException {
     MonitoringUtil monitor = new MonitoringUtil(dataflowClient);

http://git-wip-us.apache.org/repos/asf/beam/blob/26c42b98/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 5ddc5d0..8689c3e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -97,6 +97,23 @@ public interface DataflowPipelineOptions
   void setServiceAccount(String value);
 
   /**
+   * The Google Compute Engine
+   * <a href="https://cloud.google.com/compute/docs/regions-zones/regions-zones">region</a>
+   * for creating Dataflow jobs.
+   *
+   * <p>NOTE: The Cloud Dataflow service does not yet honor this setting. However, once service
+   * support is added then users of this SDK will be able to control the region.
+   */
+  @Hidden
+  @Experimental
+  @Description("The Google Compute Engine region for creating Dataflow jobs. See "
+      + "https://cloud.google.com/compute/docs/regions-zones/regions-zones for a list of valid "
+      + "options. Default is up to the Dataflow service.")
+  @Default.String("us-central1")
+  String getRegion();
+  void setRegion(String region);
+
+  /**
    * Returns a default staging location under {@link GcpOptions#getGcpTempLocation}.
    */
   class StagingLocationFactory implements DefaultValueFactory<String> {

http://git-wip-us.apache.org/repos/asf/beam/blob/26c42b98/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
index 2690e71..a2d67a0 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java
@@ -30,7 +30,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -39,9 +38,9 @@ import static org.mockito.Mockito.when;
 import com.google.api.client.util.NanoClock;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.Dataflow;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.GetMetrics;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Get;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.GetMetrics;
+import com.google.api.services.dataflow.Dataflow.Projects.Locations.Jobs.Messages;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricStructuredName;
@@ -64,12 +63,13 @@ import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.TaggedPValue;
@@ -88,16 +88,19 @@ import org.mockito.MockitoAnnotations;
  */
 @RunWith(JUnit4.class)
 public class DataflowPipelineJobTest {
-  private static final String PROJECT_ID = "someProject";
+  private static final String PROJECT_ID = "some-project";
+  private static final String REGION_ID = "some-region-2b";
   private static final String JOB_ID = "1234";
-  private static final String REPLACEMENT_JOB_ID = "replacementJobId";
+  private static final String REPLACEMENT_JOB_ID = "4321";
 
   @Mock
   private Dataflow mockWorkflowClient;
   @Mock
   private Dataflow.Projects mockProjects;
   @Mock
-  private Dataflow.Projects.Jobs mockJobs;
+  private Dataflow.Projects.Locations mockLocations;
+  @Mock
+  private Dataflow.Projects.Locations.Jobs mockJobs;
   @Rule
   public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper();
 
@@ -114,11 +117,17 @@ public class DataflowPipelineJobTest {
     MockitoAnnotations.initMocks(this);
 
     when(mockWorkflowClient.projects()).thenReturn(mockProjects);
-    when(mockProjects.jobs()).thenReturn(mockJobs);
+    when(mockProjects.locations()).thenReturn(mockLocations);
+    when(mockLocations.jobs()).thenReturn(mockJobs);
 
     options = PipelineOptionsFactory.as(TestDataflowPipelineOptions.class);
     options.setDataflowClient(mockWorkflowClient);
     options.setProject(PROJECT_ID);
+    options.setRegion(REGION_ID);
+    options.setRunner(DataflowRunner.class);
+    options.setTempLocation("gs://fakebucket/temp");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setGcpCredential(new TestCredential());
   }
 
   /**
@@ -128,8 +137,8 @@ public class DataflowPipelineJobTest {
    *
    * @param pollingInterval The initial polling interval given.
    * @param retries The number of retries made
-   * @param timeSleptMillis The amount of time slept by the clock. This is checked
-   * against the valid interval.
+   * @param timeSleptMillis The amount of time slept by the clock. This is checked against the valid
+   * interval.
    */
   private void checkValidInterval(Duration pollingInterval, int retries, long timeSleptMillis) {
     long highSum = 0;
@@ -137,7 +146,7 @@ public class DataflowPipelineJobTest {
     for (int i = 0; i < retries; i++) {
       double currentInterval =
           pollingInterval.getMillis()
-          * Math.pow(DataflowPipelineJob.DEFAULT_BACKOFF_EXPONENT, i);
+              * Math.pow(DataflowPipelineJob.DEFAULT_BACKOFF_EXPONENT, i);
       double randomOffset = 0.5 * currentInterval;
       highSum += Math.round(currentInterval + randomOffset);
       lowSum += Math.round(currentInterval - randomOffset);
@@ -147,19 +156,20 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testWaitToFinishMessagesFail() throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_" + State.DONE.name());
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
     MonitoringUtil.JobMessagesHandler jobHandler = mock(MonitoringUtil.JobMessagesHandler.class);
-    Dataflow.Projects.Jobs.Messages mockMessages =
-        mock(Dataflow.Projects.Jobs.Messages.class);
-    Messages.List listRequest = mock(Dataflow.Projects.Jobs.Messages.List.class);
+    Dataflow.Projects.Locations.Jobs.Messages mockMessages =
+        mock(Dataflow.Projects.Locations.Jobs.Messages.class);
+    Messages.List listRequest = mock(Dataflow.Projects.Locations.Jobs.Messages.List.class);
     when(mockJobs.messages()).thenReturn(mockMessages);
-    when(mockMessages.list(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(listRequest);
+    when(mockMessages.list(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(listRequest);
     when(listRequest.setPageToken(eq((String) null))).thenReturn(listRequest);
     when(listRequest.execute()).thenThrow(SocketTimeoutException.class);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
@@ -174,7 +184,8 @@ public class DataflowPipelineJobTest {
   }
 
   public State mockWaitToFinishInState(State state) throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_" + state.name());
@@ -182,7 +193,7 @@ public class DataflowPipelineJobTest {
       statusResponse.setReplacedByJobId(REPLACEMENT_JOB_ID);
     }
 
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
@@ -247,9 +258,10 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testWaitToFinishFail() throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenThrow(IOException.class);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
@@ -267,9 +279,10 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testWaitToFinishTimeFail() throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenThrow(IOException.class);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
@@ -286,11 +299,12 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testCumulativeTimeOverflow() throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_RUNNING");
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
@@ -310,12 +324,13 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testGetStateReturnsServiceState() throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_" + State.RUNNING.name());
 
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
@@ -331,9 +346,10 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testGetStateWithExceptionReturnsUnknown() throws Exception {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
-    when(mockJobs.get(eq(PROJECT_ID), eq(JOB_ID))).thenReturn(statusRequest);
+    when(mockJobs.get(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID))).thenReturn(statusRequest);
     when(statusRequest.execute()).thenThrow(IOException.class);
     DataflowAggregatorTransforms dataflowAggregatorTransforms =
         mock(DataflowAggregatorTransforms.class);
@@ -359,21 +375,21 @@ public class DataflowPipelineJobTest {
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
     AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, TestPipeline.create());
+        appliedPTransform(fullName, pTransform, Pipeline.create(options));
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
 
     GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
     JobMetrics jobMetrics = new JobMetrics();
     when(getMetrics.execute()).thenReturn(jobMetrics);
 
     jobMetrics.setMetrics(ImmutableList.<MetricUpdate>of());
 
     Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
     Job modelJob = new Job();
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
@@ -395,21 +411,21 @@ public class DataflowPipelineJobTest {
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
     AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, TestPipeline.create());
+        appliedPTransform(fullName, pTransform, Pipeline.create(options));
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
 
     GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
     JobMetrics jobMetrics = new JobMetrics();
     when(getMetrics.execute()).thenReturn(jobMetrics);
 
     jobMetrics.setMetrics(null);
 
     Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
     Job modelJob = new Job();
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
@@ -433,14 +449,14 @@ public class DataflowPipelineJobTest {
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
     AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, TestPipeline.create());
+        appliedPTransform(fullName, pTransform, Pipeline.create(options));
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
 
     GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
     JobMetrics jobMetrics = new JobMetrics();
     when(getMetrics.execute()).thenReturn(jobMetrics);
 
@@ -456,7 +472,7 @@ public class DataflowPipelineJobTest {
     jobMetrics.setMetrics(ImmutableList.of(update));
 
     Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
     Job modelJob = new Job();
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
@@ -478,7 +494,7 @@ public class DataflowPipelineJobTest {
     String aggregatorName = "agg";
     Aggregator<Long, Long> aggregator = new TestAggregator<>(combineFn, aggregatorName);
 
-    Pipeline p = TestPipeline.create();
+    Pipeline p = Pipeline.create(options);
 
     @SuppressWarnings("unchecked")
     PTransform<PInput, POutput> pTransform = mock(PTransform.class);
@@ -495,12 +511,12 @@ public class DataflowPipelineJobTest {
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
         ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(
-                                aggregator, pTransform, aggregator, otherTransform).asMap(),
+            aggregator, pTransform, aggregator, otherTransform).asMap(),
         ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(
             appliedTransform, stepName, otherAppliedTransform, otherStepName));
 
     GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
     JobMetrics jobMetrics = new JobMetrics();
     when(getMetrics.execute()).thenReturn(jobMetrics);
 
@@ -525,7 +541,7 @@ public class DataflowPipelineJobTest {
     jobMetrics.setMetrics(ImmutableList.of(updateOne, updateTwo));
 
     Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
     Job modelJob = new Job();
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
@@ -552,14 +568,14 @@ public class DataflowPipelineJobTest {
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
     AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, TestPipeline.create());
+        appliedPTransform(fullName, pTransform, Pipeline.create(options));
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
 
     GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
     JobMetrics jobMetrics = new JobMetrics();
     when(getMetrics.execute()).thenReturn(jobMetrics);
 
@@ -574,7 +590,7 @@ public class DataflowPipelineJobTest {
     jobMetrics.setMetrics(ImmutableList.of(ignoredUpdate));
 
     Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
     Job modelJob = new Job();
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
@@ -617,19 +633,19 @@ public class DataflowPipelineJobTest {
     String stepName = "s1";
     String fullName = "Foo/Bar/Baz";
     AppliedPTransform<?, ?, ?> appliedTransform =
-        appliedPTransform(fullName, pTransform, TestPipeline.create());
+        appliedPTransform(fullName, pTransform, Pipeline.create(options));
 
     DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(
        ImmutableSetMultimap.<Aggregator<?, ?>, PTransform<?, ?>>of(aggregator, pTransform).asMap(),
        ImmutableMap.<AppliedPTransform<?, ?, ?>, String>of(appliedTransform, stepName));
 
     GetMetrics getMetrics = mock(GetMetrics.class);
-    when(mockJobs.getMetrics(PROJECT_ID, JOB_ID)).thenReturn(getMetrics);
+    when(mockJobs.getMetrics(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getMetrics);
     IOException cause = new IOException();
     when(getMetrics.execute()).thenThrow(cause);
 
     Get getState = mock(Get.class);
-    when(mockJobs.get(PROJECT_ID, JOB_ID)).thenReturn(getState);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(getState);
     Job modelJob = new Job();
     when(getState.execute()).thenReturn(modelJob);
     modelJob.setCurrentState(State.RUNNING.toString());
@@ -683,6 +699,7 @@ public class DataflowPipelineJobTest {
 
 
   private static class FastNanoClockAndFuzzySleeper implements NanoClock, Sleeper {
+
     private long fastNanoTime;
 
     public FastNanoClockAndFuzzySleeper() {
@@ -702,8 +719,10 @@ public class DataflowPipelineJobTest {
 
   @Test
   public void testCancelUnterminatedJobThatSucceeds() throws IOException {
-    Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
-    when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+    Dataflow.Projects.Locations.Jobs.Update update =
+        mock(Dataflow.Projects.Locations.Jobs.Update.class);
+    when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
+        .thenReturn(update);
     when(update.execute()).thenReturn(new Job());
 
     DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
@@ -713,21 +732,24 @@ public class DataflowPipelineJobTest {
     content.setProjectId(PROJECT_ID);
     content.setId(JOB_ID);
     content.setRequestedState("JOB_STATE_CANCELLED");
-    verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
+    verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), eq(content));
     verifyNoMoreInteractions(mockJobs);
   }
 
   @Test
   public void testCancelUnterminatedJobThatFails() throws IOException {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Get.class);
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_RUNNING");
-    when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
-    Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
-    when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+    Dataflow.Projects.Locations.Jobs.Update update = mock(
+        Dataflow.Projects.Locations.Jobs.Update.class);
+    when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
+        .thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
     DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
@@ -741,21 +763,24 @@ public class DataflowPipelineJobTest {
     content.setProjectId(PROJECT_ID);
     content.setId(JOB_ID);
     content.setRequestedState("JOB_STATE_CANCELLED");
-    verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
-    verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID));
+    verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), eq(content));
+    verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID);
   }
 
   @Test
   public void testCancelTerminatedJob() throws IOException {
-    Dataflow.Projects.Jobs.Get statusRequest = mock(Dataflow.Projects.Jobs.Get.class);
+    Dataflow.Projects.Locations.Jobs.Get statusRequest = mock(
+        Dataflow.Projects.Locations.Jobs.Get.class);
 
     Job statusResponse = new Job();
     statusResponse.setCurrentState("JOB_STATE_FAILED");
-    when(mockJobs.get(anyString(), anyString())).thenReturn(statusRequest);
+    when(mockJobs.get(PROJECT_ID, REGION_ID, JOB_ID)).thenReturn(statusRequest);
     when(statusRequest.execute()).thenReturn(statusResponse);
 
-    Dataflow.Projects.Jobs.Update update = mock(Dataflow.Projects.Jobs.Update.class);
-    when(mockJobs.update(anyString(), anyString(), any(Job.class))).thenReturn(update);
+    Dataflow.Projects.Locations.Jobs.Update update = mock(
+        Dataflow.Projects.Locations.Jobs.Update.class);
+    when(mockJobs.update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), any(Job.class)))
+        .thenReturn(update);
     when(update.execute()).thenThrow(new IOException());
 
     DataflowPipelineJob job = new DataflowPipelineJob(JOB_ID, options, null);
@@ -765,8 +790,8 @@ public class DataflowPipelineJobTest {
     content.setProjectId(PROJECT_ID);
     content.setId(JOB_ID);
     content.setRequestedState("JOB_STATE_CANCELLED");
-    verify(mockJobs).update(eq(PROJECT_ID), eq(JOB_ID), eq(content));
-    verify(mockJobs).get(eq(PROJECT_ID), eq(JOB_ID));
+    verify(mockJobs).update(eq(PROJECT_ID), eq(REGION_ID), eq(JOB_ID), eq(content));
+    verify(mockJobs).get(PROJECT_ID, REGION_ID, JOB_ID);
     verifyNoMoreInteractions(mockJobs);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/26c42b98/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d847880..4719217 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -113,6 +113,7 @@ public class DataflowRunnerTest {
   private static final String NON_EXISTENT_BUCKET = "gs://non-existent-bucket/location";
 
   private static final String PROJECT_ID = "some-project";
+  private static final String REGION_ID = "some-region-1";
 
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -121,7 +122,7 @@ public class DataflowRunnerTest {
   @Rule
   public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class);
 
-  private Dataflow.Projects.Jobs mockJobs;
+  private Dataflow.Projects.Locations.Jobs mockJobs;
   private GcsUtil mockGcsUtil;
 
   // Asserts that the given Job has all expected fields set.
@@ -159,7 +160,7 @@ public class DataflowRunnerTest {
     // The dataflow pipeline attempts to output to this location.
     when(mockGcsUtil.bucketAccessible(GcsPath.fromUri("gs://bucket/object"))).thenReturn(true);
 
-    mockJobs = mock(Dataflow.Projects.Jobs.class);
+    mockJobs = mock(Dataflow.Projects.Locations.Jobs.class);
   }
 
   private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) {
@@ -176,14 +177,17 @@ public class DataflowRunnerTest {
   private Dataflow buildMockDataflow() throws IOException {
     Dataflow mockDataflowClient = mock(Dataflow.class);
     Dataflow.Projects mockProjects = mock(Dataflow.Projects.class);
-    Dataflow.Projects.Jobs.Create mockRequest =
-        mock(Dataflow.Projects.Jobs.Create.class);
-    Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class);
+    Dataflow.Projects.Locations mockLocations = mock(Dataflow.Projects.Locations.class);
+    Dataflow.Projects.Locations.Jobs.Create mockRequest =
+        mock(Dataflow.Projects.Locations.Jobs.Create.class);
+    Dataflow.Projects.Locations.Jobs.List mockList = mock(
+        Dataflow.Projects.Locations.Jobs.List.class);
 
     when(mockDataflowClient.projects()).thenReturn(mockProjects);
-    when(mockProjects.jobs()).thenReturn(mockJobs);
-    when(mockJobs.create(eq(PROJECT_ID), isA(Job.class))).thenReturn(mockRequest);
-    when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList);
+    when(mockProjects.locations()).thenReturn(mockLocations);
+    when(mockLocations.jobs()).thenReturn(mockJobs);
+    when(mockJobs.create(eq(PROJECT_ID), eq(REGION_ID), isA(Job.class))).thenReturn(mockRequest);
+    when(mockJobs.list(eq(PROJECT_ID), eq(REGION_ID))).thenReturn(mockList);
     when(mockList.setPageToken(anyString())).thenReturn(mockList);
     when(mockList.execute())
         .thenReturn(
@@ -226,6 +230,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     options.setProject(PROJECT_ID);
     options.setTempLocation(VALID_TEMP_BUCKET);
+    options.setRegion(REGION_ID);
     // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath.
     options.setFilesToStage(new LinkedList<String>());
     options.setDataflowClient(buildMockDataflow());
@@ -304,7 +309,7 @@ public class DataflowRunnerTest {
     assertEquals("newid", job.getJobId());
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -331,8 +336,10 @@ public class DataflowRunnerTest {
   public void testRunReturnDifferentRequestId() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
     Dataflow mockDataflowClient = options.getDataflowClient();
-    Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
-    when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
+    Dataflow.Projects.Locations.Jobs.Create mockRequest = mock(
+        Dataflow.Projects.Locations.Jobs.Create.class);
+    when(mockDataflowClient.projects().locations().jobs()
+        .create(eq(PROJECT_ID), eq(REGION_ID), any(Job.class)))
         .thenReturn(mockRequest);
     Job resultJob = new Job();
     resultJob.setId("newid");
@@ -347,7 +354,7 @@ public class DataflowRunnerTest {
     } catch (DataflowJobAlreadyExistsException expected) {
       assertThat(expected.getMessage(),
           containsString("If you want to submit a second job, try again by setting a "
-            + "different name using --jobName."));
+              + "different name using --jobName."));
       assertEquals(expected.getJob().getJobId(), resultJob.getId());
     }
   }
@@ -362,7 +369,7 @@ public class DataflowRunnerTest {
     assertEquals("newid", job.getJobId());
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -384,8 +391,10 @@ public class DataflowRunnerTest {
     options.setUpdate(true);
     options.setJobName("oldJobName");
     Dataflow mockDataflowClient = options.getDataflowClient();
-    Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class);
-    when(mockDataflowClient.projects().jobs().create(eq(PROJECT_ID), any(Job.class)))
+    Dataflow.Projects.Locations.Jobs.Create mockRequest = mock(
+        Dataflow.Projects.Locations.Jobs.Create.class);
+    when(mockDataflowClient.projects().locations().jobs()
+        .create(eq(PROJECT_ID), eq(REGION_ID), any(Job.class)))
         .thenReturn(mockRequest);
     final Job resultJob = new Job();
     resultJob.setId("newid");
@@ -434,6 +443,7 @@ public class DataflowRunnerTest {
     options.setTempLocation(VALID_TEMP_BUCKET);
     options.setTempDatasetId(cloudDataflowDataset);
     options.setProject(PROJECT_ID);
+    options.setRegion(REGION_ID);
     options.setJobName("job");
     options.setDataflowClient(buildMockDataflow());
     options.setGcsUtil(mockGcsUtil);
@@ -445,7 +455,7 @@ public class DataflowRunnerTest {
     assertEquals("newid", job.getJobId());
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     Job workflowJob = jobCaptor.getValue();
     assertValidJob(workflowJob);
 
@@ -485,7 +495,7 @@ public class DataflowRunnerTest {
   public void detectClassPathResourceWithFileResources() throws Exception {
     File file = tmpFolder.newFile("file");
     File file2 = tmpFolder.newFile("file2");
-    URLClassLoader classLoader = new URLClassLoader(new URL[]{
+    URLClassLoader classLoader = new URLClassLoader(new URL[] {
         file.toURI().toURL(),
         file2.toURI().toURL()
     });
@@ -506,7 +516,7 @@ public class DataflowRunnerTest {
   @Test
   public void detectClassPathResourceWithNonFileResources() throws Exception {
     String url = "http://www.google.com/all-the-secrets.jar";
-    URLClassLoader classLoader = new URLClassLoader(new URL[]{
+    URLClassLoader classLoader = new URLClassLoader(new URL[] {
         new URL(url)
     });
     thrown.expect(IllegalArgumentException.class);
@@ -542,7 +552,7 @@ public class DataflowRunnerTest {
     p.run();
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -568,7 +578,7 @@ public class DataflowRunnerTest {
     p.run();
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -592,7 +602,7 @@ public class DataflowRunnerTest {
     DataflowRunner.fromOptions(options);
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -657,7 +667,7 @@ public class DataflowRunnerTest {
     DataflowRunner.fromOptions(options);
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -672,7 +682,7 @@ public class DataflowRunnerTest {
     DataflowRunner.fromOptions(options);
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 
@@ -687,9 +697,9 @@ public class DataflowRunnerTest {
     DataflowRunner.fromOptions(options);
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
-   }
+  }
 
   @Test
   public void testNoProjectFails() {
@@ -933,7 +943,7 @@ public class DataflowRunnerTest {
     Pipeline p = Pipeline.create(options);
 
     p.apply(Create.of(Arrays.asList(1, 2, 3)))
-     .apply(new TestTransform());
+        .apply(new TestTransform());
 
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage(Matchers.containsString("no translator registered"));
@@ -942,7 +952,7 @@ public class DataflowRunnerTest {
             p, (DataflowRunner) p.getRunner(), Collections.<DataflowPackage>emptyList());
 
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
-    Mockito.verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture());
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
     assertValidJob(jobCaptor.getValue());
   }
 


Mime
View raw message