From: altay@apache.org
To: "commits@beam.apache.org"
Reply-To: dev@beam.apache.org
Date: Tue, 27 Jul 2021 17:20:19 +0000
Subject: [beam] branch master updated: [BEAM-7195] BQ BatchLoads doesn't always create new tables (#14238)
Message-ID: <162740641462.28160.16595865342441217469@gitbox.apache.org>
X-Git-Repo: beam
X-Git-Refname: refs/heads/master
X-Git-Oldrev: 0e12dd0b7ce347092294fdeb7fb72ec6bd944fb8
X-Git-Newrev: 69822e417fe4b582a73c45c5780f4ff69841d5db

This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 69822e4 [BEAM-7195] BQ BatchLoads doesn't always create new tables (#14238) 69822e4 is described below commit 69822e417fe4b582a73c45c5780f4ff69841d5db Author: reuvenlax AuthorDate: Tue Jul 27 10:19:30 2021 -0700 [BEAM-7195] BQ BatchLoads doesn't always create new tables (#14238) --- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 67 +++++++++-------- .../io/gcp/bigquery/BigQueryResourceNaming.java | 1 + .../beam/sdk/io/gcp/bigquery/WritePartition.java | 58 ++++++++++++--- .../beam/sdk/io/gcp/bigquery/WriteRename.java | 34 ++++++--- .../beam/sdk/io/gcp/bigquery/WriteTables.java | 87 +++++++++++++++++----- .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 60 ++++++++++----- 6 files changed, 222 insertions(+), 85 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 15ea5c0..16b96bf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -30,10 +30,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; @@ -147,8 +145,8 @@ class BatchLoads private ValueProvider loadJobProjectId; private final Coder elementCoder; private final RowWriterFactory rowWriterFactory; - private String kmsKey; - private boolean clusteringEnabled; + private final String kmsKey; + private final boolean clusteringEnabled; // The maximum number of times to retry failed load or copy jobs. private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS; @@ -274,6 +272,8 @@ class BatchLoads private WriteResult expandTriggered(PCollection> input) { Pipeline p = input.getPipeline(); final PCollectionView loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD); + final PCollectionView tempLoadJobIdPrefixView = + createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD); final PCollectionView copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY); final PCollectionView tempFilePrefixView = createTempFilePrefixView(p, loadJobIdPrefixView); @@ -321,9 +321,9 @@ class BatchLoads .plusDelayOf(triggeringFrequency))) .discardingFiredPanes()); - TupleTag, List>> multiPartitionsTag = + TupleTag, WritePartition.Result>> multiPartitionsTag = new TupleTag<>("multiPartitionsTag"); - TupleTag, List>> singlePartitionTag = + TupleTag, WritePartition.Result>> singlePartitionTag = new TupleTag<>("singlePartitionTag"); // If we have non-default triggered output, we can't use the side-input technique used in @@ -331,10 +331,10 @@ class BatchLoads // determinism. 
PCollectionTuple partitions = results - .apply("AttachSingletonKey", WithKeys.of((Void) null)) + .apply("AttachDestinationKey", WithKeys.of(result -> result.destination)) .setCoder( - KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder))) - .apply("GroupOntoSingleton", GroupByKey.create()) + KvCoder.of(destinationCoder, WriteBundlesToFiles.ResultCoder.of(destinationCoder))) + .apply("GroupFilesByDestination", GroupByKey.create()) .apply("ExtractResultValues", Values.create()) .apply( "WritePartitionTriggered", @@ -350,14 +350,14 @@ class BatchLoads rowWriterFactory)) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - PCollection> tempTables = - writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView); + PCollection> tempTables = + writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView); tempTables // Now that the load job has happened, we want the rename to happen immediately. .apply( "Window Into Global Windows", - Window.>into(new GlobalWindows()) + Window.>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))) .apply("Add Void Key", WithKeys.of((Void) null)) .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder())) @@ -382,6 +382,9 @@ class BatchLoads public WriteResult expandUntriggered(PCollection> input) { Pipeline p = input.getPipeline(); final PCollectionView loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD); + final PCollectionView tempLoadJobIdPrefixView = + createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD); + final PCollectionView copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY); final PCollectionView tempFilePrefixView = createTempFilePrefixView(p, loadJobIdPrefixView); PCollection> inputInGlobalWindow = @@ -395,10 +398,10 @@ class BatchLoads ? 
writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView) : writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView); - TupleTag, List>> multiPartitionsTag = - new TupleTag, List>>("multiPartitionsTag") {}; - TupleTag, List>> singlePartitionTag = - new TupleTag, List>>("singlePartitionTag") {}; + TupleTag, WritePartition.Result>> multiPartitionsTag = + new TupleTag, WritePartition.Result>>("multiPartitionsTag") {}; + TupleTag, WritePartition.Result>> singlePartitionTag = + new TupleTag, WritePartition.Result>>("singlePartitionTag") {}; // This transform will look at the set of files written for each table, and if any table has // too many files or bytes, will partition that table's files into multiple partitions for @@ -421,8 +424,8 @@ class BatchLoads rowWriterFactory)) .withSideInputs(tempFilePrefixView) .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))); - PCollection> tempTables = - writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView); + PCollection> tempTables = + writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView); tempTables .apply("ReifyRenameInput", new ReifyAsIterable<>()) @@ -431,7 +434,7 @@ class BatchLoads ParDo.of( new WriteRename( bigQueryServices, - loadJobIdPrefixView, + copyJobIdPrefixView, writeDisposition, createDisposition, maxRetryJobs, @@ -637,23 +640,22 @@ class BatchLoads .apply( "WriteGroupedRecords", ParDo.of( - new WriteGroupedRecordsToFiles( - tempFilePrefix, maxFileSize, rowWriterFactory)) + new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory)) .withSideInputs(tempFilePrefix)) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); } // Take in a list of files and write them to temporary tables. - private PCollection> writeTempTables( - PCollection, List>> input, + private PCollection> writeTempTables( + PCollection, WritePartition.Result>> input, PCollectionView jobIdTokenView) { List> sideInputs = Lists.newArrayList(jobIdTokenView); sideInputs.addAll(dynamicDestinations.getSideInputs()); - Coder, List>> partitionsCoder = + Coder, WritePartition.Result>> partitionsCoder = KvCoder.of( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), - ListCoder.of(StringUtf8Coder.of())); + WritePartition.ResultCoder.INSTANCE); // If the final destination table exists already (and we're appending to it), then the temp // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object @@ -695,20 +697,24 @@ class BatchLoads rowWriterFactory.getSourceFormat(), useAvroLogicalTypes, schemaUpdateOptions)) - .setCoder(KvCoder.of(tableDestinationCoder, StringUtf8Coder.of())); + .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE)); } // In the case where the files fit into a single load job, there's no need to write temporary // tables and rename. We can load these files directly into the target BigQuery table. void writeSinglePartition( - PCollection, List>> input, + PCollection, WritePartition.Result>> input, PCollectionView loadJobIdPrefixView) { List> sideInputs = Lists.newArrayList(loadJobIdPrefixView); sideInputs.addAll(dynamicDestinations.getSideInputs()); - Coder, List>> partitionsCoder = + + Coder tableDestinationCoder = + clusteringEnabled ? 
TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); + + Coder, WritePartition.Result>> partitionsCoder = KvCoder.of( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), - ListCoder.of(StringUtf8Coder.of())); + WritePartition.ResultCoder.INSTANCE); // Write single partition to final table input .setCoder(partitionsCoder) @@ -731,7 +737,8 @@ class BatchLoads kmsKey, rowWriterFactory.getSourceFormat(), useAvroLogicalTypes, - schemaUpdateOptions)); + schemaUpdateOptions)) + .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE)); } private WriteResult writeResult(Pipeline p) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java index 7e800fd..7eae6fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java @@ -69,6 +69,7 @@ class BigQueryResourceNaming { public enum JobType { LOAD, + TEMP_TABLE_LOAD, COPY, EXPORT, QUERY, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index cd4f163..e1e0566 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -17,8 +17,17 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.KV; @@ -39,7 +48,32 @@ import org.checkerframework.checker.nullness.qual.Nullable; class WritePartition extends DoFn< Iterable>, - KV, List>> { + KV, WritePartition.Result>> { + @AutoValue + abstract static class Result { + public abstract List getFilenames(); + + abstract Boolean isFirstPane(); + } + + static class ResultCoder extends AtomicCoder { + private static final Coder> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of()); + private static final Coder FIRST_PANE_CODER = BooleanCoder.of(); + static final ResultCoder INSTANCE = new ResultCoder(); + + @Override + public void encode(Result value, OutputStream outStream) throws IOException { + FILENAMES_CODER.encode(value.getFilenames(), outStream); + FIRST_PANE_CODER.encode(value.isFirstPane(), outStream); + } + + @Override + public Result decode(InputStream inStream) throws IOException { + return new AutoValue_WritePartition_Result( + FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream)); + } + } + private final boolean singletonTable; private final DynamicDestinations dynamicDestinations; private final PCollectionView tempFilePrefix; @@ -47,8 +81,9 @@ class WritePartition private final long maxSizeBytes; private final 
RowWriterFactory rowWriterFactory; - private @Nullable TupleTag, List>> multiPartitionsTag; - private TupleTag, List>> singlePartitionTag; + private @Nullable TupleTag, WritePartition.Result>> + multiPartitionsTag; + private TupleTag, WritePartition.Result>> singlePartitionTag; private static class PartitionData { private int numFiles = 0; @@ -131,8 +166,8 @@ class WritePartition PCollectionView tempFilePrefix, int maxNumFiles, long maxSizeBytes, - TupleTag, List>> multiPartitionsTag, - TupleTag, List>> singlePartitionTag, + TupleTag, WritePartition.Result>> multiPartitionsTag, + TupleTag, WritePartition.Result>> singlePartitionTag, RowWriterFactory rowWriterFactory) { this.singletonTable = singletonTable; this.dynamicDestinations = dynamicDestinations; @@ -147,7 +182,6 @@ class WritePartition @ProcessElement public void processElement(ProcessContext c) throws Exception { List> results = Lists.newArrayList(c.element()); - // If there are no elements to write _and_ the user specified a constant output table, then // generate an empty table of that name. if (results.isEmpty() && singletonTable) { @@ -161,7 +195,8 @@ class WritePartition BigQueryRowWriter.Result writerResult = writer.getResult(); results.add( - new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, destination)); + new WriteBundlesToFiles.Result<>( + writerResult.resourceId.toString(), writerResult.byteSize, destination)); } Map currentResults = Maps.newHashMap(); @@ -190,11 +225,16 @@ class WritePartition // In the fast-path case where we only output one table, the transform loads it directly // to the final table. In this case, we output on a special TupleTag so the enclosing // transform knows to skip the rename step. - TupleTag, List>> outputTag = + TupleTag, WritePartition.Result>> outputTag = (destinationData.getPartitions().size() == 1) ? 
singlePartitionTag : multiPartitionsTag; for (int i = 0; i < destinationData.getPartitions().size(); ++i) { PartitionData partitionData = destinationData.getPartitions().get(i); - c.output(outputTag, KV.of(ShardedKey.of(destination, i + 1), partitionData.getFilenames())); + c.output( + outputTag, + KV.of( + ShardedKey.of(destination, i + 1), + new AutoValue_WritePartition_Result( + partitionData.getFilenames(), c.pane().isFirst()))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java index a45f6f8..80201ff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap; import org.slf4j.Logger; @@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) -class WriteRename extends DoFn>, Void> { +class WriteRename extends DoFn>, Void> { private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class); private final BigQueryServices bqServices; @@ -116,12 +117,15 @@ class WriteRename extends DoFn>, Void> { } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - Multimap tempTables = ArrayListMultimap.create(); - for (KV entry : c.element()) { + public void processElement( + @Element Iterable> element, ProcessContext c) + throws Exception { + Multimap tempTables = ArrayListMultimap.create(); + for (KV entry : element) { tempTables.put(entry.getKey(), entry.getValue()); } - for (Map.Entry> entry : tempTables.asMap().entrySet()) { + for (Map.Entry> entry : + tempTables.asMap().entrySet()) { // Process each destination table. // Do not copy if no temp tables are provided. if (!entry.getValue().isEmpty()) { @@ -165,17 +169,27 @@ class WriteRename extends DoFn>, Void> { } private PendingJobData startWriteRename( - TableDestination finalTableDestination, Iterable tempTableNames, ProcessContext c) + TableDestination finalTableDestination, + Iterable tempTableNames, + ProcessContext c) throws Exception { + // The pane may have advanced either here due to triggering or due to an upstream trigger. We + // check the upstream + // trigger to handle the case where an earlier pane triggered the single-partition path. If this + // happened, then the + // table will already exist so we want to append to the table. + boolean isFirstPane = + Iterables.getFirst(tempTableNames, null).isFirstPane() && c.pane().isFirst(); WriteDisposition writeDisposition = - (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND; + isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND; CreateDisposition createDisposition = - (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER; + isFirstPane ? 
firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER; List tempTables = StreamSupport.stream(tempTableNames.spliterator(), false) - .map(table -> BigQueryHelpers.fromJsonString(table, TableReference.class)) + .map( + result -> + BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class)) .collect(Collectors.toList()); - ; // Make sure each destination table gets a unique job id. String jobIdPrefix = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 465c924..32ed1fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -26,11 +26,17 @@ import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.auto.value.AutoValue; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; @@ -68,7 +74,10 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,8 +98,35 @@ import org.slf4j.LoggerFactory; }) class WriteTables extends PTransform< - PCollection, List>>, - PCollection>> { + PCollection, WritePartition.Result>>, + PCollection>> { + @AutoValue + abstract static class Result { + abstract String getTableName(); + + abstract Boolean isFirstPane(); + } + + static class ResultCoder extends AtomicCoder { + static final ResultCoder INSTANCE = new ResultCoder(); + + @Override + public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) + throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull + @Initialized IOException { + StringUtf8Coder.of().encode(value.getTableName(), outStream); + BooleanCoder.of().encode(value.isFirstPane(), outStream); + } + + @Override + public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) + throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull + @Initialized IOException { + return new AutoValue_WriteTables_Result( + StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); + } + } + private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class); private final boolean tempTable; 
@@ -101,7 +137,7 @@ class WriteTables private final Set schemaUpdateOptions; private final DynamicDestinations dynamicDestinations; private final List> sideInputs; - private final TupleTag> mainOutputTag; + private final TupleTag> mainOutputTag; private final TupleTag temporaryFilesTag; private final ValueProvider loadJobProjectId; private final int maxRetryJobs; @@ -113,7 +149,9 @@ class WriteTables private @Nullable JobService jobService; private class WriteTablesDoFn - extends DoFn, List>, KV> { + extends DoFn< + KV, WritePartition.Result>, KV> { + private Map jsonSchemas = Maps.newHashMap(); // Represents a pending BigQuery load job. @@ -123,18 +161,21 @@ class WriteTables final List partitionFiles; final TableDestination tableDestination; final TableReference tableReference; + final boolean isFirstPane; public PendingJobData( BoundedWindow window, BigQueryHelpers.PendingJob retryJob, List partitionFiles, TableDestination tableDestination, - TableReference tableReference) { + TableReference tableReference, + boolean isFirstPane) { this.window = window; this.retryJob = retryJob; this.partitionFiles = partitionFiles; this.tableDestination = tableDestination; this.tableReference = tableReference; + this.isFirstPane = isFirstPane; } } // All pending load jobs. @@ -149,7 +190,11 @@ class WriteTables } @ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) throws Exception { + public void processElement( + @Element KV, WritePartition.Result> element, + ProcessContext c, + BoundedWindow window) + throws Exception { dynamicDestinations.setSideInputAccessorFromProcessContext(c); DestinationT destination = c.element().getKey().getKey(); TableSchema tableSchema; @@ -199,8 +244,8 @@ class WriteTables tableDestination = tableDestination.withTableReference(tableReference); } - Integer partition = c.element().getKey().getShardNumber(); - List partitionFiles = Lists.newArrayList(c.element().getValue()); + Integer partition = element.getKey().getShardNumber(); + List partitionFiles = Lists.newArrayList(element.getValue().getFilenames()); String jobIdPrefix = BigQueryResourceNaming.createJobIdWithDestination( c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex()); @@ -212,7 +257,7 @@ class WriteTables WriteDisposition writeDisposition = firstPaneWriteDisposition; CreateDisposition createDisposition = firstPaneCreateDisposition; - if (c.pane().getIndex() > 0 && !tempTable) { + if (!element.getValue().isFirstPane() && !tempTable) { // If writing directly to the destination, then the table is created on the first write // and we should change the disposition for subsequent writes. 
writeDisposition = WriteDisposition.WRITE_APPEND; @@ -238,7 +283,13 @@ class WriteTables createDisposition, schemaUpdateOptions); pendingJobs.add( - new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference)); + new PendingJobData( + window, + retryJob, + partitionFiles, + tableDestination, + tableReference, + element.getValue().isFirstPane())); } @Teardown @@ -284,7 +335,7 @@ class WriteTables bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)); PendingJobManager jobManager = new PendingJobManager(); - for (PendingJobData pendingJob : pendingJobs) { + for (final PendingJobData pendingJob : pendingJobs) { jobManager = jobManager.addPendingJob( pendingJob.retryJob, @@ -299,11 +350,14 @@ class WriteTables BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), pendingJob.tableDestination.getTableDescription()); } + + Result result = + new AutoValue_WriteTables_Result( + BigQueryHelpers.toJsonString(pendingJob.tableReference), + pendingJob.isFirstPane); c.output( mainOutputTag, - KV.of( - pendingJob.tableDestination, - BigQueryHelpers.toJsonString(pendingJob.tableReference)), + KV.of(pendingJob.tableDestination, result), pendingJob.window.maxTimestamp(), pendingJob.window); for (String file : pendingJob.partitionFiles) { @@ -365,8 +419,8 @@ class WriteTables } @Override - public PCollection> expand( - PCollection, List>> input) { + public PCollection> expand( + PCollection, WritePartition.Result>> input) { PCollectionTuple writeTablesOutputs = input.apply( ParDo.of(new WriteTablesDoFn()) @@ -391,7 +445,6 @@ class WriteTables .apply(GroupByKey.create()) .apply(Values.create()) .apply(ParDo.of(new GarbageCollectTemporaryFiles())); - return writeTablesOutputs.get(mainOutputTag); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index bde803d..6799c67 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -65,6 +65,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -72,13 +73,17 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption; +import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder; +import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import 
org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; @@ -1758,10 +1763,12 @@ public class BigQueryIOWriteTest implements Serializable { } } - TupleTag, List>> multiPartitionsTag = - new TupleTag, List>>("multiPartitionsTag") {}; - TupleTag, List>> singlePartitionTag = - new TupleTag, List>>("singlePartitionTag") {}; + TupleTag, WritePartition.Result>> multiPartitionsTag = + new TupleTag, WritePartition.Result>>( + "multiPartitionsTag") {}; + TupleTag, WritePartition.Result>> singlePartitionTag = + new TupleTag, WritePartition.Result>>( + "singlePartitionTag") {}; String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath(); PCollectionView tempFilePrefixView = @@ -1781,12 +1788,12 @@ public class BigQueryIOWriteTest implements Serializable { DoFnTester< Iterable>, - KV, List>> + KV, WritePartition.Result>> tester = DoFnTester.of(writePartition); tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix); tester.processElement(files); - List, List>> partitions; + List, WritePartition.Result>> partitions; if (expectedNumPartitionsPerTable > 1) { partitions = tester.takeOutputElements(multiPartitionsTag); } else { @@ -1795,12 +1802,12 @@ public class BigQueryIOWriteTest implements Serializable { List> partitionsResult = Lists.newArrayList(); Map> filesPerTableResult = Maps.newHashMap(); - for (KV, List> partition : partitions) { + for (KV, WritePartition.Result> partition : partitions) { String table = partition.getKey().getKey().getTableSpec(); partitionsResult.add(partition.getKey()); List tableFilesResult = filesPerTableResult.computeIfAbsent(table, k -> Lists.newArrayList()); - tableFilesResult.addAll(partition.getValue()); + tableFilesResult.addAll(partition.getValue().getFilenames()); } assertThat( @@ -1847,7 +1854,7 @@ public class BigQueryIOWriteTest implements Serializable { String jobIdToken = "jobId"; final Multimap expectedTempTables = ArrayListMultimap.create(); - List, List>> partitions = Lists.newArrayList(); + List, WritePartition.Result>> partitions = Lists.newArrayList(); for (int i = 0; i < numTables; ++i) { String tableName = String.format("project-id:dataset-id.table%05d", i); TableDestination tableDestination = new TableDestination(tableName, tableName); @@ -1869,7 +1876,10 @@ public class BigQueryIOWriteTest implements Serializable { } filesPerPartition.add(writer.getResult().resourceId.toString()); } - partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition)); + partitions.add( + KV.of( + ShardedKey.of(tableDestination.getTableSpec(), j), + new AutoValue_WritePartition_Result(filesPerPartition, true))); String json = String.format( @@ -1879,8 +1889,11 @@ public class BigQueryIOWriteTest implements Serializable { } } - PCollection, List>> writeTablesInput = - p.apply(Create.of(partitions)); + PCollection, WritePartition.Result>> writeTablesInput = + p.apply( + Create.of(partitions) + .withCoder( + KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), ResultCoder.INSTANCE))); PCollectionView jobIdTokenView = p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton()); List> sideInputs = ImmutableList.of(jobIdTokenView); @@ -1903,18 +1916,25 @@ public class BigQueryIOWriteTest implements Serializable { false, Collections.emptySet()); - PCollection> writeTablesOutput = - writeTablesInput.apply(writeTables); + PCollection> writeTablesOutput = + writeTablesInput + .apply(writeTables) + .setCoder(KvCoder.of(TableDestinationCoderV3.of(), 
WriteTables.ResultCoder.INSTANCE)); PAssert.thatMultimap(writeTablesOutput) .satisfies( input -> { assertEquals(input.keySet(), expectedTempTables.keySet()); - for (Map.Entry> entry : input.entrySet()) { + for (Map.Entry> entry : + input.entrySet()) { + Iterable tableNames = + StreamSupport.stream(entry.getValue().spliterator(), false) + .map(Result::getTableName) + .collect(Collectors.toList()); @SuppressWarnings("unchecked") String[] expectedValues = Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class); - assertThat(entry.getValue(), containsInAnyOrder(expectedValues)); + assertThat(tableNames, containsInAnyOrder(expectedValues)); } return null; }); @@ -1951,7 +1971,7 @@ public class BigQueryIOWriteTest implements Serializable { Multimap expectedRowsPerTable = ArrayListMultimap.create(); String jobIdToken = "jobIdToken"; Multimap tempTables = ArrayListMultimap.create(); - List> tempTablesElement = Lists.newArrayList(); + List> tempTablesElement = Lists.newArrayList(); for (int i = 0; i < numFinalTables; ++i) { String tableName = "project-id:dataset-id.table_" + i; TableDestination tableDestination = new TableDestination(tableName, "table_" + i + "_desc"); @@ -1971,7 +1991,8 @@ public class BigQueryIOWriteTest implements Serializable { expectedRowsPerTable.putAll(tableDestination, rows); String tableJson = toJsonString(tempTable); tempTables.put(tableDestination, tableJson); - tempTablesElement.add(KV.of(tableDestination, tableJson)); + tempTablesElement.add( + KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true))); } } @@ -1987,7 +2008,8 @@ public class BigQueryIOWriteTest implements Serializable { 3, "kms_key"); - DoFnTester>, Void> tester = DoFnTester.of(writeRename); + DoFnTester>, Void> tester = + DoFnTester.of(writeRename); tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken); tester.processElement(tempTablesElement); tester.finishBundle();
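
For readers following the patch, the first building block it introduces is a small value type: WritePartition and WriteTables now emit a Result that carries the produced filenames (or temp-table name) together with a flag recording whether the element came from the first pane, plus an AtomicCoder for it. The sketch below is a simplified, standalone rendering of that idea, not the patch itself: it uses a plain class and the hypothetical names PartitionResult/PartitionResultCoder, whereas the actual change generates the value with @AutoValue inside WritePartition.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.List;
    import org.apache.beam.sdk.coders.AtomicCoder;
    import org.apache.beam.sdk.coders.BooleanCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.ListCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    // Simplified stand-in for WritePartition.Result (the patch generates it with @AutoValue).
    class PartitionResult {
      private final List<String> filenames;
      private final boolean firstPane;

      PartitionResult(List<String> filenames, boolean firstPane) {
        this.filenames = filenames;
        this.firstPane = firstPane;
      }

      List<String> getFilenames() { return filenames; }

      boolean isFirstPane() { return firstPane; }
    }

    // Mirrors WritePartition.ResultCoder: the filename list is encoded first, then the flag.
    class PartitionResultCoder extends AtomicCoder<PartitionResult> {
      private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
      private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
      static final PartitionResultCoder INSTANCE = new PartitionResultCoder();

      @Override
      public void encode(PartitionResult value, OutputStream outStream) throws IOException {
        FILENAMES_CODER.encode(value.getFilenames(), outStream);
        FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
      }

      @Override
      public PartitionResult decode(InputStream inStream) throws IOException {
        return new PartitionResult(
            FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
      }
    }

As in the patch, such a coder has to be set explicitly on the PCollections that carry the value; the diff does this with WritePartition.ResultCoder.INSTANCE and WriteTables.ResultCoder.INSTANCE in BatchLoads, and with Create.of(partitions).withCoder(...) in BigQueryIOWriteTest.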
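
The second piece is how that first-pane flag is consumed. In WriteRename (and, for the single-partition path, WriteTables), the user-requested dispositions are honored only for the genuinely first pane; any later pane appends to the table that the first pane already created instead of recreating or truncating it. The helper below is a hypothetical illustration of that decision using the real BigQueryIO.Write enums; in the patch the flag itself is computed as Iterables.getFirst(tempTableNames, null).isFirstPane() && c.pane().isFirst().

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

    // Hypothetical helper mirroring the choice made in WriteRename.startWriteRename after this
    // patch: first pane uses the configured dispositions, later panes append and never create.
    class FirstPaneDispositions {
      static WriteDisposition writeDisposition(
          boolean isFirstPane, WriteDisposition firstPaneWriteDisposition) {
        return isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
      }

      static CreateDisposition createDisposition(
          boolean isFirstPane, CreateDisposition firstPaneCreateDisposition) {
        return isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
      }
    }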