beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [22/51] [abbrv] incubator-beam git commit: [BEAM-383] Modified BigQueryIO to write based on number of files and file sizes
Date Sat, 06 Aug 2016 02:52:40 GMT
[BEAM-383] Modified BigQueryIO to write based on number of files and file sizes


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8db6114e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8db6114e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8db6114e

Branch: refs/heads/python-sdk
Commit: 8db6114e2087cafc4369f6ec85b04f978dfb1984
Parents: 595d2d4
Author: Ian Zhou <ianzhou@google.com>
Authored: Wed Jul 20 15:56:21 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Aug 3 23:40:27 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 585 ++++++++++++++-----
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |   7 +
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |  51 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 213 ++++++-
 4 files changed, 693 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 8741c9c..2ba7562 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
@@ -33,9 +34,6 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.BigQueryOptions;
@@ -44,6 +42,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -52,7 +51,13 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.FileIOChannelFactory;
+import org.apache.beam.sdk.util.GcsIOChannelFactory;
+import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -80,6 +85,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
@@ -93,6 +99,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.io.CountingOutputStream;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -110,6 +117,8 @@ import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -196,7 +205,8 @@ import javax.annotation.Nullable;
  * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should
  * append to an existing table, replace the table, or verify that the table is
  * empty. Note that the dataset being written to must already exist. Unbounded PCollections can only
- * be written using {@link WriteDisposition#WRITE_EMPTY} or {@link WriteDisposition#WRITE_APPEND}.
+ * be written using {@link Write.WriteDisposition#WRITE_EMPTY} or
+ * {@link Write.WriteDisposition#WRITE_APPEND}.
  *
  * <h3>Sharding BigQuery output tables</h3>
  * <p>A common use case is to dynamically generate BigQuery table names based on
@@ -1412,6 +1422,19 @@ public class BigQueryIO {
      * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
      */
     public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
+      // Maximum number of files in a single partition.
+      static final int MAX_NUM_FILES = 10000;
+
+      // Maximum number of bytes in a single partition.
+      static final long MAX_SIZE_BYTES = 3 * (1L << 40);
+
+      // The maximum number of retry jobs.
+      static final int MAX_RETRY_JOBS = 3;
+
+      // The maximum number of retries to poll the status of a job.
+      // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+      static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
       @Nullable final String jsonTableRef;
 
       @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
@@ -1666,7 +1689,8 @@ public class BigQueryIO {
 
       @Override
       public PDone apply(PCollection<TableRow> input) {
-        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+        Pipeline p = input.getPipeline();
+        BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
         BigQueryServices bqServices = getBigQueryServices();
 
         // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
@@ -1680,13 +1704,13 @@ public class BigQueryIO {
         if (Strings.isNullOrEmpty(table.getProjectId())) {
           table.setProjectId(options.getProject());
         }
-        String jobIdToken = randomUUIDString();
+        String jobIdToken = "beam_job_" + randomUUIDString();
         String tempLocation = options.getTempLocation();
         String tempFilePrefix;
         try {
           IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
           tempFilePrefix = factory.resolve(
-                  factory.resolve(tempLocation, "BigQuerySinkTemp"),
+                  factory.resolve(tempLocation, "BigQueryWriteTemp"),
                   jobIdToken);
         } catch (IOException e) {
           throw new RuntimeException(
@@ -1694,16 +1718,120 @@ public class BigQueryIO {
               e);
         }
 
-        return input.apply("Write", org.apache.beam.sdk.io.Write.to(
-            new BigQuerySink(
+        PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+
+        PCollection<TableRow> inputInGlobalWindow =
+            input.apply(
+                Window.<TableRow>into(new GlobalWindows())
+                    .triggering(DefaultTrigger.of())
+                    .discardingFiredPanes());
+
+        PCollection<KV<String, Long>> results = inputInGlobalWindow
+            .apply("WriteBundles",
+                ParDo.of(new WriteBundles(tempFilePrefix)));
+
+        TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+            new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+        TupleTag<KV<Long, List<String>>> singlePartitionTag =
+            new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+        PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+            .apply("ResultsView", View.<KV<String, Long>>asIterable());
+        PCollectionTuple partitions = singleton.apply(ParDo
+            .of(new WritePartition(
+                resultsView,
+                multiPartitionsTag,
+                singlePartitionTag))
+            .withSideInputs(resultsView)
+            .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+        // Write multiple partitions to separate temporary tables
+        PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+            .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+            .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+                false,
+                bqServices,
                 jobIdToken,
-                table,
+                tempFilePrefix,
+                toJsonString(table),
                 jsonSchema,
-                getWriteDisposition(),
-                getCreateDisposition(),
+                WriteDisposition.WRITE_EMPTY,
+                CreateDisposition.CREATE_IF_NEEDED)));
+
+        PCollectionView<Iterable<String>> tempTablesView = tempTables
+            .apply("TempTablesView", View.<String>asIterable());
+        singleton.apply(ParDo
+            .of(new WriteRename(
+                bqServices,
+                jobIdToken,
+                toJsonString(table),
+                writeDisposition,
+                createDisposition,
+                tempTablesView))
+            .withSideInputs(tempTablesView));
+
+        // Write single partition to final table
+        partitions.get(singlePartitionTag)
+            .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+            .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+                true,
+                bqServices,
+                jobIdToken,
                 tempFilePrefix,
-                input.getCoder(),
-                bqServices)));
+                toJsonString(table),
+                jsonSchema,
+                writeDisposition,
+                createDisposition)));
+
+        return PDone.in(input.getPipeline());
+      }
+
+      private class WriteBundles extends OldDoFn<TableRow, KV<String, Long>> {
+        private TableRowWriter writer = null;
+        private final String tempFilePrefix;
+
+        WriteBundles(String tempFilePrefix) {
+          this.tempFilePrefix = tempFilePrefix;
+        }
+
+        @Override
+        public void processElement(ProcessContext c) throws Exception {
+          if (writer == null) {
+            writer = new TableRowWriter(tempFilePrefix);
+            writer.open(UUID.randomUUID().toString());
+            LOG.debug("Done opening writer {}", writer);
+          }
+          try {
+            writer.write(c.element());
+          } catch (Exception e) {
+            // Discard write result and close the write.
+            try {
+              writer.close();
+              // The writer does not need to be reset, as this OldDoFn cannot be reused.
+            } catch (Exception closeException) {
+              // Do not mask the exception that caused the write to fail.
+              e.addSuppressed(closeException);
+            }
+            throw e;
+          }
+        }
+
+        @Override
+        public void finishBundle(Context c) throws Exception {
+          if (writer != null) {
+            c.output(writer.close());
+            writer = null;
+          }
+        }
+
+        @Override
+        public void populateDisplayData(DisplayData.Builder builder) {
+          super.populateDisplayData(builder);
+
+          builder
+              .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+                  .withLabel("Temporary File Prefix"));
+        }
       }
 
       @Override
@@ -1784,192 +1912,361 @@ public class BigQueryIO {
       }
     }
 
-    /** Disallow construction of utility class. */
-    private Write() {}
-  }
-
-  /**
-   * {@link BigQuerySink} is implemented as a {@link FileBasedSink}.
-   *
-   * <p>It uses BigQuery load job to import files into BigQuery.
-   */
-  static class BigQuerySink extends FileBasedSink<TableRow> {
-    private final String jobIdToken;
-    @Nullable private final String jsonTable;
-    @Nullable private final String jsonSchema;
-    private final WriteDisposition writeDisposition;
-    private final CreateDisposition createDisposition;
-    private final Coder<TableRow> coder;
-    private final BigQueryServices bqServices;
-
-    public BigQuerySink(
-        String jobIdToken,
-        @Nullable TableReference table,
-        @Nullable String jsonSchema,
-        WriteDisposition writeDisposition,
-        CreateDisposition createDisposition,
-        String tempFile,
-        Coder<TableRow> coder,
-        BigQueryServices bqServices) {
-      super(tempFile, ".json");
-      this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
-      if (table == null) {
-        this.jsonTable = null;
-      } else {
-        checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
-            "Table %s should have a project specified", table);
-        this.jsonTable = toJsonString(table);
-      }
-      this.jsonSchema = jsonSchema;
-      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-      this.coder = checkNotNull(coder, "coder");
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-     }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
-        PipelineOptions options) {
-      return new BigQueryWriteOperation(this);
-    }
+    static class TableRowWriter {
+      private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
+      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+      private final String tempFilePrefix;
+      private String id;
+      private String fileName;
+      private WritableByteChannel channel;
+      protected String mimeType = MimeTypes.TEXT;
+      private CountingOutputStream out;
+
+      TableRowWriter(String basename) {
+        this.tempFilePrefix = basename;
+      }
+
+      public final void open(String uId) throws Exception {
+        id = uId;
+        fileName = tempFilePrefix + id;
+        LOG.debug("Opening {}.", fileName);
+        channel = IOChannelUtils.create(fileName, mimeType);
+        try {
+          out = new CountingOutputStream(Channels.newOutputStream(channel));
+          LOG.debug("Writing header to {}.", fileName);
+        } catch (Exception e) {
+          try {
+            LOG.error("Writing header to {} failed, closing channel.", fileName);
+            channel.close();
+          } catch (IOException closeException) {
+            LOG.error("Closing channel for {} failed", fileName);
+          }
+          throw e;
+        }
+        LOG.debug("Starting write of bundle {} to {}.", this.id, fileName);
+      }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
+      public void write(TableRow value) throws Exception {
+        CODER.encode(value, out, Context.OUTER);
+        out.write(NEWLINE);
+      }
 
-      builder
-          .addIfNotNull(DisplayData.item("schema", jsonSchema)
-            .withLabel("Table Schema"))
-          .addIfNotNull(DisplayData.item("tableSpec", jsonTable)
-            .withLabel("Table Specification"));
+      public final KV<String, Long> close() throws IOException {
+        channel.close();
+        return KV.of(fileName, out.getCount());
+      }
     }
 
-    private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
-      // The maximum number of retry load jobs.
-      private static final int MAX_RETRY_LOAD_JOBS = 3;
+    /**
+     * Partitions temporary files based on number of files and file sizes.
+     */
+    static class WritePartition extends OldDoFn<String, KV<Long, List<String>>> {
+      private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
+      private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
+      private TupleTag<KV<Long, List<String>>> singlePartitionTag;
 
-      // The maximum number of retries to poll the status of a load job.
-      // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-      private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+      public WritePartition(
+          PCollectionView<Iterable<KV<String, Long>>> resultsView,
+          TupleTag<KV<Long, List<String>>> multiPartitionsTag,
+          TupleTag<KV<Long, List<String>>> singlePartitionTag) {
+        this.resultsView = resultsView;
+        this.multiPartitionsTag = multiPartitionsTag;
+        this.singlePartitionTag = singlePartitionTag;
+      }
 
-      private final BigQuerySink bigQuerySink;
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
+        if (results.isEmpty()) {
+          TableRowWriter writer = new TableRowWriter(c.element());
+          writer.open(UUID.randomUUID().toString());
+          results.add(writer.close());
+        }
 
-      private BigQueryWriteOperation(BigQuerySink sink) {
-        super(checkNotNull(sink, "sink"));
-        this.bigQuerySink = sink;
+        long partitionId = 0;
+        int currNumFiles = 0;
+        long currSizeBytes = 0;
+        List<String> currResults = Lists.newArrayList();
+        for (int i = 0; i < results.size(); ++i) {
+          KV<String, Long> fileResult = results.get(i);
+          if (currNumFiles + 1 > Bound.MAX_NUM_FILES
+              || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) {
+            c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+            currResults = Lists.newArrayList();
+            currNumFiles = 0;
+            currSizeBytes = 0;
+          }
+          ++currNumFiles;
+          currSizeBytes += fileResult.getValue();
+          currResults.add(fileResult.getKey());
+        }
+        if (partitionId == 0) {
+          c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
+        } else {
+          c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
+        }
       }
 
       @Override
-      public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception {
-        return new TableRowWriter(this, bigQuerySink.coder);
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+      }
+    }
+
+    /**
+     * Writes partitions to BigQuery tables.
+     */
+    static class WriteTables extends OldDoFn<KV<Long, Iterable<List<String>>>, String> {
+      private final boolean singlePartition;
+      private final BigQueryServices bqServices;
+      private final String jobIdToken;
+      private final String tempFilePrefix;
+      private final String jsonTableRef;
+      private final String jsonSchema;
+      private final WriteDisposition writeDisposition;
+      private final CreateDisposition createDisposition;
+
+      public WriteTables(
+          boolean singlePartition,
+          BigQueryServices bqServices,
+          String jobIdToken,
+          String tempFilePrefix,
+          String jsonTableRef,
+          String jsonSchema,
+          WriteDisposition writeDisposition,
+          CreateDisposition createDisposition) {
+        this.singlePartition = singlePartition;
+        this.bqServices = bqServices;
+        this.jobIdToken = jobIdToken;
+        this.tempFilePrefix = tempFilePrefix;
+        this.jsonTableRef = jsonTableRef;
+        this.jsonSchema = jsonSchema;
+        this.writeDisposition = writeDisposition;
+        this.createDisposition = createDisposition;
       }
 
       @Override
-      public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
-          throws IOException, InterruptedException {
-        try {
-          BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-          List<String> tempFiles = Lists.newArrayList();
-          for (FileResult result : writerResults) {
-            tempFiles.add(result.getFilename());
-          }
-          if (!tempFiles.isEmpty()) {
-              load(
-                  bigQuerySink.bqServices.getJobService(bqOptions),
-                  bigQuerySink.jobIdToken,
-                  fromJsonString(bigQuerySink.jsonTable, TableReference.class),
-                  tempFiles,
-                  fromJsonString(bigQuerySink.jsonSchema, TableSchema.class),
-                  bigQuerySink.writeDisposition,
-                  bigQuerySink.createDisposition);
-          }
-        } finally {
-          removeTemporaryFiles(options);
+      public void processElement(ProcessContext c) throws Exception {
+        List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
+        String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
+        TableReference ref = fromJsonString(jsonTableRef, TableReference.class);
+        if (!singlePartition) {
+          ref.setTableId(jobIdPrefix);
         }
+
+        load(
+            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+            jobIdPrefix,
+            ref,
+            fromJsonString(jsonSchema, TableSchema.class),
+            partition,
+            writeDisposition,
+            createDisposition);
+        c.output(toJsonString(ref));
+
+        removeTemporaryFiles(c.getPipelineOptions(), partition);
       }
 
-      /**
-       * Import files into BigQuery with load jobs.
-       *
-       * <p>Returns if files are successfully loaded into BigQuery.
-       * Throws a RuntimeException if:
-       *     1. The status of one load job is UNKNOWN. This is to avoid duplicating data.
-       *     2. It exceeds {@code MAX_RETRY_LOAD_JOBS}.
-       *
-       * <p>If a load job failed, it will try another load job with a different job id.
-       */
       private void load(
           JobService jobService,
           String jobIdPrefix,
           TableReference ref,
-          List<String> gcsUris,
           @Nullable TableSchema schema,
+          List<String> gcsUris,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition) throws InterruptedException, IOException {
         JobConfigurationLoad loadConfig = new JobConfigurationLoad()
-            .setSourceUris(gcsUris)
             .setDestinationTable(ref)
             .setSchema(schema)
+            .setSourceUris(gcsUris)
             .setWriteDisposition(writeDisposition.name())
             .setCreateDisposition(createDisposition.name())
             .setSourceFormat("NEWLINE_DELIMITED_JSON");
 
-        boolean retrying = false;
         String projectId = ref.getProjectId();
-        for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) {
+        for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
-          if (retrying) {
-            LOG.info("Previous load jobs failed, retrying.");
-          }
-          LOG.info("Starting BigQuery load job: {}", jobId);
+          LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startLoadJob(jobRef, loadConfig);
           Status jobStatus =
-              parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES));
+              parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
           switch (jobStatus) {
             case SUCCEEDED:
               return;
             case UNKNOWN:
-              throw new RuntimeException("Failed to poll the load job status.");
+              throw new RuntimeException("Failed to poll the load job status of job " + jobId);
             case FAILED:
               LOG.info("BigQuery load job failed: {}", jobId);
-              retrying = true;
               continue;
             default:
-              throw new IllegalStateException("Unexpected job status: " + jobStatus);
+              throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
+                  jobStatus, jobId));
           }
         }
-        throw new RuntimeException(
-            "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
+        throw new RuntimeException(String.format("Failed to create the load job %s, reached max "
+            + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+      }
+
+      private void removeTemporaryFiles(PipelineOptions options, Collection<String> matches)
+          throws IOException {
+        String pattern = tempFilePrefix + "*";
+        LOG.debug("Finding temporary files matching {}.", pattern);
+        IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
+        if (factory instanceof GcsIOChannelFactory) {
+          GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options);
+          gcsUtil.remove(matches);
+        } else if (factory instanceof FileIOChannelFactory) {
+          for (String filename : matches) {
+            LOG.debug("Removing file {}", filename);
+            boolean exists = Files.deleteIfExists(Paths.get(filename));
+            if (!exists) {
+              LOG.debug("{} does not exist.", filename);
+            }
+          }
+        } else {
+          throw new IOException("Unrecognized file system.");
+        }
       }
-    }
 
-    private static class TableRowWriter extends FileBasedWriter<TableRow> {
-      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-      private final Coder<TableRow> coder;
-      private OutputStream out;
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
 
-      public TableRowWriter(
-          FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.TEXT;
-        this.coder = coder;
+        builder
+            .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
+                .withLabel("Job ID Token"))
+            .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+                .withLabel("Temporary File Prefix"))
+            .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
+                .withLabel("Table Reference"))
+            .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
+                .withLabel("Table Schema"));
+      }
+    }
+
+    /**
+     * Copies temporary tables to destination table.
+     */
+    static class WriteRename extends OldDoFn<String, Void> {
+      private final BigQueryServices bqServices;
+      private final String jobIdToken;
+      private final String jsonTableRef;
+      private final WriteDisposition writeDisposition;
+      private final CreateDisposition createDisposition;
+      private final PCollectionView<Iterable<String>> tempTablesView;
+
+      public WriteRename(
+          BigQueryServices bqServices,
+          String jobIdToken,
+          String jsonTableRef,
+          WriteDisposition writeDisposition,
+          CreateDisposition createDisposition,
+          PCollectionView<Iterable<String>> tempTablesView) {
+        this.bqServices = bqServices;
+        this.jobIdToken = jobIdToken;
+        this.jsonTableRef = jsonTableRef;
+        this.writeDisposition = writeDisposition;
+        this.createDisposition = createDisposition;
+        this.tempTablesView = tempTablesView;
       }
 
       @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        out = Channels.newOutputStream(channel);
+      public void processElement(ProcessContext c) throws Exception {
+        List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
+
+        // Do not copy if not temp tables are provided
+        if (tempTablesJson.size() == 0) {
+          return;
+        }
+
+        List<TableReference> tempTables = Lists.newArrayList();
+        for (String table : tempTablesJson) {
+          tempTables.add(fromJsonString(table, TableReference.class));
+        }
+        copy(
+            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+            jobIdToken,
+            fromJsonString(jsonTableRef, TableReference.class),
+            tempTables,
+            writeDisposition,
+            createDisposition);
+
+        DatasetService tableService =
+            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+        removeTemporaryTables(tableService, tempTables);
+      }
+
+      private void copy(
+          JobService jobService,
+          String jobIdPrefix,
+          TableReference ref,
+          List<TableReference> tempTables,
+          WriteDisposition writeDisposition,
+          CreateDisposition createDisposition) throws InterruptedException, IOException {
+        JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
+            .setSourceTables(tempTables)
+            .setDestinationTable(ref)
+            .setWriteDisposition(writeDisposition.name())
+            .setCreateDisposition(createDisposition.name());
+
+        String projectId = ref.getProjectId();
+        for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+          String jobId = jobIdPrefix + "-" + i;
+          LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS);
+          JobReference jobRef = new JobReference()
+              .setProjectId(projectId)
+              .setJobId(jobId);
+          jobService.startCopyJob(jobRef, copyConfig);
+          Status jobStatus =
+              parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
+          switch (jobStatus) {
+            case SUCCEEDED:
+              return;
+            case UNKNOWN:
+              throw new RuntimeException("Failed to poll the copy job status of job " + jobId);
+            case FAILED:
+              LOG.info("BigQuery copy job failed: {}", jobId);
+              continue;
+            default:
+              throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
+                  jobStatus, jobId));
+          }
+        }
+        throw new RuntimeException(String.format("Failed to create the copy job %s, reached max "
+            + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS));
+      }
+
+      private void removeTemporaryTables(DatasetService tableService,
+          List<TableReference> tempTables) throws Exception {
+        for (TableReference tableRef : tempTables) {
+          tableService.deleteTable(
+              tableRef.getProjectId(),
+              tableRef.getDatasetId(),
+              tableRef.getTableId());
+        }
       }
 
       @Override
-      public void write(TableRow value) throws Exception {
-        // Use Context.OUTER to encode and NEWLINE as the delimeter.
-        coder.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
+
+        builder
+            .addIfNotNull(DisplayData.item("jobIdToken", jobIdToken)
+                .withLabel("Job ID Token"))
+            .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
+                .withLabel("Table Reference"))
+            .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+                .withLabel("Write Disposition"))
+            .add(DisplayData.item("createDisposition", createDisposition.toString())
+                .withLabel("Create Disposition"));
       }
     }
+
+    /** Disallow construction of utility class. */
+    private Write() {}
   }
 
   private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
@@ -2093,8 +2390,8 @@ public class BigQueryIO {
             TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
             Bigquery client = Transport.newBigQueryClient(options).build();
             BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
-            inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
-                CreateDisposition.CREATE_IF_NEEDED, tableSchema);
+            inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
+                Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema);
             createdTables.add(tableSpec);
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 29a335d..0af6df8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.Table;
@@ -83,6 +84,12 @@ interface BigQueryServices extends Serializable {
         throws IOException, InterruptedException;
 
     /**
+     * Start a BigQuery copy job.
+     */
+    void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+        throws IOException, InterruptedException;
+
+    /**
      * Waits for the job is Done, and returns the job.
      *
      * <p>Returns null if the {@code maxAttempts} retries reached.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index ef17e0f..bd1097f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -39,6 +39,7 @@ import com.google.api.services.bigquery.model.JobConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
@@ -124,9 +125,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public void startLoadJob(
@@ -142,9 +143,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
@@ -160,9 +161,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
@@ -175,6 +176,24 @@ class BigQueryServicesImpl implements BigQueryServices {
       startJob(job, errorExtractor, client);
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+        throws IOException, InterruptedException {
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(
+              new JobConfiguration().setCopy(copyConfig));
+
+      startJob(job, errorExtractor, client);
+    }
+
     private static void startJob(Job job,
       ApiErrorExtractor errorExtractor,
       Bigquery client) throws IOException, InterruptedException {
@@ -320,9 +339,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public Table getTable(String projectId, String datasetId, String tableId)
@@ -341,9 +360,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public void deleteTable(String projectId, String datasetId, String tableId)
@@ -377,9 +396,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public Dataset getDataset(String projectId, String datasetId)
@@ -398,9 +417,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public void createDataset(
@@ -456,9 +475,9 @@ class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p> the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC .
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
      */
     @Override
     public void deleteDataset(String projectId, String datasetId)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8db6114e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 7d2df62..1ea1f94 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,14 +26,17 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
@@ -44,6 +47,9 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WritePartition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteRename;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteTables;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.BigQueryOptions;
@@ -58,16 +64,23 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.api.client.util.Data;
 import com.google.api.client.util.Strings;
@@ -76,6 +89,7 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatistics2;
@@ -110,6 +124,9 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -123,6 +140,8 @@ import javax.annotation.Nullable;
 @RunWith(JUnit4.class)
 public class BigQueryIOTest implements Serializable {
 
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+
   // Status.UNKNOWN maps to null
   private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
       Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
@@ -275,6 +294,12 @@ public class BigQueryIOTest implements Serializable {
     }
 
     @Override
+    public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
+        throws IOException, InterruptedException {
+      startJob(jobRef);
+    }
+
+    @Override
     public Job pollJob(JobReference jobRef, int maxAttempts)
         throws InterruptedException {
       if (!Strings.isNullOrEmpty(executingProject)) {
@@ -565,7 +590,8 @@ public class BigQueryIOTest implements Serializable {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(new FakeJobService()
             .startJobReturns("done", "done", "done")
-            .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED));
+            .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED))
+        .withDatasetService(mockDatasetService);
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -584,7 +610,6 @@ public class BigQueryIOTest implements Serializable {
     p.run();
 
     logged.verifyInfo("Starting BigQuery load job");
-    logged.verifyInfo("Previous load jobs failed, retrying.");
     File tempDir = new File(bqOptions.getTempLocation());
     assertEquals(0, tempDir.listFiles(new FileFilter() {
       @Override
@@ -613,7 +638,7 @@ public class BigQueryIOTest implements Serializable {
         .withoutValidation());
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Failed to poll the load job status.");
+    thrown.expectMessage("Failed to poll the load job status");
     p.run();
 
     File tempDir = new File(bqOptions.getTempLocation());
@@ -1228,4 +1253,186 @@ public class BigQueryIOTest implements Serializable {
 
     p.run();
   }
+
+  @Test
+  public void testWritePartitionEmptyData() throws Exception {
+    final long numFiles = 0;
+    final long fileSize = 0;
+
+    // An empty file is created for no input data. One partition is needed.
+    final long expectedNumPartitions = 1;
+    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+  }
+
+  @Test
+  public void testWritePartitionSinglePartition() throws Exception {
+    final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES;
+    final long fileSize = 1;
+
+    // One partition is needed.
+    final long expectedNumPartitions = 1;
+    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+  }
+
+  @Test
+  public void testWritePartitionManyFiles() throws Exception {
+    final long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3;
+    final long fileSize = 1;
+
+    // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files.
+    final long expectedNumPartitions = 3;
+    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+  }
+
+  @Test
+  public void testWritePartitionLargeFileSize() throws Exception {
+    final long numFiles = 10;
+    final long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3;
+
+    // One partition is needed for each group of three files.
+    final long expectedNumPartitions = 4;
+    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+  }
+
+  private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions)
+      throws Exception {
+    final List<Long> expectedPartitionIds = Lists.newArrayList();
+    for (long i = 1; i <= expectedNumPartitions; ++i) {
+      expectedPartitionIds.add(i);
+    }
+
+    final List<KV<String, Long>> files = Lists.newArrayList();
+    final List<String> fileNames = Lists.newArrayList();
+    for (int i = 0; i < numFiles; ++i) {
+      String fileName = String.format("files%05d", i);
+      fileNames.add(fileName);
+      files.add(KV.of(fileName, fileSize));
+    }
+
+    TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<Long, List<String>>> singlePartitionTag =
+        new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+    final PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView(
+        TestPipeline.create(),
+        WindowingStrategy.globalDefault(),
+        KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
+
+    WritePartition writePartition =
+        new WritePartition(filesView, multiPartitionsTag, singlePartitionTag);
+
+    DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition);
+    tester.setSideInput(filesView, GlobalWindow.INSTANCE, files);
+    tester.processElement(tmpFolder.getRoot().getAbsolutePath());
+
+    List<KV<Long, List<String>>> partitions;
+    if (expectedNumPartitions > 1) {
+      partitions = tester.takeSideOutputElements(multiPartitionsTag);
+    } else {
+      partitions = tester.takeSideOutputElements(singlePartitionTag);
+    }
+    List<Long> partitionIds = Lists.newArrayList();
+    List<String> partitionFileNames = Lists.newArrayList();
+    for (KV<Long, List<String>> partition : partitions) {
+      partitionIds.add(partition.getKey());
+      for (String name : partition.getValue()) {
+        partitionFileNames.add(name);
+      }
+    }
+
+    assertEquals(expectedPartitionIds, partitionIds);
+    if (numFiles == 0) {
+      assertThat(partitionFileNames, Matchers.hasSize(1));
+      assertTrue(Files.exists(Paths.get(partitionFileNames.get(0))));
+      assertThat(Files.readAllBytes(Paths.get(partitionFileNames.get(0))).length,
+          Matchers.equalTo(0));
+    } else {
+      assertEquals(fileNames, partitionFileNames);
+    }
+  }
+
+  @Test
+  public void testWriteTables() throws Exception {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done", "done", "done")
+            .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED))
+        .withDatasetService(mockDatasetService);
+
+    final long numPartitions = 3;
+    final long numFilesPerPartition = 10;
+    final String jobIdToken = "jobIdToken";
+    final String tempFilePrefix = "tempFilePrefix";
+    final String jsonTable = "{}";
+    final String jsonSchema = "{}";
+    final List<String> expectedTempTables = Lists.newArrayList();
+
+    final List<KV<Long, Iterable<List<String>>>> partitions = Lists.newArrayList();
+    for (long i = 0; i < numPartitions; ++i) {
+      List<String> filesPerPartition = Lists.newArrayList();
+      for (int j = 0; j < numFilesPerPartition; ++j) {
+        filesPerPartition.add(String.format("files%05d", j));
+      }
+      partitions.add(KV.of(i, (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
+      expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
+    }
+
+    WriteTables writeTables = new WriteTables(
+        false,
+        fakeBqServices,
+        jobIdToken,
+        tempFilePrefix,
+        jsonTable,
+        jsonSchema,
+        WriteDisposition.WRITE_EMPTY,
+        CreateDisposition.CREATE_IF_NEEDED);
+
+    DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = DoFnTester.of(writeTables);
+    for (KV<Long, Iterable<List<String>>> partition : partitions) {
+      tester.processElement(partition);
+    }
+
+    List<String> tempTables = tester.takeOutputElements();
+
+    logged.verifyInfo("Starting BigQuery load job");
+
+    assertEquals(expectedTempTables, tempTables);
+  }
+
+  @Test
+  public void testWriteRename() throws Exception {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done")
+            .pollJobReturns(Status.FAILED, Status.SUCCEEDED))
+        .withDatasetService(mockDatasetService);
+
+    final long numTempTables = 3;
+    final String jobIdToken = "jobIdToken";
+    final String jsonTable = "{}";
+    final List<String> tempTables = Lists.newArrayList();
+    for (long i = 0; i < numTempTables; ++i) {
+      tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
+    }
+
+    final PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
+        TestPipeline.create(),
+        WindowingStrategy.globalDefault(),
+        StringUtf8Coder.of());
+
+    WriteRename writeRename = new WriteRename(
+        fakeBqServices,
+        jobIdToken,
+        jsonTable,
+        WriteDisposition.WRITE_EMPTY,
+        CreateDisposition.CREATE_IF_NEEDED,
+        tempTablesView);
+
+    DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
+    tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+    tester.processElement(null);
+
+    logged.verifyInfo("Starting BigQuery copy job");
+  }
 }


Mime
View raw message