From: lcwik@apache.org
To: commits@beam.incubator.apache.org
Date: Thu, 07 Apr 2016 18:21:14 -0000
Subject: [04/12] incubator-beam git commit: [BEAM-151] Break out Dataflow transport creation to another file

[BEAM-151] Break out Dataflow transport creation to another file

Keeping Dataflow client creation inside the shared Transport class was
preventing DataflowPipelineOptions from moving into a Dataflow runner maven
module; this commit breaks it out into its own DataflowTransport file.
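[Editor's note] The refactoring is mechanical: the three Dataflow-specific
factory methods (newDataflowClient, newClouddebuggerClient,
newRawDataflowClient) move unchanged from the shared Transport class into the
new DataflowTransport class, so callers only swap the class name and its
import. A minimal sketch of an updated call site follows; the example class
name is hypothetical, and the TestCredential stand-in mirrors the SDK's own
tests rather than anything in this commit:

    import com.google.api.services.dataflow.Dataflow;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.DataflowTransport;
    import com.google.cloud.dataflow.sdk.util.TestCredential;

    public class DataflowTransportMigrationExample {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        // Placeholder credential, as used by the SDK's tests; real code
        // supplies an actual Credential.
        options.setGcpCredential(new TestCredential());

        // Before this commit: Transport.newDataflowClient(options).build();
        Dataflow dataflow =
            DataflowTransport.newDataflowClient(options).build();
      }
    }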
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d0db477a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d0db477a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d0db477a

Branch: refs/heads/master
Commit: d0db477a0ce436728f71f0f4aec0b0098eac66be
Parents: c8cb55a
Author: Luke Cwik
Authored: Mon Mar 28 13:09:33 2016 -0700
Committer: Luke Cwik
Committed: Thu Apr 7 11:19:49 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java   |   5 +-
 .../sdk/runners/DataflowPipelineRunner.java     |   6 +-
 .../dataflow/sdk/util/DataflowTransport.java    | 111 +++++++++++++++++++
 .../cloud/dataflow/sdk/util/Transport.java      |  46 --------
 .../sdk/options/GoogleApiDebugOptionsTest.java  |  93 ++++++++--------
 5 files changed, 163 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
index b55fa17..6231bd4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
@@ -20,11 +20,11 @@ package com.google.cloud.dataflow.sdk.options;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.GcsStager;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.Stager;
-import com.google.cloud.dataflow.sdk.util.Transport;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -159,7 +159,8 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
     @Override
     public Dataflow create(PipelineOptions options) {
-      return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
+      return DataflowTransport.newDataflowClient(
+          options.as(DataflowPipelineOptions.class)).build();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 5f43cc3..50ca36f 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -99,6 +99,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
@@ -107,7 +108,6 @@ import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
 import com.google.cloud.dataflow.sdk.util.Reshuffle;
 import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
-import com.google.cloud.dataflow.sdk.util.Transport;
 import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -444,7 +444,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
       throw new RuntimeException("Should not specify the debuggee");
     }
 
-    Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
+    Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build();
 
     Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
     options.setDebuggee(debuggee);
@@ -600,7 +600,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
     // regularly and need not be retried automatically.
     DataflowPipelineJob dataflowPipelineJob =
         new DataflowPipelineJob(options.getProject(), jobResult.getId(),
-            Transport.newRawDataflowClient(options).build(), aggregatorTransforms);
+            DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms);
 
     // If the service returned client request id, the SDK needs to compare it
     // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
new file mode 100644
index 0000000..8de358c
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ ******************************************************************************/
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
+import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class DataflowTransport {
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost()
+          + (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString);
+    }
+  }
+
+  /**
+   * Returns a Google Cloud Dataflow client builder.
+   */
+  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
+    String servicePath = options.getDataflowEndpoint();
+    ApiComponents components;
+    if (servicePath.contains("://")) {
+      components = apiComponentsFromUrl(servicePath);
+    } else {
+      components = new ApiComponents(options.getApiRootUrl(), servicePath);
+    }
+
+    return new Dataflow.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setRootUrl(components.rootUrl)
+        .setServicePath(components.servicePath)
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    return new Clouddebugger.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Dataflow client that does not automatically retry failed
+   * requests.
+   */
+  public static Dataflow.Builder
+      newRawDataflowClient(DataflowPipelineOptions options) {
+    return newDataflowClient(options)
+        .setHttpRequestInitializer(options.getGcpCredential())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credential credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return httpRequestInitializer;
+    } else {
+      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 187d164..5888822 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -24,13 +24,10 @@ import com.google.api.client.http.HttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.storage.Storage;
 import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.GcsOptions;
 import com.google.cloud.dataflow.sdk.options.PubsubOptions;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -128,49 +125,6 @@ public class Transport {
   }
 
   /**
-   * Returns a Google Cloud Dataflow client builder.
-   */
-  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
-    String servicePath = options.getDataflowEndpoint();
-    ApiComponents components;
-    if (servicePath.contains("://")) {
-      components = apiComponentsFromUrl(servicePath);
-    } else {
-      components = new ApiComponents(options.getApiRootUrl(), servicePath);
-    }
-
-    return new Dataflow.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setRootUrl(components.rootUrl)
-        .setServicePath(components.servicePath)
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
-    return new Clouddebugger.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Dataflow client that does not automatically retry failed
-   * requests.
-   */
-  public static Dataflow.Builder
-      newRawDataflowClient(DataflowPipelineOptions options) {
-    return newDataflowClient(options)
-        .setHttpRequestInitializer(options.getGcpCredential())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
    * Returns a Cloud Storage client builder.
    *
   * <p>Note: this client's endpoint is not modified by the

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
index 3201608..c2f0bb8 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/options/GoogleApiDebugOptionsTest.java
@@ -22,8 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Create;
-import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Get;
+import com.google.api.services.storage.Storage;
 import com.google.cloud.dataflow.sdk.options.GoogleApiDebugOptions.GoogleApiTracer;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.cloud.dataflow.sdk.util.Transport;
@@ -37,104 +36,104 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link GoogleApiDebugOptions}. */
 @RunWith(JUnit4.class)
 public class GoogleApiDebugOptionsTest {
+  private static final String STORAGE_GET_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}";
+  private static final String STORAGE_GET_AND_LIST_TRACE =
+      "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\","
+      + "\"Objects.List\":\"ListTraceDestination\"}";
+  private static final String STORAGE_TRACE = "--googleApiTrace={\"Storage\":\"TraceDestination\"}";
+
   @Test
   public void testWhenTracingMatches() throws Exception {
-    String[] args =
-        new String[] {"--googleApiTrace={\"Projects.Jobs.Get\":\"GetTraceDestination\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get request =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get request =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("GetTraceDestination", request.get("$trace"));
   }
 
   @Test
   public void testWhenTracingDoesNotMatch() throws Exception {
-    String[] args = new String[] {"--googleApiTrace={\"Projects.Jobs.Create\":\"testToken\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+    String[] args = new String[] {STORAGE_GET_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
    options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get request =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.List request =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
     assertNull(request.get("$trace"));
   }
 
   @Test
   public void testWithMultipleTraces() throws Exception {
-    String[] args = new String[] {
-        "--googleApiTrace={\"Projects.Jobs.Create\":\"CreateTraceDestination\","
-        + "\"Projects.Jobs.Get\":\"GetTraceDestination\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+    String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE};
+    GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("GetTraceDestination", getRequest.get("$trace"));
 
-    Create createRequest =
-        options.getDataflowClient().projects().jobs().create("testProjectId", null);
-    assertEquals("CreateTraceDestination", createRequest.get("$trace"));
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("ListTraceDestination", listRequest.get("$trace"));
   }
 
   @Test
-  public void testMatchingAllDataflowCalls() throws Exception {
-    String[] args = new String[] {"--googleApiTrace={\"Dataflow\":\"TraceDestination\"}"};
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
+  public void testMatchingAllCalls() throws Exception {
+    String[] args = new String[] {STORAGE_TRACE};
+    GcsOptions options =
+        PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
 
     assertNotNull(options.getGoogleApiTrace());
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Create createRequest =
-        options.getDataflowClient().projects().jobs().create("testProjectId", null);
-    assertEquals("TraceDestination", createRequest.get("$trace"));
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertEquals("TraceDestination", listRequest.get("$trace"));
   }
 
   @Test
   public void testMatchingAgainstClient() throws Exception {
-    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
     options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
-        Transport.newDataflowClient(options).build(), "TraceDestination"));
+        Transport.newStorageClient(options).build(), "TraceDestination"));
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Delete deleteRequest = Transport.newBigQueryClient(options).build().datasets()
-        .delete("testProjectId", "testDatasetId");
+    Delete deleteRequest = Transport.newBigQueryClient(options.as(BigQueryOptions.class))
+        .build().datasets().delete("testProjectId", "testDatasetId");
     assertNull(deleteRequest.get("$trace"));
   }
 
   @Test
   public void testMatchingAgainstRequestType() throws Exception {
-    DataflowPipelineOptions options =
-        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class);
     options.setGcpCredential(new TestCredential());
     options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor(
-        Transport.newDataflowClient(options).build().projects().jobs()
-            .get("aProjectId", "aJobId"), "TraceDestination"));
+        Transport.newStorageClient(options).build().objects()
+            .get("aProjectId", "aObjectId"), "TraceDestination"));
 
-    Get getRequest =
-        options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId");
+    Storage.Objects.Get getRequest =
+        Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId");
     assertEquals("TraceDestination", getRequest.get("$trace"));
 
-    Create createRequest =
-        options.getDataflowClient().projects().jobs().create("testProjectId", null);
-    assertNull(createRequest.get("$trace"));
+    Storage.Objects.List listRequest =
+        Transport.newStorageClient(options).build().objects().list("testProjectId");
+    assertNull(listRequest.get("$trace"));
   }
 
   @Test
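[Editor's note] The rewritten tests double as usage documentation for the
--googleApiTrace option: the flag's JSON map keys match a client type
("Storage") or a request type ("Objects.Get"), and matching requests carry a
"$trace" parameter set to the mapped destination. A minimal sketch mirroring
the updated tests; the example class name and the bucket/object names are
placeholders, and TestCredential again stands in for a real credential:

    import com.google.api.services.storage.Storage;
    import com.google.cloud.dataflow.sdk.options.GcsOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.TestCredential;
    import com.google.cloud.dataflow.sdk.util.Transport;

    public class GoogleApiTraceExample {
      public static void main(String[] args) throws Exception {
        // Tag only Objects.Get requests with a trace destination.
        GcsOptions options = PipelineOptionsFactory
            .fromArgs(new String[] {
                "--googleApiTrace={\"Objects.Get\":\"GetTraceDestination\"}"})
            .as(GcsOptions.class);
        options.setGcpCredential(new TestCredential());

        Storage storage = Transport.newStorageClient(options).build();

        // Matching request type: the $trace parameter is set.
        Storage.Objects.Get get = storage.objects().get("myBucket", "myObject");
        System.out.println(get.get("$trace"));  // GetTraceDestination

        // Non-matching request type: no $trace parameter.
        Storage.Objects.List list = storage.objects().list("myBucket");
        System.out.println(list.get("$trace")); // null
      }
    }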