beam-commits mailing list archives

From pabl...@apache.org
Subject [beam] branch master updated: Fix auto-sharding parameter for BigQuery sink with FILE LOADS
Date Wed, 10 Mar 2021 22:12:44 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c989cc  Fix auto-sharding parameter for BigQuery sink with FILE LOADS
     new 63c13d6  Merge pull request #14183 from [BEAM-11772] Fix auto-sharding parameter for BigQuery sink with FILE LOADS
9c989cc is described below

commit 9c989ccde6a4053d49631e21fea4b89193a1fca0
Author: sychen <sychen@google.com>
AuthorDate: Tue Mar 9 22:36:36 2021 -0800

    Fix auto-sharding parameter for BigQuery sink with FILE LOADS
---
 .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java  | 20 ++++++++++++--------
 .../python/apache_beam/io/gcp/bigquery_file_loads.py | 13 +++++++++----
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 1828192..1372e3f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -116,6 +116,10 @@ class BatchLoads<DestinationT, ElementT>
   // written.
   static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
 
+  // If using auto-sharding for unbounded data, we batch the records before triggering file write
+  // to avoid generating too many small files.
+  static final Duration FILE_TRIGGERING_BATCHING_DURATION = Duration.standardSeconds(1);
+
   // The maximum number of retries to poll the status of a job.
   // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
   static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@@ -294,9 +298,9 @@ class BatchLoads<DestinationT, ElementT>
                   .discardingFiredPanes());
       results = writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
     } else {
-      // In the case of dynamic sharding, however, we use a default triggering and instead apply the
-      // user supplied triggeringFrequency to the sharding transform. See
-      // writeDynamicallyShardedFilesTriggered.
+      // In the case of dynamic sharding, however, we use a default trigger since the transform
+      // that performs sharding also batches elements to avoid generating too many tiny files. The
+      // user trigger is applied right after writes to limit the number of load jobs.
       PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
           input.apply(
               "rewindowIntoGlobal",
@@ -569,15 +573,15 @@ class BatchLoads<DestinationT, ElementT>
   // of filename, file byte size, and table destination.
   PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
-    // In contrast to fixed sharding with triggering, here we use a global window with default
-    // trigger and apply the user supplied triggeringFrequency in the subsequent GroupIntoBatches
-    // transform. We also ensure that the files are written if a threshold number of records are
-    // ready. Dynamic sharding is achieved via the withShardedKey() option provided by
+    // In contrast to fixed sharding with a user trigger, here we use a global window with the
+    // default trigger and rely on the GroupIntoBatches transform to group, batch, and properly
+    // parallelize the elements. We also ensure that the files are written if a threshold number
+    // of records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
     // GroupIntoBatches.
     return input
         .apply(
             GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                .withMaxBufferingDuration(triggeringFrequency)
+                .withMaxBufferingDuration(FILE_TRIGGERING_BATCHING_DURATION)
                 .withShardedKey())
         .setCoder(
             KvCoder.of(
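
For context, here is a minimal, self-contained sketch (not part of this commit) of the
GroupIntoBatches pattern that writeDynamicallyShardedFilesTriggered now relies on. The
standalone pipeline, the String element types, and the class name are hypothetical
stand-ins; the size and duration values mirror FILE_TRIGGERING_RECORD_COUNT and
FILE_TRIGGERING_BATCHING_DURATION above:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.ShardedKey;
    import org.joda.time.Duration;

    public class GroupIntoBatchesSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Hypothetical keyed input standing in for KV<DestinationT, ElementT>.
        PCollection<KV<String, String>> keyed =
            p.apply(
                Create.of(
                    KV.of("dest1", "row-a"), KV.of("dest1", "row-b"), KV.of("dest2", "row-c")));

        // Flush a batch once 500,000 records are buffered for a key OR after one
        // second, whichever comes first; withShardedKey() lets the runner split a
        // hot key across workers, which is how dynamic sharding is achieved.
        PCollection<KV<ShardedKey<String>, Iterable<String>>> batched =
            keyed.apply(
                GroupIntoBatches.<String, String>ofSize(500_000)
                    .withMaxBufferingDuration(Duration.standardSeconds(1))
                    .withShardedKey());

        p.run().waitUntilFinish();
      }
    }
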
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 298ead6..608301e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -69,6 +69,10 @@ _MAXIMUM_SOURCE_URIS = 10 * 1000
 # this many records are written.
 _FILE_TRIGGERING_RECORD_COUNT = 500000
 
+# If using auto-sharding for unbounded data, we batch the records before
+# triggering file write to avoid generating too many small files.
+_FILE_TRIGGERING_BATCHING_DURATION_SECS = 1
+
 
 def _generate_job_name(job_name, job_type, step_name):
   return bigquery_tools.generate_bq_job_name(
@@ -729,9 +733,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
     # We use only the user-supplied trigger on the actual BigQuery load.
     # This allows us to offload the data to the filesystem.
     #
-    # In the case of auto sharding, however, we use a default triggering and
-    # instead apply the user supplied triggering_frequency to the transfrom that
-    # performs sharding.
+    # In the case of dynamic sharding, however, we use a default trigger since
+    # the transform that performs sharding also batches elements to avoid
+    # generating too many tiny files. The user trigger is applied right after
+    # writes to limit the number of load jobs.
     if self.is_streaming_pipeline and not self.with_auto_sharding:
       return beam.WindowInto(beam.window.GlobalWindows(),
                              trigger=trigger.Repeatedly(
@@ -822,7 +827,7 @@ class BigQueryBatchFileLoads(beam.PTransform):
             lambda kv: (bigquery_tools.get_hashable_destination(kv[0]), kv[1]))
         | 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey(
             batch_size=_FILE_TRIGGERING_RECORD_COUNT,
-            max_buffering_duration_secs=self.triggering_frequency,
+            max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS,
             clock=clock)
         | 'FromHashableTableRefAndDropShard' >> beam.Map(
             lambda kvs:
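
Stepping back from the diff: with auto-sharding enabled, the user-supplied triggering
frequency now only limits how often load jobs are issued, while file writes are batched
internally at roughly one-second granularity. As a hedged sketch (not from this commit),
enabling the behavior on the Java sink might look as follows; the destination table is
hypothetical, and FILE_LOADS on an unbounded input still requires a triggering frequency:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class AutoShardedFileLoadsSketch {
      static void write(PCollection<TableRow> rows) {
        rows.apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical destination
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                // After this fix, the frequency below only gates how often load
                // jobs run; it no longer delays the batching of file writes.
                .withTriggeringFrequency(Duration.standardMinutes(5))
                .withAutoSharding()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
      }
    }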

