Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F1A58200B36 for ; Wed, 6 Jul 2016 19:20:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EF511160A55; Wed, 6 Jul 2016 17:20:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C4871160A64 for ; Wed, 6 Jul 2016 19:20:20 +0200 (CEST) Received: (qmail 42999 invoked by uid 500); 6 Jul 2016 17:20:20 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 42990 invoked by uid 99); 6 Jul 2016 17:20:19 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 17:20:19 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 8C295C030A for ; Wed, 6 Jul 2016 17:20:19 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 0oB4Fl3kP05C for ; Wed, 6 Jul 2016 17:20:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 9D08160E46 for ; Wed, 6 Jul 2016 17:20:09 +0000 (UTC) Received: (qmail 38059 invoked by uid 99); 6 Jul 2016 17:20:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jul 2016 17:20:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4FD7DE5CE1; Wed, 6 Jul 2016 17:20:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: lcwik@apache.org To: commits@beam.incubator.apache.org Date: Wed, 06 Jul 2016 17:20:46 -0000 Message-Id: <8cbdafcce1b348c9a8c024ac48475fce@git.apache.org> In-Reply-To: <3b9c340d491b47249a4d3d4fe2df8fbf@git.apache.org> References: <3b9c340d491b47249a4d3d4fe2df8fbf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [40/50] [abbrv] incubator-beam git commit: [BEAM-142] - BigQueryIO: don't unnecessarily initialize an ExecutorService to validate parameters archived-at: Wed, 06 Jul 2016 17:20:22 -0000 [BEAM-142] - BigQueryIO: don't unnecessarily initialize an ExecutorService to validate parameters By default, BigQueryIO initializes an ExecutorService; however, AppEngine doesn't allow threads to be modified into daemon threads. Use GcsOptions.getExecutorService() to obtain the ExecutorService instead. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6924358e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6924358e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6924358e Branch: refs/heads/runners-spark2 Commit: 6924358e701839f2b2deeb37ac4c106ae03ed731 Parents: 8853118 Author: Lucas Amorim Authored: Tue Jun 28 14:28:17 2016 -0700 Committer: Luke Cwik Committed: Wed Jul 6 10:18:52 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 10 +++---- .../beam/sdk/util/BigQueryTableInserter.java | 31 +++++++++++++------- .../sdk/util/BigQueryTableInserterTest.java | 17 +++++++---- .../apache/beam/sdk/util/BigQueryUtilTest.java | 12 +++++--- .../util/RetryHttpRequestInitializerTest.java | 5 +++- 5 files changed, 48 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index a9d85b8..790e3ff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -1594,7 +1594,7 @@ public class BigQueryIO { TableReference table) { try { Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); + BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); if (!inserter.isEmpty(table)) { throw new IllegalArgumentException( "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table)); @@ -2084,7 +2084,7 
@@ public class BigQueryIO { for (String tableSpec : tableRows.keySet()) { TableReference tableReference = getOrCreateTable(options, tableSpec); flushRows(client, tableReference, tableRows.get(tableSpec), - uniqueIdsForTableRows.get(tableSpec)); + uniqueIdsForTableRows.get(tableSpec), options); } tableRows.clear(); uniqueIdsForTableRows.clear(); @@ -2109,7 +2109,7 @@ public class BigQueryIO { if (!createdTables.contains(tableSpec)) { TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class); Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); + BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_IF_NEEDED, tableSchema); createdTables.add(tableSpec); @@ -2121,10 +2121,10 @@ public class BigQueryIO { /** Writes the accumulated rows into BigQuery with streaming API. 
*/ private void flushRows(Bigquery client, TableReference tableReference, - List tableRows, List uniqueIds) { + List tableRows, List uniqueIds, BigQueryOptions options) { if (!tableRows.isEmpty()) { try { - BigQueryTableInserter inserter = new BigQueryTableInserter(client); + BigQueryTableInserter inserter = new BigQueryTableInserter(client, options); inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java index f87a3c4..84004a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryTableInserter.java @@ -22,6 +22,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; +import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import com.google.api.client.util.BackOff; @@ -38,7 +40,6 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +51,7 @@ import java.util.List; import java.util.concurrent.Callable; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -83,43 +82,51 @@ public class BigQueryTableInserter { private final TableReference defaultRef; private final long maxRowsPerBatch; - private static final ExecutorService executor = MoreExecutors.getExitingExecutorService( - (ThreadPoolExecutor) Executors.newFixedThreadPool(100), 10, TimeUnit.SECONDS); + private ExecutorService executor; /** * Constructs a new row inserter. * * @param client a BigQuery client + * @param options a PipelineOptions object */ - public BigQueryTableInserter(Bigquery client) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options) { this.client = client; this.defaultRef = null; this.maxRowsPerBatch = MAX_ROWS_PER_BATCH; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** * Constructs a new row inserter. * * @param client a BigQuery client + * @param options a PipelineOptions object * @param defaultRef identifies the table to insert into - * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery)} + * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions)} */ @Deprecated - public BigQueryTableInserter(Bigquery client, TableReference defaultRef) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options, + TableReference defaultRef) { this.client = client; this.defaultRef = defaultRef; this.maxRowsPerBatch = MAX_ROWS_PER_BATCH; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** * Constructs a new row inserter. 
* * @param client a BigQuery client + * @param options a PipelineOptions object + * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery */ - public BigQueryTableInserter(Bigquery client, int maxRowsPerBatch) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options, + int maxRowsPerBatch) { this.client = client; this.defaultRef = null; this.maxRowsPerBatch = maxRowsPerBatch; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** @@ -127,13 +134,15 @@ public class BigQueryTableInserter { * * @param client a BigQuery client * @param defaultRef identifies the default table to insert into - * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, int)} + * @deprecated replaced by {@link #BigQueryTableInserter(Bigquery, PipelineOptions, int)} */ @Deprecated - public BigQueryTableInserter(Bigquery client, TableReference defaultRef, int maxRowsPerBatch) { + public BigQueryTableInserter(Bigquery client, PipelineOptions options, + TableReference defaultRef, int maxRowsPerBatch) { this.client = client; this.defaultRef = defaultRef; this.maxRowsPerBatch = maxRowsPerBatch; + this.executor = options.as(GcsOptions.class).getExecutorService(); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java index 7d9c8a8..344e916 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java @@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static 
org.mockito.Mockito.when; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import com.google.api.client.googleapis.json.GoogleJsonError; @@ -75,6 +77,7 @@ public class BigQueryTableInserterTest { @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class); @Mock private LowLevelHttpResponse response; private Bigquery bigquery; + private PipelineOptions options; @Before public void setUp() { @@ -97,6 +100,8 @@ public class BigQueryTableInserterTest { new Bigquery.Builder( transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()) .build(); + + options = PipelineOptionsFactory.create(); } @After @@ -139,7 +144,7 @@ public class BigQueryTableInserterTest { when(response.getStatusCode()).thenReturn(200); when(response.getContent()).thenReturn(toStream(testTable)); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); Table ret = inserter.tryCreateTable( new Table(), @@ -160,7 +165,7 @@ public class BigQueryTableInserterTest { public void testCreateTableSucceedsAlreadyExists() throws IOException { when(response.getStatusCode()).thenReturn(409); // 409 means already exists - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); Table ret = inserter.tryCreateTable( new Table(), @@ -191,7 +196,7 @@ public class BigQueryTableInserterTest { .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) .thenReturn(toStream(testTable)); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); Table ret = inserter.tryCreateTable( testTable, @@ -227,7 +232,7 @@ public class BigQueryTableInserterTest { 
thrown.expect(GoogleJsonResponseException.class); thrown.expectMessage("actually forbidden"); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); try { inserter.tryCreateTable( new Table(), @@ -261,7 +266,7 @@ public class BigQueryTableInserterTest { .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) .thenReturn(toStream(new TableDataInsertAllResponse())); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); inserter.insertAll(ref, rows); verify(response, times(2)).getStatusCode(); @@ -291,7 +296,7 @@ public class BigQueryTableInserterTest { thrown.expect(GoogleJsonResponseException.class); thrown.expectMessage("actually forbidden"); - BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options); try { inserter.insertAll(ref, rows); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java index 65fbeb7..c033a7d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import org.apache.beam.sdk.io.BigQueryIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import 
org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Sum; @@ -81,10 +83,12 @@ public class BigQueryUtilTest { @Mock private Bigquery.Tables.Get mockTablesGet; @Mock private Bigquery.Tabledata mockTabledata; @Mock private Bigquery.Tabledata.List mockTabledataList; + private PipelineOptions options; @Before public void setUp() { MockitoAnnotations.initMocks(this); + this.options = PipelineOptionsFactory.create(); } @After @@ -369,7 +373,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND, BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); @@ -387,7 +391,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null); @@ -408,7 +412,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options); try { inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, @@ -432,7 +436,7 @@ public class BigQueryUtilTest { TableReference ref = BigQueryIO .parseTableSpec("project:dataset.table"); - BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, 5); + BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5); List rows = new ArrayList<>(); List ids = new 
ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6924358e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java index 7d212d4..83ffaa1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java @@ -30,6 +30,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpResponse; @@ -281,7 +283,8 @@ public class RetryHttpRequestInitializerTest { // RetryHttpInitializer. Bigquery b = new Bigquery.Builder( transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(b); + + BigQueryTableInserter inserter = new BigQueryTableInserter(b, PipelineOptionsFactory.create()); TableReference t = new TableReference() .setProjectId("project").setDatasetId("dataset").setTableId("table");