From: jkff@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Wed, 19 Apr 2017 05:11:03 -0000
Subject: [04/10] beam git commit: Refactor streaming write branch into separate reusable components.

Refactor streaming write branch into separate reusable components.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58ed5c7e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58ed5c7e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58ed5c7e

Branch: refs/heads/master
Commit: 58ed5c7ecd247f9c5e5a15deff40ffa8c800af25
Parents: 67a5f82
Author: Reuven Lax
Authored: Tue Mar 28 19:34:56 2017 -0700
Committer: Eugene Kirpichov
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  69 ++++++------
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  | 100 +++++++++++++++++
 .../io/gcp/bigquery/GenerateShardedTable.java   |  48 ++++++++
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  |  65 ++++++-----
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    |  90 ---------------
 .../sdk/io/gcp/bigquery/StreamingInserts.java   | 110 +++++++++++++++++++
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |  82 +-------------
 .../sdk/io/gcp/bigquery/TableDestination.java   |  48 +++++++-
 .../io/gcp/bigquery/TableDestinationCoder.java  |  64 +++++++++++
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |  71 ++++++++++++
 .../gcp/bigquery/TagWithUniqueIdsAndTable.java  | 101 -----------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  18 +--
 12 files changed, 521 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index af0d561..af19b83 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -40,6 +40,7 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; @@ -60,6 +61,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; @@ -67,6 +69,7 @@ import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -681,8 +684,8 @@ public class BigQueryIO { static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; @Nullable abstract ValueProvider getJsonTableRef(); - @Nullable abstract SerializableFunction, TableReference> - getTableRefFunction(); + @Nullable abstract SerializableFunction, TableDestination> + getTableFunction(); @Nullable abstract SerializableFunction getFormatFunction(); /** Table schema. The schema is required only if the table does not exist. */ @Nullable abstract ValueProvider getJsonSchema(); @@ -783,7 +786,7 @@ public class BigQueryIO { private void ensureToNotCalledYet() { checkState( getJsonTableRef() == null && getTable() == null - && getTableRefFunction() == null, "to() already called"); + && getTableFunction() == null, "to() already called"); } /** @@ -802,13 +805,16 @@ public class BigQueryIO { /** Same as {@link #to(String)}, but with a {@link ValueProvider}. 
    */
   public Write<T> to(ValueProvider<String> tableSpec) {
     ensureToNotCalledYet();
+    String tableDescription = getTableDescription();
+    if (tableDescription == null) {
+      tableDescription = "";
+    }
     return toBuilder()
         .setJsonTableRef(
             NestedValueProvider.of(
                 NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
                 new TableRefToJson()))
-        .setTableRefFunction(new TranslateTableSpecFunction(
-            new ConstantTableSpecFunction(tableSpec)))
+        .setTableFunction(new ConstantTableFunction(tableSpec, tableDescription))
         .build();
   }
 
@@ -819,6 +825,8 @@
   public Write<T> to(
-      SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
-    return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
+      SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
+    ensureToNotCalledYet();
+    return toBuilder().setTableFunction(tableFunction).build();
   }
 
   /**
@@ -828,7 +836,7 @@
-  private Write<T> toTableReference(
-      SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction) {
-    ensureToNotCalledYet();
-    return toBuilder().setTableRefFunction(tableRefFunction).build();
-  }
 
   /**
@@ -838,32 +846,19 @@
     return toBuilder().setFormatFunction(formatFunction).build();
   }
 
-  private static class TranslateTableSpecFunction<T> implements
-      SerializableFunction<ValueInSingleWindow<T>, TableReference> {
-    private SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction;
-
-    TranslateTableSpecFunction(
-        SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
-      this.tableSpecFunction = tableSpecFunction;
-    }
+  static class ConstantTableFunction<T> implements
+      SerializableFunction<ValueInSingleWindow<T>, TableDestination> {
+    private final ValueProvider<String> tableSpec;
+    private final String tableDescription;
 
-    @Override
-    public TableReference apply(ValueInSingleWindow<T> value) {
-      return BigQueryHelpers.parseTableSpec(tableSpecFunction.apply(value));
-    }
-  }
-
-  static class ConstantTableSpecFunction<T> implements
-      SerializableFunction<ValueInSingleWindow<T>, String> {
-    private ValueProvider<String> tableSpec;
-
-    ConstantTableSpecFunction(ValueProvider<String> tableSpec) {
+    ConstantTableFunction(ValueProvider<String> tableSpec, String tableDescription) {
       this.tableSpec = tableSpec;
+      this.tableDescription = tableDescription;
     }
 
     @Override
-    public String apply(ValueInSingleWindow<T> value) {
-      return tableSpec.get();
+    public TableDestination apply(ValueInSingleWindow<T> value) {
+      return new TableDestination(tableSpec.get(), tableDescription);
     }
   }
 
@@ -919,7 +914,7 @@
       BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
       // Exactly one of the table and table reference can be configured.
-      checkState(getTableRefFunction() != null,
+      checkState(getTableFunction() != null,
           "must set the table reference of a BigQueryIO.Write transform");
 
       checkArgument(getFormatFunction() != null,
@@ -978,10 +973,16 @@
     @Override
     public WriteResult expand(PCollection<T> input) {
+      PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
+          input.apply("PrepareWrite", ParDo.of(
+              new PrepareWrite(getTableFunction(), getFormatFunction())))
+              .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of()));
+
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
-      // StreamWithDeDup and BigQuery's streaming import API.
+      // StreamingInserts and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED) { - return input.apply(new StreamWithDeDup(this)); + return rowsWithDestination.apply(new StreamingInserts(this)); } else { return input.apply(new BatchLoadBigQuery(this)); } @@ -1002,8 +1003,8 @@ public class BigQueryIO { .addIfNotNull(DisplayData.item("schema", getJsonSchema()) .withLabel("Table Schema")); - if (getTableRefFunction() != null) { - builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass()) + if (getTableFunction() != null) { + builder.add(DisplayData.item("tableFn", getTableFunction().getClass()) .withLabel("Table Reference Function")); } @@ -1025,7 +1026,7 @@ public class BigQueryIO { } /** - * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}. + * Returns the table to write, or {@code null} if writing with {@code tableFunction}. * *

<p>If the table's project is not specified, use the executing project.
    */
@@ -1066,7 +1067,7 @@
    */
   @VisibleForTesting
   static void clearCreatedTables() {
-    StreamingWriteFn.clearCreatedTables();
+    CreateTables.clearCreatedTables();
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
new file mode 100644
index 0000000..e216553
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Creates any tables needed before performing streaming writes to the tables. This is a
+ * side-effect {@link DoFn}, and returns the original collection unchanged.
+ */
+public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
+    KV<TableDestination, TableRow>> {
+  private final CreateDisposition createDisposition;
+  private final BigQueryServices bqServices;
+  private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
+
+  /**
+   * The list of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  public CreateTables(CreateDisposition createDisposition, BigQueryServices bqServices,
+      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+    this.createDisposition = createDisposition;
+    this.bqServices = bqServices;
+    this.schemaFunction = schemaFunction;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context) throws InterruptedException, IOException {
+    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+    possibleCreateTable(options, context.element().getKey());
+    context.output(context.element());
+  }
+
+  private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination)
+      throws InterruptedException, IOException {
+    String tableSpec = tableDestination.getTableSpec();
+    TableReference tableReference = tableDestination.getTableReference();
+    String tableDescription = tableDestination.getTableDescription();
+    if (createDisposition != CreateDisposition.CREATE_NEVER
+        && !createdTables.contains(tableSpec)) {
+      synchronized (createdTables) {
+        // Another thread may have succeeded in creating the table in the meanwhile, so
+        // check again. This check isn't needed for correctness, but we add it to prevent
+        // every thread from attempting a create and overwhelming our BigQuery quota.
+        DatasetService datasetService = bqServices.getDatasetService(options);
+        if (!createdTables.contains(tableSpec)) {
+          TableSchema tableSchema = schemaFunction.apply(tableDestination);
+          if (datasetService.getTable(tableReference) == null) {
+            datasetService.createTable(
+                new Table()
+                    .setTableReference(tableReference)
+                    .setSchema(tableSchema)
+                    .setDescription(tableDescription));
+          }
+          createdTables.add(tableSpec);
+        }
+      }
+    }
+  }
+
+  static void clearCreatedTables() {
+    synchronized (createdTables) {
+      createdTables.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
new file mode 100644
index 0000000..da3a70a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; + +/** + * Given a write to a specific table, assign that to one of the + * {@link GenerateShardedTable#numShards} keys assigned to that table. + */ +class GenerateShardedTable extends DoFn, + KV, TableRow>> { + private final int numShards; + + GenerateShardedTable(int numShards) { + this.numShards = numShards; + } + + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) throws IOException { + ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); + // We output on keys 0-50 to ensure that there's enough batching for + // BigQuery. + String tableSpec = context.element().getKey().getTableSpec(); + context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, numShards)), + context.element().getValue())); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java index 0c08e18..7712417 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java @@ -1,3 +1,20 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; @@ -6,8 +23,6 @@ import com.google.common.base.Strings; import java.io.IOException; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; @@ -15,44 +30,38 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.ValueInSingleWindow; /** - * Prepare an input {@link PCollection} for writing to BigQuery. Use the table-reference + * Prepare an input {@link PCollection} for writing to BigQuery. 
Use the table-reference * function to determine which tables each element is written to, and format the element into a * {@link TableRow} using the user-supplied format function. */ -public class PrepareWrite extends PTransform, PCollection>> { - private static final String NAME = "PrepareWrite"; - private SerializableFunction, TableReference> tableRefFunction; +public class PrepareWrite extends DoFn> { + private SerializableFunction, TableDestination> tableFunction; private SerializableFunction formatFunction; - public PrepareWrite(SerializableFunction, TableReference> tableRefFunction, + public PrepareWrite(SerializableFunction, TableDestination> tableFunction, SerializableFunction formatFunction) { - super(NAME); - this.tableRefFunction = tableRefFunction; + this.tableFunction = tableFunction; this.formatFunction = formatFunction; } - @Override - public PCollection> expand(PCollection input) { - PCollection> elementsByTable = - input.apply(ParDo.of(new DoFn>() { - @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) throws IOException { - String tableSpec = tableSpecFromWindowedValue( - context.getPipelineOptions().as(BigQueryOptions.class), - ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); - TableRow tableRow = formatFunction.apply(context.element()); - context.output(KV.of(tableSpec, tableRow)); - } - })); - return elementsByTable; + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) throws IOException { + TableDestination tableDestination = tableSpecFromWindowedValue( + context.getPipelineOptions().as(BigQueryOptions.class), + ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); + TableRow tableRow = formatFunction.apply(context.element()); + context.output(KV.of(tableDestination, tableRow)); } - private String tableSpecFromWindowedValue(BigQueryOptions options, + private TableDestination tableSpecFromWindowedValue(BigQueryOptions options, ValueInSingleWindow value) { - TableReference table = tableRefFunction.apply(value); - if (Strings.isNullOrEmpty(table.getProjectId())) { - table.setProjectId(options.getProject()); + TableDestination tableDestination = tableFunction.apply(value); + TableReference tableReference = tableDestination.getTableReference(); + if (Strings.isNullOrEmpty(tableReference.getProjectId())) { + tableReference.setProjectId(options.getProject()); + tableDestination = new TableDestination(tableReference, + tableDestination.getTableDescription()); } - return BigQueryHelpers.toTableSpec(table); + return tableDestination; } } http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java deleted file mode 100644 index 506a564..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableSchema; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.Reshuffle; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** -* PTransform that performs streaming BigQuery write. To increase consistency, -* it leverages BigQuery best effort de-dup mechanism. - */ -class StreamWithDeDup extends PTransform, WriteResult> { - private final Write write; - - /** Constructor. */ - StreamWithDeDup(Write write) { - this.write = write; - } - - @Override - protected Coder getDefaultOutputCoder() { - return VoidCoder.of(); - } - - @Override - public WriteResult expand(PCollection input) { - // A naive implementation would be to simply stream data directly to BigQuery. - // However, this could occasionally lead to duplicated data, e.g., when - // a VM that runs this code is restarted and the code is re-run. - - // The above risk is mitigated in this implementation by relying on - // BigQuery built-in best effort de-dup mechanism. - - // To use this mechanism, each input TableRow is tagged with a generated - // unique id, which is then passed to BigQuery and used to ignore duplicates. - - PCollection, TableRowInfo>> tagged = - input.apply(ParDo.of(new TagWithUniqueIdsAndTable( - input.getPipeline().getOptions().as(BigQueryOptions.class), write))); - - // To prevent having the same TableRow processed more than once with regenerated - // different unique ids, this implementation relies on "checkpointing", which is - // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, - // performed by Reshuffle. - NestedValueProvider schema = - write.getJsonSchema() == null - ? 
null - : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema()); - tagged - .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) - .apply(Reshuffle., TableRowInfo>of()) - .apply( - ParDo.of( - new StreamingWriteFn( - schema, - write.getCreateDisposition(), - write.getTableDescription(), - write.getBigQueryServices()))); - - return WriteResult.in(input.getPipeline()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java new file mode 100644 index 0000000..37afbdf --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.Reshuffle; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** +* PTransform that performs streaming BigQuery write. To increase consistency, +* it leverages BigQuery best effort de-dup mechanism. + */ + +class StreamingInserts + extends PTransform>, WriteResult> { + private final Write write; + + private static class ConstantSchemaFunction implements + SerializableFunction { + private final @Nullable String jsonSchema; + + ConstantSchemaFunction(TableSchema schema) { + this.jsonSchema = BigQueryHelpers.toJsonString(schema); + } + + @Override + @Nullable + public TableSchema apply(TableDestination table) { + return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); + } + } + + /** Constructor. 
*/ + StreamingInserts(Write write) { + this.write = write; + } + + @Override + protected Coder getDefaultOutputCoder() { + return VoidCoder.of(); + } + + @Override + public WriteResult expand(PCollection> input) { + // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant + // schema function here. If no schema is specified, this function will return null. + SerializableFunction schemaFunction = + new ConstantSchemaFunction(write.getSchema()); + + // A naive implementation would be to simply stream data directly to BigQuery. + // However, this could occasionally lead to duplicated data, e.g., when + // a VM that runs this code is restarted and the code is re-run. + + // The above risk is mitigated in this implementation by relying on + // BigQuery built-in best effort de-dup mechanism. + + // To use this mechanism, each input TableRow is tagged with a generated + // unique id, which is then passed to BigQuery and used to ignore duplicates. + PCollection, TableRowInfo>> tagged = input + .apply("CreateTables", ParDo.of(new CreateTables(write.getCreateDisposition(), + write.getBigQueryServices(), schemaFunction))) + // We create 50 keys per BigQuery table to generate output on. This is few enough that we + // get good batching into BigQuery's insert calls, and enough that we can max out the + // streaming insert quota. + .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(50))) + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of())) + .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds())); + + // To prevent having the same TableRow processed more than once with regenerated + // different unique ids, this implementation relies on "checkpointing", which is + // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, + // performed by Reshuffle. 
+ tagged + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of())) + .apply(Reshuffle., TableRowInfo>of()) + .apply("StreamingWrite", + ParDo.of( + new StreamingWriteFn(write.getBigQueryServices()))); + + return WriteResult.in(input.getPipeline()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java index 1d93fa3..83ed3d2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java @@ -18,28 +18,16 @@ package org.apache.beam.sdk.io.gcp.bigquery; -import static com.google.common.base.Preconditions.checkNotNull; -import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.SystemDoFnInternal; @@ -52,48 +40,19 @@ import org.apache.beam.sdk.values.KV; @VisibleForTesting class StreamingWriteFn extends DoFn, TableRowInfo>, Void> { - /** TableSchema in JSON. Use String to make the class Serializable. */ - @Nullable - private final ValueProvider jsonTableSchema; - - @Nullable private final String tableDescription; - private final BigQueryServices bqServices; /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ private transient Map> tableRows; - private final Write.CreateDisposition createDisposition; - /** The list of unique ids for each BigQuery table row. */ private transient Map> uniqueIdsForTableRows; - /** The list of tables created so far, so we don't try the creation - each time. */ - private static Set createdTables = - Collections.newSetFromMap(new ConcurrentHashMap()); - /** Tracks bytes written, exposed as "ByteCount" Counter. */ private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount"); - /** Constructor. */ - StreamingWriteFn(@Nullable ValueProvider schema, - Write.CreateDisposition createDisposition, - @Nullable String tableDescription, BigQueryServices bqServices) { - this.jsonTableSchema = schema == null ? 
null : - NestedValueProvider.of(schema, new TableSchemaToJsonSchema()); - this.createDisposition = createDisposition; - this.bqServices = checkNotNull(bqServices, "bqServices"); - this.tableDescription = tableDescription; - } - - /** - * Clear the cached map of created tables. Used for testing. - */ - static void clearCreatedTables() { - synchronized (createdTables) { - createdTables.clear(); - } + StreamingWriteFn(BigQueryServices bqServices) { + this.bqServices = bqServices; } /** Prepares a target BigQuery table. */ @@ -119,9 +78,8 @@ class StreamingWriteFn @FinishBundle public void finishBundle(Context context) throws Exception { BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); - for (Map.Entry> entry : tableRows.entrySet()) { - TableReference tableReference = getOrCreateTable(options, entry.getKey()); + TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey()); flushRows(tableReference, entry.getValue(), uniqueIdsForTableRows.get(entry.getKey()), options); } @@ -132,39 +90,6 @@ class StreamingWriteFn @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - - builder - .addIfNotNull(DisplayData.item("schema", jsonTableSchema) - .withLabel("Table Schema")) - .addIfNotNull(DisplayData.item("tableDescription", tableDescription) - .withLabel("Table Description")); - } - - public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) - throws InterruptedException, IOException { - TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec); - if (createDisposition != createDisposition.CREATE_NEVER - && !createdTables.contains(tableSpec)) { - synchronized (createdTables) { - // Another thread may have succeeded in creating the table in the meanwhile, so - // check again. This check isn't needed for correctness, but we add it to prevent - // every thread from attempting a create and overwhelming our BigQuery quota. 
-        DatasetService datasetService = bqServices.getDatasetService(options);
-        if (!createdTables.contains(tableSpec)) {
-          if (datasetService.getTable(tableReference) == null) {
-            TableSchema tableSchema = BigQueryIO.JSON_FACTORY.fromString(
-                jsonTableSchema.get(), TableSchema.class);
-            datasetService.createTable(
-                new Table()
-                    .setTableReference(tableReference)
-                    .setSchema(tableSchema)
-                    .setDescription(tableDescription));
-          }
-          createdTables.add(tableSpec);
-        }
-      }
-    }
-    return tableReference;
-  }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 3cbbf3b..631afeb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -1,7 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.TableReference;
+
 /**
- * Created by relax on 3/28/17.
+ * Encapsulates a BigQuery table destination.
*/ public class TableDestination { + private final String tableSpec; + private final String tableDescription; + + + public TableDestination(String tableSpec, String tableDescription) { + this.tableSpec = tableSpec; + this.tableDescription = tableDescription; + } + + public TableDestination(TableReference tableReference, String tableDescription) { + this.tableSpec = BigQueryHelpers.toTableSpec(tableReference); + this.tableDescription = tableDescription; + } + + public String getTableSpec() { + return tableSpec; + } + + + public TableReference getTableReference() { + return BigQueryHelpers.parseTableSpec(tableSpec); + } + + public String getTableDescription() { + return tableDescription; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java new file mode 100644 index 0000000..fa24700 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -0,0 +1,64 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.fasterxml.jackson.annotation.JsonCreator; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** + * A coder for {@link TableDestination} objects. 
+ */
+public class TableDestinationCoder extends AtomicCoder<TableDestination> {
+  private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+
+  @JsonCreator
+  public static TableDestinationCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(TableDestination value, OutputStream outStream, Context context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null value");
+    }
+    stringCoder.encode(value.getTableSpec(), outStream, context.nested());
+    stringCoder.encode(value.getTableDescription(), outStream, context);
+  }
+
+  @Override
+  public TableDestination decode(InputStream inStream, Context context) throws IOException {
+    // Decode the description with the same context it was encoded with; decoding it as
+    // nested would look for a length prefix that the encoder never wrote.
+    return new TableDestination(
+        stringCoder.decode(inStream, context.nested()),
+        stringCoder.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
new file mode 100644
index 0000000..6f0186e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Fn that tags each table row with a unique id and destination table.
+ * To avoid calling UUID.randomUUID() for each element, which can be costly, + * a randomUUID is generated only once per bucket of data. The actual unique + * id is created by concatenating this randomUUID with a sequential number. + */ +@VisibleForTesting +class TagWithUniqueIds + extends DoFn, TableRow>, KV, TableRowInfo>> { + + private transient String randomUUID; + private transient long sequenceNo = 0L; + + @StartBundle + public void startBundle(Context context) { + randomUUID = UUID.randomUUID().toString(); + } + + /** Tag the input with a unique id. */ + @ProcessElement + public void processElement(ProcessContext context, BoundedWindow window) throws IOException { + String uniqueId = randomUUID + sequenceNo++; + // We output on keys 0-50 to ensure that there's enough batching for + // BigQuery. + context.output(KV.of(context.element().getKey(), + new TableRowInfo(context.element().getValue(), uniqueId))); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java deleted file mode 100644 index 4e50f7c..0000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableReference; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; -import java.io.IOException; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; -import org.apache.beam.sdk.options.BigQueryOptions; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ValueInSingleWindow; - -/** - * Fn that tags each table row with a unique id and destination table. 
- * To avoid calling UUID.randomUUID() for each element, which can be costly, - * a randomUUID is generated only once per bucket of data. The actual unique - * id is created by concatenating this randomUUID with a sequential number. - */ -@VisibleForTesting -class TagWithUniqueIdsAndTable - extends DoFn, TableRowInfo>> { - /** TableSpec to write to in the case of a single static destination. */ - private ValueProvider tableSpec = null; - - private final Write write; - - private transient String randomUUID; - private transient long sequenceNo = 0L; - - TagWithUniqueIdsAndTable(BigQueryOptions options, - Write write) { - ValueProvider table = write.getTableWithDefaultProject( - options.as(BigQueryOptions.class)); - if (table != null) { - this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec()); - } - this.write = write; - } - - - @StartBundle - public void startBundle(Context context) { - randomUUID = UUID.randomUUID().toString(); - } - - /** Tag the input with a unique id. */ - @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) throws IOException { - String uniqueId = randomUUID + sequenceNo++; - ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); - String tableSpec = tableSpecFromWindowedValue( - context.getPipelineOptions().as(BigQueryOptions.class), - ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane())); - // We output on keys 0-50 to ensure that there's enough batching for - // BigQuery. - context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)), - new TableRowInfo(write.getFormatFunction().apply(context.element()), uniqueId))); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.addIfNotNull(DisplayData.item("table", tableSpec)); - builder.add(DisplayData.item("tableFn", write.getTableRefFunction().getClass()) - .withLabel("Table Reference Function")); - } - - @VisibleForTesting - ValueProvider getTableSpec() { - return tableSpec; - } - - - -} http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 499aa74..d953edd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -518,7 +518,6 @@ public class BigQueryIOTest implements Serializable { /** A fake dataset service that can be serialized, for use in testReadFromTable. 
   */
  private static class FakeDatasetService implements DatasetService, Serializable {
-
    @Override
    public Table getTable(TableReference tableRef)
        throws InterruptedException, IOException {
@@ -1121,15 +1121,15 @@
        }
    );
 
-    SerializableFunction<ValueInSingleWindow<Integer>, String> tableFunction =
-        new SerializableFunction<ValueInSingleWindow<Integer>, String>() {
+    SerializableFunction<ValueInSingleWindow<Integer>, TableDestination> tableFunction =
+        new SerializableFunction<ValueInSingleWindow<Integer>, TableDestination>() {
          @Override
-          public String apply(ValueInSingleWindow<Integer> input) {
+          public TableDestination apply(ValueInSingleWindow<Integer> input) {
            PartitionedGlobalWindow window = (PartitionedGlobalWindow) input.getWindow();
            // Check that we can access the element as well here.
            checkArgument(window.value.equals(Integer.toString(input.getValue() % 5)),
                "Incorrect element");
-            return "project-id:dataset-id.table-id-" + window.value;
+            return new TableDestination("project-id:dataset-id.table-id-" + window.value, "");
          }
        };
@@ -1559,14 +1559,6 @@
  }
 
  @Test
-  public void testStreamingWriteFnCreateNever() throws Exception {
-    StreamingWriteFn fn = new StreamingWriteFn(
-        null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices());
-    assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"),
-        fn.getOrCreateTable(null, "dataset.table"));
-  }
-
-  @Test
  public void testCreateNeverWithStreaming() throws Exception {
    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
    options.setProject("project");
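
----------------------------------------------------------------------

For reference, a minimal round-trip check of the two classes added above,
TableDestination and TableDestinationCoder. This is an illustrative sketch
only: the table spec and description are made-up values, and
Coder.Context.OUTER stands in for a runner encoding the value at a top-level
boundary.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;

public class TableDestinationCoderSketch {
  public static void main(String[] args) throws Exception {
    TableDestination original =
        new TableDestination("project-id:dataset-id.table-id", "example description");

    // Encode: the table spec is written length-prefixed (nested context), the
    // description consumes the rest of the stream (outer context).
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    TableDestinationCoder.of().encode(original, bytes, Coder.Context.OUTER);

    // Decode with the same context and compare field by field, since
    // TableDestination does not override equals().
    TableDestination decoded = TableDestinationCoder.of().decode(
        new ByteArrayInputStream(bytes.toByteArray()), Coder.Context.OUTER);
    if (!decoded.getTableSpec().equals(original.getTableSpec())
        || !decoded.getTableDescription().equals(original.getTableDescription())) {
      throw new AssertionError("TableDestination did not survive the round trip");
    }
  }
}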
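Similarly, a standalone sketch of the unique-id scheme that TagWithUniqueIds
implements: one random UUID per bundle, concatenated with a per-element
sequence number. The three-iteration loop is a stand-in for the elements of a
bundle; only java.util.UUID is required.

import java.util.UUID;

public class UniqueIdSketch {
  public static void main(String[] args) {
    // Generated once per bundle (see startBundle above); UUID.randomUUID()
    // is too costly to call for every element.
    String bundleUuid = UUID.randomUUID().toString();
    long sequenceNo = 0L;

    for (int element = 0; element < 3; element++) {
      // Cheap per-element id; BigQuery's streaming insertAll uses it for
      // best-effort de-duplication when a bundle is retried.
      String insertId = bundleUuid + sequenceNo++;
      System.out.println(insertId);
    }
  }
}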