beam-commits mailing list archives

From j...@apache.org
Subject [1/2] beam git commit: [BEAM-2154] More scalable dynamic BigQueryIO.Write
Date Tue, 09 May 2017 01:20:00 GMT
Repository: beam
Updated Branches:
  refs/heads/master f315cc9ff -> 6a2586a4e


[BEAM-2154] More scalable dynamic BigQueryIO.Write

If too many tables are generated in a bundle, spill the excess records and group them
before writing files. Generating hundreds or thousands of file-write buffers in a single
bundle was causing workers to crash with out-of-memory errors.
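
For context, the change can be summarized with a small, self-contained sketch. The class below is hypothetical (it is not the transform added by this commit); it only illustrates the cap-and-spill idea that WriteBundlesToFiles and WriteGroupedRecordsToFiles implement in the diff that follows.

import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

/**
 * Minimal sketch (not the Beam code itself): keep at most a fixed number of per-destination
 * writers open in a bundle, and hand records for any further destinations to a spill output so
 * a later stage can group them by destination and write them one file at a time.
 */
final class SpillingWriterPool<DestinationT> {
  private final int maxOpenWriters;                           // e.g. 20, the new default in BatchLoads
  private final Function<DestinationT, Writer> openWriter;    // opens a file writer for a destination
  private final BiConsumer<DestinationT, String> spillOutput; // receives records that were not written
  private final Map<DestinationT, Writer> writers = new HashMap<>();

  SpillingWriterPool(
      int maxOpenWriters,
      Function<DestinationT, Writer> openWriter,
      BiConsumer<DestinationT, String> spillOutput) {
    this.maxOpenWriters = maxOpenWriters;
    this.openWriter = openWriter;
    this.spillOutput = spillOutput;
  }

  void process(DestinationT destination, String record) throws IOException {
    Writer writer = writers.get(destination);
    if (writer == null) {
      if (writers.size() >= maxOpenWriters) {
        // Too many write buffers already open in this bundle: emit the record to the spill
        // output instead of opening another writer. (The real transform also attaches a random
        // shard number to the key so the later GroupByKey does not hotspot on one destination.)
        spillOutput.accept(destination, record);
        return;
      }
      writer = openWriter.apply(destination);
      writers.put(destination, writer);
    }
    writer.write(record);
    writer.write('\n');
  }

  void closeAll() throws IOException {
    for (Writer w : writers.values()) {
      w.close();
    }
    writers.clear();
  }
}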


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd1dda09
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd1dda09
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd1dda09

Branch: refs/heads/master
Commit: fd1dda09fc4db48cfbc21afd6310320533fffe78
Parents: f315cc9
Author: Reuven Lax <relax@google.com>
Authored: Sat Apr 29 07:33:54 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Mon May 8 18:01:44 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 122 ++++++++++++++++---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  35 +++---
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |   3 +-
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |  59 +++++----
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 111 ++++++++++++++---
 .../bigquery/WriteGroupedRecordsToFiles.java    |  68 +++++++++++
 .../sdk/io/gcp/bigquery/WritePartition.java     |  38 +++---
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |   9 +-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   |   2 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   7 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  81 ++++++++----
 .../sdk/io/gcp/bigquery/FakeJobService.java     |   6 +-
 12 files changed, 396 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index ba64ab1..0abd469 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import java.util.List;
@@ -32,11 +33,15 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -49,6 +54,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -57,6 +63,36 @@ import org.apache.beam.sdk.values.TupleTagList;
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads<DestinationT>
     extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+  // The maximum number of file writers to keep open in a single bundle at a time, since file
+  // writers default to 64mb buffers. This comes into play when writing dynamic table destinations.
+  // The first 20 tables from a single BatchLoads transform will write files inline in the
+  // transform. Anything beyond that might be shuffled.  Users using this transform directly who
+  // know that they are running on workers with sufficient memory can increase this by calling
+  // BatchLoads#setMaxNumWritersPerBundle. This allows the workers to do more work in memory, and
+  // save on the cost of shuffling some of this data.
+  // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
+  // their own policy.
+  @VisibleForTesting
+  static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
+
+  @VisibleForTesting
+  // Maximum number of files in a single partition.
+  static final int MAX_NUM_FILES = 10000;
+
+  @VisibleForTesting
+  // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
+  static final long MAX_SIZE_BYTES = 11 * (1L << 40);
+
+  // The maximum size of a single file - 4TiB, just under the 5 TiB limit.
+  static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40);
+
+  // The maximum number of retries to poll the status of a job.
+  // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+  static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+  // The maximum number of retry jobs.
+  static final int MAX_RETRY_JOBS = 3;
+
   private BigQueryServices bigQueryServices;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
@@ -65,6 +101,8 @@ class BatchLoads<DestinationT>
   private final boolean singletonTable;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final Coder<DestinationT> destinationCoder;
+  private int maxNumWritersPerBundle;
+  private long maxFileSize;
 
   BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
              boolean singletonTable,
@@ -76,12 +114,29 @@ class BatchLoads<DestinationT>
     this.singletonTable = singletonTable;
     this.dynamicDestinations = dynamicDestinations;
     this.destinationCoder = destinationCoder;
+    this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
+    this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
   }
 
   void setTestServices(BigQueryServices bigQueryServices) {
     this.bigQueryServices = bigQueryServices;
   }
 
+  /** Get the maximum number of file writers that will be open simultaneously in a bundle. */
+  public int getMaxNumWritersPerBundle() {
+    return maxNumWritersPerBundle;
+  }
+
+  /** Set the maximum number of file writers that will be open simultaneously in a bundle. */
+  public void setMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
+    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
+  }
+
+  @VisibleForTesting
+  void setMaxFileSize(long maxFileSize) {
+    this.maxFileSize = maxFileSize;
+  }
+
   @Override
   public void validate(PipelineOptions options) {
     // We will use a BigQuery load job -- validate the temp location.
@@ -107,21 +162,25 @@ class BatchLoads<DestinationT>
     Pipeline p = input.getPipeline();
     final String stepUuid = BigQueryHelpers.randomUUIDString();
 
+    PCollectionView<String> tempFilePrefix =
+        p.apply("Create", Create.of((Void) null))
+            .apply(
+                "GetTempFilePrefix",
+                ParDo.of(
+                    new DoFn<Void, String>() {
+                      @ProcessElement
+                      public void getTempFilePrefix(ProcessContext c) {
+                        c.output(
+                            resolveTempLocation(
+                                c.getPipelineOptions().getTempLocation(),
+                                "BigQueryWriteTemp",
+                                stepUuid));
+                      }
+                    }))
+            .apply("TempFilePrefixView", View.<String>asSingleton());
+
     // Create a singleton job ID token at execution time. This will be used as the base for all
     // load jobs issued from this instance of the transform.
-    PCollection<String> singleton = p
-        .apply("Create", Create.of((Void) null))
-        .apply("GetTempFilePrefix", ParDo.of(new DoFn<Void, String>() {
-          @ProcessElement
-          public void getTempFilePrefix(ProcessContext c) {
-            c.output(
-                resolveTempLocation(
-                    c.getPipelineOptions().getTempLocation(),
-                    "BigQueryWriteTemp",
-                    stepUuid));
-          }
-        }));
-
     PCollectionView<String> jobIdTokenView =
         p.apply("TriggerIdCreation", Create.of("ignored"))
             .apply(
@@ -144,12 +203,37 @@ class BatchLoads<DestinationT>
     PCollectionView<Map<DestinationT, String>> schemasView =
         inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations));
 
+    TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
+        new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles"){};
+    TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
+    PCollectionTuple writeBundlesTuple = inputInGlobalWindow
+            .apply("WriteBundlesToFiles",
+                ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag,
+                    maxNumWritersPerBundle, maxFileSize))
+                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
+        writeBundlesTuple.get(writtenFilesTag)
+        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+
+    // If the bundles contain too many output tables to be written inline to files (due to memory
+    // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
+    // Group these records by key, and write the files after grouping. Since the record is grouped
+    // by key, we can ensure that only one file is open at a time in each bundle.
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped =
+        writeBundlesTuple
+            .get(unwrittedRecordsTag)
+            .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()))
+            .apply(GroupByKey.<ShardedKey<DestinationT>, TableRow>create())
+            .apply(
+                ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize))
+                    .withSideInputs(tempFilePrefix))
+            .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+
     // PCollection of filename, file byte size, and table destination.
     PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
-        inputInGlobalWindow
-            .apply("WriteBundlesToFiles", ParDo.of(
-                new WriteBundlesToFiles<DestinationT>(stepUuid)))
-            .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+        PCollectionList.of(writtenFiles).and(writtenFilesGrouped)
+        .apply(Flatten.<Result<DestinationT>>pCollections());
 
     TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
         new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
@@ -164,16 +248,18 @@ class BatchLoads<DestinationT>
     // This transform will look at the set of files written for each table, and if any table has
     // too many files or bytes, will partition that table's files into multiple partitions for
     // loading.
+    PCollection<Void> singleton = p.apply(Create.of((Void) null).withCoder(VoidCoder.of()));
     PCollectionTuple partitions =
         singleton.apply(
             "WritePartition",
             ParDo.of(
                     new WritePartition<>(
                         singletonTable,
+                        tempFilePrefix,
                         resultsView,
                         multiPartitionsTag,
                         singlePartitionTag))
-                .withSideInputs(resultsView)
+                .withSideInputs(tempFilePrefix, resultsView)
                 .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
 
     List<PCollectionView<?>> writeTablesSideInputs =
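
To put the new DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE in perspective: with the 64 MB default buffer per file writer noted in the comment block added above, 20 open writers bound buffer memory at roughly 20 × 64 MB ≈ 1.25 GB per bundle, before accounting for runners that execute several bundles on a worker concurrently. Raising the cap via setMaxNumWritersPerBundle scales that bound linearly, trading memory for less shuffling of spilled records.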

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8fb05ff..cf258ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -658,21 +658,6 @@ public class BigQueryIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {
-    @VisibleForTesting
-    // Maximum number of files in a single partition.
-    static final int MAX_NUM_FILES = 10000;
-
-    @VisibleForTesting
-    // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
-    static final long MAX_SIZE_BYTES = 11 * (1L << 40);
-
-    // The maximum number of retry jobs.
-    static final int MAX_RETRY_JOBS = 3;
-
-    // The maximum number of retries to poll the status of a job.
-    // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-    static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
     @Nullable abstract ValueProvider<String> getJsonTableRef();
     @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination>
       getTableFunction();
@@ -687,6 +672,8 @@ public class BigQueryIO {
     /** An option to indicate if table validation is desired. Default is true. */
     abstract boolean getValidate();
     abstract BigQueryServices getBigQueryServices();
+    @Nullable abstract Integer getMaxFilesPerBundle();
+    @Nullable abstract Long getMaxFileSize();
 
     abstract Builder<T> toBuilder();
 
@@ -704,6 +691,8 @@ public class BigQueryIO {
       abstract Builder<T> setTableDescription(String tableDescription);
       abstract Builder<T> setValidate(boolean validate);
       abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
+      abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle);
+      abstract Builder<T> setMaxFileSize(Long maxFileSize);
 
       abstract Write<T> build();
     }
@@ -882,6 +871,16 @@ public class BigQueryIO {
       return toBuilder().setBigQueryServices(testServices).build();
     }
 
+    @VisibleForTesting
+    Write<T> withMaxFilesPerBundle(int maxFilesPerBundle) {
+      return toBuilder().setMaxFilesPerBundle(maxFilesPerBundle).build();
+    }
+
+    @VisibleForTesting
+    Write<T> withMaxFileSize(long maxFileSize) {
+      return toBuilder().setMaxFileSize(maxFileSize).build();
+    }
+
     @Override
     public void validate(PipelineOptions pipelineOptions) {
       BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
@@ -993,6 +992,12 @@ public class BigQueryIO {
             dynamicDestinations,
             destinationCoder);
         batchLoads.setTestServices(getBigQueryServices());
+        if (getMaxFilesPerBundle() != null) {
+          batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
+        }
+        if (getMaxFileSize() != null) {
+          batchLoads.setMaxFileSize(getMaxFileSize());
+        }
         return rowsWithDestination.apply(batchLoads);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
index f04e9b9..7aefcfa 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -63,8 +63,7 @@ class ShardedKeyCoder<KeyT>
   public ShardedKey<KeyT> decode(InputStream inStream, Context context)
       throws IOException {
     return new ShardedKey<>(
-        keyCoder.decode(inStream, context.nested()),
-        shardNumberCoder.decode(inStream, context));
+        keyCoder.decode(inStream, context.nested()), shardNumberCoder.decode(inStream, context));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index f9d8785..e2db871 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -18,12 +18,15 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.io.CountingOutputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
+import java.util.UUID;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.io.FileSystems;
@@ -32,20 +35,19 @@ import org.apache.beam.sdk.util.MimeTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */
-class TableRowWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
+/** Writes {@link TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */
+class TableRowWriter implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(TableRowWriter.class);
 
   private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
   private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-  private final String tempFilePrefix;
-  private String id;
   private ResourceId resourceId;
   private WritableByteChannel channel;
-  protected String mimeType = MimeTypes.TEXT;
   private CountingOutputStream out;
 
-  public static final class Result {
+  private boolean isClosed = false;
+
+  static final class Result {
     final ResourceId resourceId;
     final long byteSize;
 
@@ -55,37 +57,32 @@ class TableRowWriter {
     }
   }
 
-  TableRowWriter(String basename) {
-    this.tempFilePrefix = basename;
-  }
-
-  public final void open(String uId) throws Exception {
-    id = uId;
-    resourceId = FileSystems.matchNewResource(tempFilePrefix + id, false);
-    LOG.debug("Opening {}.", resourceId);
-    channel = FileSystems.create(resourceId, mimeType);
-    try {
-      out = new CountingOutputStream(Channels.newOutputStream(channel));
-      LOG.debug("Writing header to {}.", resourceId);
-    } catch (Exception e) {
-      try {
-        LOG.error("Writing header to {} failed, closing channel.", resourceId);
-        channel.close();
-      } catch (IOException closeException) {
-        LOG.error("Closing channel for {} failed", resourceId);
-      }
-      throw e;
-    }
-    LOG.debug("Starting write of bundle {} to {}.", this.id, resourceId);
+  TableRowWriter(String basename) throws Exception {
+    String uId = UUID.randomUUID().toString();
+    resourceId = FileSystems.matchNewResource(basename + uId, false);
+    LOG.info("Opening TableRowWriter to {}.", resourceId);
+    channel = FileSystems.create(resourceId, MimeTypes.TEXT);
+    out = new CountingOutputStream(Channels.newOutputStream(channel));
   }
 
-  public void write(TableRow value) throws Exception {
+  void write(TableRow value) throws Exception {
     CODER.encode(value, out, Context.OUTER);
     out.write(NEWLINE);
   }
 
-  public final Result close() throws IOException {
+  long getByteSize() {
+    return out.getCount();
+  }
+
+  @Override
+  public void close() throws IOException {
+    checkState(!isClosed, "Already closed");
+    isClosed = true;
     channel.close();
+  }
+
+  Result getResult() {
+    checkState(isClosed, "Not yet closed");
     return new Result(resourceId, out.getCount());
   }
 }
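
A hedged usage sketch of the reworked TableRowWriter lifecycle above: the writer now opens its file in the constructor, close() only closes the channel, and getResult() may only be called after close(). The class is package-private, so the snippet assumes it lives in the same package; the temp-file prefix and the rows are placeholders.

import com.google.api.services.bigquery.model.TableRow;

/** Hypothetical same-package snippet; prefix and rows are placeholders. */
final class TableRowWriterExample {
  static void writeOneFile() throws Exception {
    // The constructor now opens the file (under prefix + random UUID) immediately.
    TableRowWriter writer = new TableRowWriter("/tmp/BigQueryWriteTemp/step-uuid-");
    try (TableRowWriter ignored = writer) {        // AutoCloseable: the try block closes the channel
      writer.write(new TableRow().set("name", "bill").set("id", 1));
      writer.write(new TableRow().set("name", "sam").set("id", 2));
    }
    // getResult() is only legal after close(); both directions are guarded by checkState.
    TableRowWriter.Result result = writer.getResult();
    System.out.println(result.resourceId + ": " + result.byteSize + " bytes");
  }
}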

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index b896083..890979b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.io.InputStream;
@@ -29,31 +30,43 @@ import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Writes each bundle of {@link TableRow} elements out to a separate file using {@link
- * TableRowWriter}.
+ * Writes each bundle of {@link TableRow} elements out to separate file using {@link
+ * TableRowWriter}. Elements destined to different destinations are written to separate files.
+ * The transform will not write an element to a file if it is already writing to
+ * {@link #maxNumWritersPerBundle} files and the element is destined to a new destination. In this
+ * case, the element will be spilled into the output, and the {@link WriteGroupedRecordsToFiles}
+ * transform will take care of writing it to a file.
  */
 class WriteBundlesToFiles<DestinationT>
-    extends DoFn<KV<DestinationT, TableRow>, WriteBundlesToFiles.Result<DestinationT>> {
+    extends DoFn<KV<DestinationT, TableRow>, Result<DestinationT>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class);
 
+  // When we spill records, shard the output keys to prevent hotspots. Experiments running up to
+  // 10TB of data have shown a sharding of 10 to be a good choice.
+  private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
+
   // Map from tablespec to a writer for that table.
   private transient Map<DestinationT, TableRowWriter> writers;
   private transient Map<DestinationT, BoundedWindow> writerWindows;
   private final String stepUuid;
-
+  private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
+  private int maxNumWritersPerBundle;
+  private long maxFileSize;
 
   /**
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
@@ -115,30 +128,66 @@ class WriteBundlesToFiles<DestinationT>
     public void verifyDeterministic() {}
   }
 
-  WriteBundlesToFiles(String stepUuid) {
+  WriteBundlesToFiles(
+      String stepUuid,
+      TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
+      int maxNumWritersPerBundle,
+      long maxFileSize) {
     this.stepUuid = stepUuid;
+    this.unwrittedRecordsTag = unwrittedRecordsTag;
+    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
+    this.maxFileSize = maxFileSize;
   }
 
   @StartBundle
   public void startBundle() {
-    // This must be done each bundle, as by default the {@link DoFn} might be reused between
+    // This must be done for each bundle, as by default the {@link DoFn} might be reused between
     // bundles.
     this.writers = Maps.newHashMap();
     this.writerWindows = Maps.newHashMap();
   }
 
+  TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePrefix,
+                                       BoundedWindow window) throws Exception {
+    TableRowWriter writer = new TableRowWriter(tempFilePrefix);
+    writers.put(destination, writer);
+    writerWindows.put(destination, window);
+    return writer;
+  }
+
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
     String tempFilePrefix = resolveTempLocation(
         c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
-    TableRowWriter writer = writers.get(c.element().getKey());
-    if (writer == null) {
-      writer = new TableRowWriter(tempFilePrefix);
-      writer.open(UUID.randomUUID().toString());
-      writers.put(c.element().getKey(), writer);
-      writerWindows.put(c.element().getKey(), window);
-      LOG.debug("Done opening writer {}", writer);
+    DestinationT destination = c.element().getKey();
+
+    TableRowWriter writer;
+    if (writers.containsKey(destination)) {
+      writer = writers.get(destination);
+    } else {
+      // Only create a new writer if we have fewer than maxNumWritersPerBundle already in this
+      // bundle.
+      if (writers.size() <= maxNumWritersPerBundle) {
+        writer = createAndInsertWriter(destination, tempFilePrefix, window);
+      } else {
+        // This means that we already had too many writers open in this bundle. "spill" this record
+        // into the output. It will be grouped and written to a file in a subsequent stage.
+        c.output(unwrittedRecordsTag,
+            KV.of(ShardedKey.of(destination,
+                ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR)),
+                c.element().getValue()));
+        return;
+      }
     }
+
+    if (writer.getByteSize() > maxFileSize) {
+      // File is too big. Close it and open a new file.
+      writer.close();
+      TableRowWriter.Result result = writer.getResult();
+      c.output(new Result<>(result.resourceId.toString(), result.byteSize, destination));
+      writer = createAndInsertWriter(destination, tempFilePrefix, window);
+    }
+
     try {
       writer.write(c.element().getValue());
     } catch (Exception e) {
@@ -156,14 +205,36 @@ class WriteBundlesToFiles<DestinationT>
 
   @FinishBundle
   public void finishBundle(FinishBundleContext c) throws Exception {
+    List<Exception> exceptionList = Lists.newArrayList();
+    for (TableRowWriter writer : writers.values()) {
+      try {
+        writer.close();
+      } catch (Exception e) {
+        exceptionList.add(e);
+      }
+    }
+    if (!exceptionList.isEmpty()) {
+      Exception e = new IOException("Failed to close some writers");
+      for (Exception thrown : exceptionList) {
+        e.addSuppressed(thrown);
+      }
+      throw e;
+    }
+
     for (Map.Entry<DestinationT, TableRowWriter> entry : writers.entrySet()) {
-      TableRowWriter.Result result = entry.getValue().close();
-      c.output(
-          new Result<>(result.resourceId.toString(), result.byteSize, entry.getKey()),
-          writerWindows.get(entry.getKey()).maxTimestamp(),
-          writerWindows.get(entry.getKey()));
+      try {
+        DestinationT destination = entry.getKey();
+        TableRowWriter writer = entry.getValue();
+        TableRowWriter.Result result = writer.getResult();
+        c.output(
+            new Result<>(result.resourceId.toString(), result.byteSize, destination),
+            writerWindows.get(destination).maxTimestamp(),
+            writerWindows.get(destination));
+      } catch (Exception e) {
+        exceptionList.add(e);
+      }
     }
     writers.clear();
-    writerWindows.clear();
+
   }
 }
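
The reworked finishBundle above closes every writer before emitting any results, collecting failures so that one broken channel does not mask the others. A plain-Java sketch of that close-all-then-report pattern, independent of Beam:

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class Closer {
  /** Close everything, then report all failures together as suppressed exceptions. */
  static void closeAll(Collection<? extends Closeable> closeables) throws IOException {
    List<Exception> failures = new ArrayList<>();
    for (Closeable c : closeables) {
      try {
        c.close();
      } catch (Exception e) {
        failures.add(e);
      }
    }
    if (!failures.isEmpty()) {
      IOException combined = new IOException("Failed to close " + failures.size() + " writer(s)");
      for (Exception f : failures) {
        combined.addSuppressed(f);
      }
      throw combined;
    }
  }
}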

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
new file mode 100644
index 0000000..45dc2a8
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Receives elements grouped by their (sharded) destination, and writes them out to a file.
+ * Since all the elements in the {@link Iterable} are destined to the same table, they are all
+ * written to the same file. Ensures that only one {@link TableRowWriter} is active per bundle.
+ */
+class WriteGroupedRecordsToFiles<DestinationT>
+    extends DoFn<KV<ShardedKey<DestinationT>, Iterable<TableRow>>,
+    WriteBundlesToFiles.Result<DestinationT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(WriteGroupedRecordsToFiles.class);
+
+  private final PCollectionView<String> tempFilePrefix;
+  private final long maxFileSize;
+
+  WriteGroupedRecordsToFiles(PCollectionView<String> tempFilePrefix, long maxFileSize) {
+    this.tempFilePrefix = tempFilePrefix;
+    this.maxFileSize = maxFileSize;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    String tempFilePrefix = c.sideInput(this.tempFilePrefix);
+    TableRowWriter writer = new TableRowWriter(tempFilePrefix);
+    try (TableRowWriter ignored = writer) {
+      for (TableRow tableRow : c.element().getValue()) {
+        if (writer.getByteSize() > maxFileSize) {
+          writer.close();
+          TableRowWriter.Result result = writer.getResult();
+          c.output(new WriteBundlesToFiles.Result<>(
+              result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
+          writer = new TableRowWriter(tempFilePrefix);
+        }
+        writer.write(tableRow);
+      }
+    }
+    TableRowWriter.Result result = writer.getResult();
+    c.output(
+        new WriteBundlesToFiles.Result<>(
+            result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 66004b2..24693da 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -22,8 +22,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
@@ -35,9 +33,10 @@ import org.apache.beam.sdk.values.TupleTag;
  * tablespec and the list of files corresponding to each partition of that table.
  */
 class WritePartition<DestinationT>
-    extends DoFn<String, KV<ShardedKey<DestinationT>, List<String>>> {
+    extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> {
   private final boolean singletonTable;
-  private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView;
+  private final PCollectionView<String> tempFilePrefix;
+  private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results;
   private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
   private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
 
@@ -73,8 +72,8 @@ class WritePartition<DestinationT>
     // Check to see whether we can add to this partition without exceeding the maximum partition
     // size.
     boolean canAccept(int numFiles, long numBytes) {
-      return this.numFiles + numFiles <= Write.MAX_NUM_FILES
-          && this.byteSize + numBytes <= Write.MAX_SIZE_BYTES;
+      return this.numFiles + numFiles <= BatchLoads.MAX_NUM_FILES
+          && this.byteSize + numBytes <= BatchLoads.MAX_SIZE_BYTES;
     }
   }
 
@@ -101,11 +100,13 @@ class WritePartition<DestinationT>
 
   WritePartition(
       boolean singletonTable,
-      PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView,
+      PCollectionView<String> tempFilePrefix,
+      PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results,
       TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
       TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) {
     this.singletonTable = singletonTable;
-    this.resultsView = resultsView;
+    this.results = results;
+    this.tempFilePrefix = tempFilePrefix;
     this.multiPartitionsTag = multiPartitionsTag;
     this.singlePartitionTag = singlePartitionTag;
   }
@@ -113,21 +114,20 @@ class WritePartition<DestinationT>
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     List<WriteBundlesToFiles.Result<DestinationT>> results =
-        Lists.newArrayList(c.sideInput(resultsView));
+        Lists.newArrayList(c.sideInput(this.results));
 
     // If there are no elements to write _and_ the user specified a constant output table, then
     // generate an empty table of that name.
     if (results.isEmpty() && singletonTable) {
-        TableRowWriter writer = new TableRowWriter(c.element());
-        writer.open(UUID.randomUUID().toString());
-        TableRowWriter.Result writerResult = writer.close();
-        // Return a null destination in this case - the constant DynamicDestinations class will
-        // resolve it to the singleton output table.
-        results.add(
-            new Result<DestinationT>(
-                writerResult.resourceId.toString(),
-                writerResult.byteSize,
-                null));
+      String tempFilePrefix = c.sideInput(this.tempFilePrefix);
+      TableRowWriter writer = new TableRowWriter(tempFilePrefix);
+      writer.close();
+      TableRowWriter.Result writerResult = writer.getResult();
+      // Return a null destination in this case - the constant DynamicDestinations class will
+      // resolve it to the singleton output table.
+      results.add(
+          new Result<DestinationT>(
+              writerResult.resourceId.toString(), writerResult.byteSize, null));
     }
 
     Map<DestinationT, DestinationData> currentResults = Maps.newHashMap();
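
WritePartition's DestinationData.canAccept (updated above to use the constants now housed in BatchLoads) encodes a greedy packing rule: keep adding a destination's files to the current partition until either limit would be exceeded, then start a new partition. A standalone sketch of that rule, with the limits copied from this commit:

import java.util.ArrayList;
import java.util.List;

final class PartitionPacker {
  static final int MAX_NUM_FILES = 10_000;            // BatchLoads.MAX_NUM_FILES
  static final long MAX_SIZE_BYTES = 11L * (1L << 40); // BatchLoads.MAX_SIZE_BYTES, 11 TiB

  /** sizes[i] is the byte size of file i; returns lists of file indices, one list per partition. */
  static List<List<Integer>> pack(long[] sizes) {
    List<List<Integer>> partitions = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    long currentBytes = 0;
    for (int i = 0; i < sizes.length; i++) {
      if (!current.isEmpty()
          && (current.size() + 1 > MAX_NUM_FILES || currentBytes + sizes[i] > MAX_SIZE_BYTES)) {
        partitions.add(current);      // current partition is full; start a new one
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(i);
      currentBytes += sizes[i];
    }
    if (!current.isEmpty()) {
      partitions.add(current);
    }
    return partitions;
  }
}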

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index bf9d9f1..f641b32 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -43,7 +42,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Copies temporary tables to destination table.
  */
-class WriteRename extends DoFn<String, Void> {
+class WriteRename extends DoFn<Void, Void> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
 
   private final BigQueryServices bqServices;
@@ -123,13 +122,13 @@ class WriteRename extends DoFn<String, Void> {
 
     String projectId = ref.getProjectId();
     Job lastFailedCopyJob = null;
-    for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
+    for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
       String jobId = jobIdPrefix + "-" + i;
       JobReference jobRef = new JobReference()
           .setProjectId(projectId)
           .setJobId(jobId);
       jobService.startCopyJob(jobRef, copyConfig);
-      Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
+      Job copyJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
       Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
       switch (jobStatus) {
         case SUCCEEDED:
@@ -154,7 +153,7 @@ class WriteRename extends DoFn<String, Void> {
         "Failed to create copy job with id prefix %s, "
             + "reached max retries: %d, last failed copy job: %s.",
         jobIdPrefix,
-        Write.MAX_RETRY_JOBS,
+        BatchLoads.MAX_RETRY_JOBS,
         BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index bc18e8e..db0be3a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.TupleTag;
 /**
  * The result of a {@link BigQueryIO.Write} transform.
  */
-final class WriteResult implements POutput {
+public final class WriteResult implements POutput {
 
   private final Pipeline pipeline;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 83ff16b..c5494d8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -149,11 +148,11 @@ class WriteTables<DestinationT>
 
     String projectId = ref.getProjectId();
     Job lastFailedLoadJob = null;
-    for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
+    for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
       String jobId = jobIdPrefix + "-" + i;
       JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
       jobService.startLoadJob(jobRef, loadConfig);
-      Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
+      Job loadJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
       Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
       switch (jobStatus) {
         case SUCCEEDED:
@@ -181,7 +180,7 @@ class WriteTables<DestinationT>
             "Failed to create load job with id prefix %s, "
                 + "reached max retries: %d, last failed load job: %s.",
             jobIdPrefix,
-            Write.MAX_RETRY_JOBS,
+            BatchLoads.MAX_RETRY_JOBS,
             BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 0d3f000..d60c721 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -70,6 +70,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
@@ -490,9 +491,19 @@ public class BigQueryIOTest implements Serializable {
     p.apply("Create SideInput2", Create.of(KV.of("a", "a"), KV.of("b", "b"), KV.of("c", "c")))
         .apply("AsMap", View.<String, String>asMap());
 
-    PCollection<String> users = p.apply("CreateUsers",
-        Create.of("bill1", "sam2", "laurence3")
-            .withCoder(StringUtf8Coder.of()))
+    final List<String> allUsernames = ImmutableList.of("bill", "bob", "randolph");
+    List<String> userList = Lists.newArrayList();
+    // Make sure that we generate enough users so that WriteBundlesToFiles is forced to spill to
+    // WriteGroupedRecordsToFiles.
+    for (int i = 0; i < BatchLoads.DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE * 10; ++i) {
+      // Every user has 10 nicknames.
+      for (int j = 0; j < 1; ++j) {
+        String nickname = allUsernames.get(
+            ThreadLocalRandom.current().nextInt(allUsernames.size()));
+        userList.add(nickname + i);
+      }
+    }
+    PCollection<String> users = p.apply("CreateUsers", Create.of(userList))
         .apply(Window.into(new PartitionedGlobalWindows<>(
             new SerializableFunction<String, String>() {
               @Override
@@ -506,6 +517,8 @@ public class BigQueryIOTest implements Serializable {
     }
     users.apply("WriteBigQuery", BigQueryIO.<String>write()
             .withTestServices(fakeBqServices)
+            .withMaxFilesPerBundle(5)
+            .withMaxFileSize(10)
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
             .withFormatFunction(new SerializableFunction<String, TableRow>() {
               @Override
@@ -567,12 +580,24 @@ public class BigQueryIOTest implements Serializable {
     File tempDir = new File(bqOptions.getTempLocation());
     testNumFiles(tempDir, 0);
 
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-1"),
-        containsInAnyOrder(new TableRow().set("name", "bill").set("id", 1)));
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-2"),
-        containsInAnyOrder(new TableRow().set("name", "sam").set("id", 2)));
-    assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-3"),
-        containsInAnyOrder(new TableRow().set("name", "laurence").set("id", 3)));
+    Map<Integer, List<TableRow>> expectedTableRows = Maps.newHashMap();
+    for (int i = 0; i < userList.size(); ++i) {
+      Matcher matcher = userPattern.matcher(userList.get(i));
+      checkState(matcher.matches());
+      String nickname = matcher.group(1);
+      int userid = Integer.valueOf(matcher.group(2));
+      List<TableRow> expected = expectedTableRows.get(userid);
+      if (expected == null) {
+        expected = Lists.newArrayList();
+        expectedTableRows.put(userid, expected);
+      }
+      expected.add(new TableRow().set("name", nickname).set("id", userid));
+    }
+
+    for (Map.Entry<Integer, List<TableRow>> entry : expectedTableRows.entrySet()) {
+      assertThat(datasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()),
+          containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
+    }
   }
 
   @Test
@@ -1655,7 +1680,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWritePartitionSinglePartition() throws Exception {
-    long numFiles = BigQueryIO.Write.MAX_NUM_FILES;
+    long numFiles = BatchLoads.MAX_NUM_FILES;
     long fileSize = 1;
 
     // One partition is needed.
@@ -1665,7 +1690,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWritePartitionManyFiles() throws Exception {
-    long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3;
+    long numFiles = BatchLoads.MAX_NUM_FILES * 3;
     long fileSize = 1;
 
     // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files.
@@ -1676,7 +1701,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testWritePartitionLargeFileSize() throws Exception {
     long numFiles = 10;
-    long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3;
+    long fileSize = BatchLoads.MAX_SIZE_BYTES / 3;
 
     // One partition is needed for each group of three files.
     long expectedNumPartitions = 4;
@@ -1726,22 +1751,25 @@ public class BigQueryIOTest implements Serializable {
     TupleTag<KV<ShardedKey<String>, List<String>>> singlePartitionTag =
         new TupleTag<KV<ShardedKey<String>, List<String>>>("singlePartitionTag") {};
 
-    PCollection<WriteBundlesToFiles.Result<String>> filesPCollection =
-        p.apply(Create.of(files)
-        .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of())));
     PCollectionView<Iterable<WriteBundlesToFiles.Result<String>>> resultsView =
-        PCollectionViews.iterableView(
-        filesPCollection,
-        WindowingStrategy.globalDefault(),
-        WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of()));
+        p.apply(
+                Create.of(files)
+                    .withCoder(WriteBundlesToFiles.ResultCoder.of(StringUtf8Coder.of())))
+            .apply(View.<WriteBundlesToFiles.Result<String>>asIterable());
+
+    String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
+    PCollectionView<String> tempFilePrefixView =
+        p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton());
 
     WritePartition<String> writePartition =
-        new WritePartition<>(isSingleton, resultsView, multiPartitionsTag, singlePartitionTag);
+        new WritePartition<>(
+            isSingleton, tempFilePrefixView, resultsView, multiPartitionsTag, singlePartitionTag);
 
-    DoFnTester<String, KV<ShardedKey<String>, List<String>>> tester =
+    DoFnTester<Void, KV<ShardedKey<String>, List<String>>> tester =
         DoFnTester.of(writePartition);
     tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
-    tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+    tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
+    tester.processElement(null);
 
     List<KV<ShardedKey<String>, List<String>>> partitions;
     if (expectedNumPartitionsPerTable > 1) {
@@ -1897,11 +1925,10 @@ public class BigQueryIOTest implements Serializable {
     int numFiles = 10;
     List<String> fileNames = Lists.newArrayList();
     String tempFilePrefix = bqOptions.getTempLocation() + "/";
-    TableRowWriter writer = new TableRowWriter(tempFilePrefix);
     for (int i = 0; i < numFiles; ++i) {
-      String fileName = String.format("files%05d", i);
-      writer.open(fileName);
-      fileNames.add(writer.close().resourceId.toString());
+      TableRowWriter writer = new TableRowWriter(tempFilePrefix);
+      writer.close();
+      fileNames.add(writer.getResult().resourceId.toString());
     }
     fileNames.add(tempFilePrefix + String.format("files%05d", numFiles));
 
@@ -1992,7 +2019,7 @@ public class BigQueryIOTest implements Serializable {
         CreateDisposition.CREATE_IF_NEEDED,
         tempTablesView);
 
-    DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
+    DoFnTester<Void, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(null);

http://git-wip-us.apache.org/repos/asf/beam/blob/fd1dda09/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 50be0bb..ed6a0be 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -76,9 +76,9 @@ import org.joda.time.Duration;
  */
 class FakeJobService implements JobService, Serializable {
   static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
-  // Whenever a job is started, the first 5 calls to GetJob will report the job as pending,
-  // the next 5 will return the job as running, and only then will the job report as done.
-  private static final int GET_JOBS_TRANSITION_INTERVAL = 5;
+  // Whenever a job is started, the first 2 calls to GetJob will report the job as pending,
+  // the next 2 will return the job as running, and only then will the job report as done.
+  private static final int GET_JOBS_TRANSITION_INTERVAL = 2;
 
   private FakeDatasetService datasetService;
 

