Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4AAF8200B36 for ; Wed, 6 Jul 2016 19:20:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4938F160A55; Wed, 6 Jul 2016 17:20:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1DA06160A64 for ; Wed, 6 Jul 2016 19:20:18 +0200 (CEST) Received: (qmail 42713 invoked by uid 500); 6 Jul 2016 17:20:18 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 42704 invoked by uid 99); 6 Jul 2016 17:20:18 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 17:20:18 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id DF1051A07AB for ; Wed, 6 Jul 2016 17:20:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id ByxgtQ3Wnv27 for ; Wed, 6 Jul 2016 17:20:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 8F10160E43 for ; Wed, 6 Jul 2016 17:20:09 +0000 (UTC) Received: (qmail 38055 invoked by uid 99); 6 Jul 2016 17:20:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 17:20:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48C5FE1021; Wed, 6 Jul 2016 17:20:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Wed, 06 Jul 2016 17:20:44 -0000 Message-Id: In-Reply-To: <3b9c340d491b47249a4d3d4fe2df8fbf@git.apache.org> References: <3b9c340d491b47249a4d3d4fe2df8fbf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/50] [abbrv] incubator-beam git commit: Replacing BigQuery direct calls with BigQueryServices abstraction archived-at: Wed, 06 Jul 2016 17:20:20 -0000 Replacing BigQuery direct calls with BigQueryServices abstraction Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d71f3850 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d71f3850 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d71f3850 Branch: refs/heads/runners-spark2 Commit: d71f38503cc283d1fd0d444c8452c699ebe819e8 Parents: ed32d10 Author: Pei He Authored: Tue Jun 28 17:49:52 2016 -0700 Committer: Luke Cwik Committed: Wed Jul 6 10:18:52 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 118 ++++++++----------- .../apache/beam/sdk/util/BigQueryServices.java | 12 ++ .../beam/sdk/util/BigQueryServicesImpl.java | 37 ++++++ .../org/apache/beam/sdk/io/BigQueryIOTest.java | 39 ++++-- 4 files changed, 128 insertions(+), 78 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d71f3850/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 790e3ff..7955022 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.util.BigQueryServices.DatasetService; import org.apache.beam.sdk.util.BigQueryServices.JobService; import org.apache.beam.sdk.util.BigQueryServicesImpl; import org.apache.beam.sdk.util.BigQueryTableInserter; -import org.apache.beam.sdk.util.BigQueryTableRowIterator; import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -85,7 +84,6 @@ import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; -import com.google.api.services.bigquery.model.QueryRequest; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -386,7 +384,7 @@ public class BigQueryIO { */ final boolean validate; @Nullable final Boolean flattenResults; - @Nullable final BigQueryServices testBigQueryServices; + @Nullable BigQueryServices bigQueryServices; private static final String QUERY_VALIDATION_FAILURE_ERROR = "Validation of query \"%1$s\" failed. 
If the query depends on an earlier stage of the" @@ -403,18 +401,18 @@ public class BigQueryIO { null /* jsonTableRef */, true /* validate */, null /* flattenResults */, - null /* testBigQueryServices */); + null /* bigQueryServices */); } private Bound( String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate, - @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) { + @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; this.query = query; this.validate = validate; this.flattenResults = flattenResults; - this.testBigQueryServices = testBigQueryServices; + this.bigQueryServices = bigQueryServices; } /** @@ -434,7 +432,7 @@ public class BigQueryIO { */ public Bound from(TableReference table) { return new Bound( - name, query, toJsonString(table), validate, flattenResults, testBigQueryServices); + name, query, toJsonString(table), validate, flattenResults, bigQueryServices); } /** @@ -449,7 +447,7 @@ public class BigQueryIO { */ public Bound fromQuery(String query) { return new Bound(name, query, jsonTableRef, validate, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices); + MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), bigQueryServices); } /** @@ -458,7 +456,7 @@ public class BigQueryIO { * occurs. */ public Bound withoutValidation() { - return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices); + return new Bound(name, query, jsonTableRef, false, flattenResults, bigQueryServices); } /** @@ -469,7 +467,7 @@ public class BigQueryIO { * from a table will cause an error during validation. 
*/ public Bound withoutResultFlattening() { - return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices); + return new Bound(name, query, jsonTableRef, validate, false, bigQueryServices); } @VisibleForTesting @@ -499,36 +497,28 @@ public class BigQueryIO { } if (validate) { + BigQueryServices bqServices = getBigQueryServices(); // Check for source table/query presence for early failure notification. // Note that a presence check can fail if the table or dataset are created by earlier // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these // cases the withoutValidation method can be used to disable the check. if (table != null) { - verifyDatasetPresence(bqOptions, table); - verifyTablePresence(bqOptions, table); + DatasetService datasetService = bqServices.getDatasetService(bqOptions); + verifyDatasetPresence(datasetService, table); + verifyTablePresence(datasetService, table); } if (query != null) { - dryRunQuery(bqOptions, query); + JobService jobService = bqServices.getJobService(bqOptions); + try { + jobService.dryRunQuery(bqOptions.getProject(), query); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); + } } } } - private static void dryRunQuery(BigQueryOptions options, String query) { - Bigquery client = Transport.newBigQueryClient(options).build(); - QueryRequest request = new QueryRequest(); - request.setQuery(query); - request.setDryRun(true); - - try { - BigQueryTableRowIterator.executeWithBackOff( - client.jobs().query(options.getProject(), request), QUERY_VALIDATION_FAILURE_ERROR, - query); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); - } - } - @Override public PCollection apply(PInput input) { String uuid = randomUUIDString(); @@ -669,11 +659,10 @@ public class BigQueryIO { } private BigQueryServices getBigQueryServices() { - if 
(testBigQueryServices != null) { - return testBigQueryServices; - } else { - return new BigQueryServicesImpl(); + if (bigQueryServices == null) { + bigQueryServices = new BigQueryServicesImpl(); } + return bigQueryServices; } } @@ -1443,8 +1432,7 @@ public class BigQueryIO { // An option to indicate if table validation is desired. Default is true. final boolean validate; - // A fake or mock BigQueryServices for tests. - @Nullable private BigQueryServices testBigQueryServices; + @Nullable private BigQueryServices bigQueryServices; private static class TranslateTableSpecFunction implements SerializableFunction { @@ -1475,14 +1463,14 @@ public class BigQueryIO { CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, true /* validate */, - null /* testBigQueryServices */); + null /* bigQueryServices */); } private Bound(String name, @Nullable String jsonTableRef, @Nullable SerializableFunction tableRefFunction, @Nullable String jsonSchema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, - @Nullable BigQueryServices testBigQueryServices) { + @Nullable BigQueryServices bigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; this.tableRefFunction = tableRefFunction; @@ -1490,7 +1478,7 @@ public class BigQueryIO { this.createDisposition = checkNotNull(createDisposition, "createDisposition"); this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); this.validate = validate; - this.testBigQueryServices = testBigQueryServices; + this.bigQueryServices = bigQueryServices; } /** @@ -1510,7 +1498,7 @@ public class BigQueryIO { */ public Bound to(TableReference table) { return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, testBigQueryServices); + writeDisposition, validate, bigQueryServices); } /** @@ -1539,7 +1527,7 @@ public class BigQueryIO { public Bound toTableReference( SerializableFunction tableRefFunction) { return new 
Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, testBigQueryServices); + writeDisposition, validate, bigQueryServices); } /** @@ -1550,7 +1538,7 @@ public class BigQueryIO { */ public Bound withSchema(TableSchema schema) { return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema), - createDisposition, writeDisposition, validate, testBigQueryServices); + createDisposition, writeDisposition, validate, bigQueryServices); } /** @@ -1560,7 +1548,7 @@ public class BigQueryIO { */ public Bound withCreateDisposition(CreateDisposition createDisposition) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, testBigQueryServices); + writeDisposition, validate, bigQueryServices); } /** @@ -1570,7 +1558,7 @@ public class BigQueryIO { */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, validate, testBigQueryServices); + writeDisposition, validate, bigQueryServices); } /** @@ -1580,7 +1568,7 @@ public class BigQueryIO { */ public Bound withoutValidation() { return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, - writeDisposition, false, testBigQueryServices); + writeDisposition, false, bigQueryServices); } @VisibleForTesting @@ -1590,18 +1578,18 @@ public class BigQueryIO { } private static void verifyTableEmpty( - BigQueryOptions options, + DatasetService datasetService, TableReference table) { try { - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); - if (!inserter.isEmpty(table)) { + boolean isEmpty = datasetService.isTableEmpty( + table.getProjectId(), table.getDatasetId(), table.getTableId()); + if (!isEmpty) { throw new IllegalArgumentException( "BigQuery table is not empty: " + 
BigQueryIO.toTableSpec(table)); } - } catch (IOException e) { + } catch (IOException | InterruptedException e) { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); - if (errorExtractor.itemNotFound(e)) { + if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) { // Nothing to do. If the table does not exist, it is considered empty. } else { throw new RuntimeException( @@ -1633,16 +1621,17 @@ public class BigQueryIO { if (jsonTableRef != null && validate) { TableReference table = getTableWithDefaultProject(options); + DatasetService datasetService = getBigQueryServices().getDatasetService(options); // Check for destination table presence and emptiness for early failure notification. // Note that a presence check can fail when the table or dataset is created by an earlier // stage of the pipeline. For these cases the #withoutValidation method can be used to // disable the check. - verifyDatasetPresence(options, table); + verifyDatasetPresence(datasetService, table); if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) { - verifyTablePresence(options, table); + verifyTablePresence(datasetService, table); } if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) { - verifyTableEmpty(options, table); + verifyTableEmpty(datasetService, table); } } @@ -1663,7 +1652,7 @@ public class BigQueryIO { checkArgument( !Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Write needs a GCS temp location to store temp files."); - if (testBigQueryServices == null) { + if (bigQueryServices == null) { try { GcsPath.fromUri(tempLocation); } catch (IllegalArgumentException e) { @@ -1789,11 +1778,10 @@ public class BigQueryIO { } private BigQueryServices getBigQueryServices() { - if (testBigQueryServices != null) { - return testBigQueryServices; - } else { - return new BigQueryServicesImpl(); + if (bigQueryServices == null) { + bigQueryServices = new BigQueryServicesImpl(); } + return bigQueryServices; } } @@ 
-1985,12 +1973,9 @@ public class BigQueryIO { } } - private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) { + private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { try { - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableRowIterator.executeWithBackOff( - client.datasets().get(table.getProjectId(), table.getDatasetId()), - RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)); + datasetService.getDataset(table.getProjectId(), table.getDatasetId()); } catch (Exception e) { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { @@ -2006,12 +1991,9 @@ public class BigQueryIO { } } - private static void verifyTablePresence(BigQueryOptions options, TableReference table) { + private static void verifyTablePresence(DatasetService datasetService, TableReference table) { try { - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableRowIterator.executeWithBackOff( - client.tables().get(table.getProjectId(), table.getDatasetId(), table.getTableId()), - RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)); + datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()); } catch (Exception e) { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d71f3850/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java index f82edf4..514e005 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java @@ -116,6 +116,18 @@ public interface BigQueryServices extends Serializable { throws IOException, InterruptedException; /** + * Returns true if the table is empty. + */ + boolean isTableEmpty(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException; + + /** + * Gets the specified {@link Dataset} resource by dataset ID. + */ + Dataset getDataset(String projectId, String datasetId) + throws IOException, InterruptedException; + + /** * Create a {@link Dataset} with the given {@code location} and {@code description}. */ void createDataset(String projectId, String datasetId, String location, String description) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d71f3850/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java index 01ea45f..1aadeb2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java @@ -36,6 +36,7 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableDataList; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.hadoop.util.ApiErrorExtractor; @@ -307,6 +308,42 @@ public class BigQueryServicesImpl implements BigQueryServices { backoff); } + 
@Override + public boolean isTableEmpty(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + TableDataList dataList = executeWithRetries( + client.tabledata().list(projectId, datasetId, tableId), + String.format( + "Unable to list table data: %s, aborting after %d retries.", + tableId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); + return dataList.getRows() == null || dataList.getRows().isEmpty(); + } + + /** + * {@inheritDoc} + * + *

<p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ + @Override + public Dataset getDataset(String projectId, String datasetId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + return executeWithRetries( + client.datasets().get(projectId, datasetId), + String.format( + "Unable to get dataset: %s, aborting after %d retries.", + datasetId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); + } + /** * {@inheritDoc} * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d71f3850/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index f0d3fce..43bf314 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -438,14 +438,22 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testValidateReadSetsDefaultProject() { + public void testValidateReadSetsDefaultProject() throws Exception { + String projectId = "someproject"; + String datasetId = "somedataset"; BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - options.setProject("someproject"); + options.setProject(projectId); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService); + when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow( + new RuntimeException("Unable to confirm BigQuery dataset presence")); Pipeline p = TestPipeline.create(options); TableReference tableRef = new TableReference(); - 
tableRef.setDatasetId("somedataset"); + tableRef.setDatasetId(datasetId); tableRef.setTableId("sometable"); thrown.expect(RuntimeException.class); @@ -453,7 +461,8 @@ public class BigQueryIOTest implements Serializable { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - p.apply(BigQueryIO.Read.from(tableRef)); + p.apply(BigQueryIO.Read.from(tableRef) + .withTestServices(fakeBqServices)); } @Test @@ -759,15 +768,24 @@ public class BigQueryIOTest implements Serializable { assertThat(displayData, hasDisplayItem("validation", false)); } - private void testWriteValidatesDataset(boolean streaming) { + private void testWriteValidatesDataset(boolean streaming) throws Exception { + String projectId = "someproject"; + String datasetId = "somedataset"; + BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); - options.setProject("someproject"); + options.setProject(projectId); options.setStreaming(streaming); + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService); + when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow( + new RuntimeException("Unable to confirm BigQuery dataset presence")); + Pipeline p = TestPipeline.create(options); TableReference tableRef = new TableReference(); - tableRef.setDatasetId("somedataset"); + tableRef.setDatasetId(datasetId); tableRef.setTableId("sometable"); thrown.expect(RuntimeException.class); @@ -779,16 +797,17 @@ public class BigQueryIOTest implements Serializable { .apply(BigQueryIO.Write .to(tableRef) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withSchema(new TableSchema())); + .withSchema(new TableSchema()) + .withTestServices(fakeBqServices)); } @Test - public void testWriteValidatesDatasetBatch() { + public void testWriteValidatesDatasetBatch() throws 
Exception { testWriteValidatesDataset(false); } @Test - public void testWriteValidatesDatasetStreaming() { + public void testWriteValidatesDatasetStreaming() throws Exception { testWriteValidatesDataset(true); }