beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-480] Move insertAll() from BigQueryTableInserter to BigQueryServices
Date Fri, 22 Jul 2016 23:52:55 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 12b60ffa5 -> 122cd0466


[BEAM-480] Move insertAll() from BigQueryTableInserter to BigQueryServices


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf3af5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf3af5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf3af5de

Branch: refs/heads/master
Commit: bf3af5de9e6f8126ffd2ccd3f7a68a84e55e90ff
Parents: 12b60ff
Author: Pei He <peihe@google.com>
Authored: Fri Jul 22 15:03:38 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Jul 22 16:52:49 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  36 ++--
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  13 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 192 +++++++++++++++++++
 .../io/gcp/bigquery/BigQueryTableInserter.java  | 192 -------------------
 .../gcp/bigquery/BigQueryServicesImplTest.java  |  70 +++++++
 .../gcp/bigquery/BigQueryTableInserterTest.java |  64 -------
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  40 +---
 7 files changed, 300 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 130d444..76f7079 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1668,11 +1668,13 @@ public class BigQueryIO {
       @Override
       public PDone apply(PCollection<TableRow> input) {
         BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+        BigQueryServices bqServices = getBigQueryServices();
 
         // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
         // and BigQuery's streaming import API.
         if (options.isStreaming() || tableRefFunction != null) {
-          return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
+          return input.apply(
+              new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices));
         }
 
         TableReference table = fromJsonString(jsonTableRef, TableReference.class);
@@ -1693,7 +1695,6 @@ public class BigQueryIO {
               e);
         }
 
-        BigQueryServices bqServices = getBigQueryServices();
         return input.apply("Write", org.apache.beam.sdk.io.Write.to(
             new BigQuerySink(
                 jobIdToken,
@@ -2018,6 +2019,8 @@ public class BigQueryIO {
     /** TableSchema in JSON. Use String to make the class Serializable. */
     private final String jsonTableSchema;
 
+    private final BigQueryServices bqServices;
+
     /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
     private transient Map<String, List<TableRow>> tableRows;
 
@@ -2034,8 +2037,9 @@ public class BigQueryIO {
         createAggregator("ByteCount", new Sum.SumLongFn());
 
     /** Constructor. */
-    StreamingWriteFn(TableSchema schema) {
-      jsonTableSchema = toJsonString(schema);
+    StreamingWriteFn(TableSchema schema, BigQueryServices bqServices) {
+      this.jsonTableSchema = toJsonString(schema);
+      this.bqServices = checkNotNull(bqServices, "bqServices");
     }
 
     /** Prepares a target BigQuery table. */
@@ -2060,11 +2064,10 @@ public class BigQueryIO {
     @Override
     public void finishBundle(Context context) throws Exception {
       BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-      Bigquery client = Transport.newBigQueryClient(options).build();
 
       for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
         TableReference tableReference = getOrCreateTable(options, entry.getKey());
-        flushRows(client, tableReference, entry.getValue(),
+        flushRows(tableReference, entry.getValue(),
             uniqueIdsForTableRows.get(entry.getKey()), options);
       }
       tableRows.clear();
@@ -2100,13 +2103,17 @@ public class BigQueryIO {
       return tableReference;
     }
 
-    /** Writes the accumulated rows into BigQuery with streaming API. */
-    private void flushRows(Bigquery client, TableReference tableReference,
-        List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
{
+    /**
+     * Writes the accumulated rows into BigQuery with streaming API.
+     */
+    private void flushRows(TableReference tableReference,
+        List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
+            throws InterruptedException {
       if (!tableRows.isEmpty()) {
         try {
-          BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
-          inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator);
+          long totalBytes = bqServices.getDatasetService(options).insertAll(
+              tableReference, tableRows, uniqueIds);
+          byteCountAggregator.addValue(totalBytes);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
@@ -2320,14 +2327,17 @@ public class BigQueryIO {
     private final transient TableReference tableReference;
     private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
     private final transient TableSchema tableSchema;
+    private final BigQueryServices bqServices;
 
     /** Constructor. */
     StreamWithDeDup(TableReference tableReference,
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        TableSchema tableSchema) {
+        TableSchema tableSchema,
+        BigQueryServices bqServices) {
       this.tableReference = tableReference;
       this.tableRefFunction = tableRefFunction;
       this.tableSchema = tableSchema;
+      this.bqServices = checkNotNull(bqServices, "bqServices");
     }
 
     @Override
@@ -2358,7 +2368,7 @@ public class BigQueryIO {
       tagged
           .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
           .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(tableSchema)));
+          .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices)));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 3e77362..87887ec 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -32,13 +32,12 @@ import com.google.api.services.bigquery.model.TableRow;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.List;
 import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
 
-/**
- * An interface for real, mock, or fake implementations of Cloud BigQuery services.
- */
+/** An interface for real, mock, or fake implementations of Cloud BigQuery services. */
 interface BigQueryServices extends Serializable {
 
   /**
@@ -140,6 +139,14 @@ interface BigQueryServices extends Serializable {
      */
     void deleteDataset(String projectId, String datasetId)
         throws IOException, InterruptedException;
+
+    /**
+     * Inserts {@link TableRow TableRows} with the specified insertIds if not null.
+     *
+     * Returns the total bytes count of {@link TableRow TableRows}.
+     */
+    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String>
insertIdList)
+        throws IOException, InterruptedException;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 414baae..ef17e0f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -17,8 +17,13 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
 import org.apache.beam.sdk.util.Transport;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -38,6 +43,8 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableDataList;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -48,7 +55,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
@@ -254,12 +268,53 @@ class BigQueryServicesImpl implements BigQueryServices {
 
   @VisibleForTesting
   static class DatasetServiceImpl implements DatasetService {
+    // Approximate amount of table data to upload per InsertAll request.
+    private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
+
+    // The maximum number of rows to upload per InsertAll request.
+    private static final long MAX_ROWS_PER_BATCH = 500;
+
+    // The maximum number of times to retry inserting rows into BigQuery.
+    private static final int MAX_INSERT_ATTEMPTS = 5;
+
+    // The initial backoff after a failure inserting rows into BigQuery.
+    private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
+
+    // Backoff time bounds for rate limit exceeded errors.
+    private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
+    private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+
     private final ApiErrorExtractor errorExtractor;
     private final Bigquery client;
+    private final PipelineOptions options;
+    private final long maxRowsPerBatch;
+
+    private ExecutorService executor;
+
+    @VisibleForTesting
+    DatasetServiceImpl(Bigquery client, PipelineOptions options) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = client;
+      this.options = options;
+      this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+      this.executor = null;
+    }
+
+    @VisibleForTesting
+    DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = client;
+      this.options = options;
+      this.maxRowsPerBatch = maxRowsPerBatch;
+      this.executor = null;
+    }
 
     private DatasetServiceImpl(BigQueryOptions bqOptions) {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = Transport.newBigQueryClient(bqOptions).build();
+      this.options = bqOptions;
+      this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+      this.executor = null;
     }
 
     /**
@@ -418,6 +473,143 @@ class BigQueryServicesImpl implements BigQueryServices {
           Sleeper.DEFAULT,
           backoff);
     }
+
+    @Override
+    public long insertAll(
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+        throws IOException, InterruptedException {
+      checkNotNull(ref, "ref");
+      if (executor == null) {
+        this.executor = options.as(GcsOptions.class).getExecutorService();
+      }
+      if (insertIdList != null && rowList.size() != insertIdList.size()) {
+        throw new AssertionError("If insertIdList is not null it needs to have at least "
+            + "as many elements as rowList");
+      }
+
+      AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
+          MAX_INSERT_ATTEMPTS,
+          INITIAL_INSERT_BACKOFF_INTERVAL_MS);
+
+      long retTotalDataSize = 0;
+      List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
+      // These lists contain the rows to publish. Initially the contain the entire list.
+      // If there are failures, they will contain only the failed rows to be retried.
+      List<TableRow> rowsToPublish = rowList;
+      List<String> idsToPublish = insertIdList;
+      while (true) {
+        List<TableRow> retryRows = new ArrayList<>();
+        List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>()
: null;
+
+        int strideIndex = 0;
+        // Upload in batches.
+        List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
+        int dataSize = 0;
+
+        List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures
= new ArrayList<>();
+        List<Integer> strideIndices = new ArrayList<>();
+
+        for (int i = 0; i < rowsToPublish.size(); ++i) {
+          TableRow row = rowsToPublish.get(i);
+          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+          if (idsToPublish != null) {
+            out.setInsertId(idsToPublish.get(i));
+          }
+          out.setJson(row.getUnknownKeys());
+          rows.add(out);
+
+          dataSize += row.toString().length();
+          if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
+              || i == rowsToPublish.size() - 1) {
+            TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+            content.setRows(rows);
+
+            final Bigquery.Tabledata.InsertAll insert = client.tabledata()
+                .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
+                    content);
+
+            futures.add(
+                executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>()
{
+                  @Override
+                  public List<TableDataInsertAllResponse.InsertErrors> call() throws
IOException {
+                    BackOff backoff = new IntervalBoundedExponentialBackOff(
+                        MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+                    while (true) {
+                      try {
+                        return insert.execute().getInsertErrors();
+                      } catch (IOException e) {
+                        if (new ApiErrorExtractor().rateLimited(e)) {
+                          LOG.info("BigQuery insertAll exceeded rate limit, retrying");
+                          try {
+                            Thread.sleep(backoff.nextBackOffMillis());
+                          } catch (InterruptedException interrupted) {
+                            throw new IOException(
+                                "Interrupted while waiting before retrying insertAll");
+                          }
+                        } else {
+                          throw e;
+                        }
+                      }
+                    }
+                  }
+                }));
+            strideIndices.add(strideIndex);
+
+            retTotalDataSize += dataSize;
+
+            dataSize = 0;
+            strideIndex = i + 1;
+            rows = new LinkedList<>();
+          }
+        }
+
+        try {
+          for (int i = 0; i < futures.size(); i++) {
+            List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
+            if (errors != null) {
+              for (TableDataInsertAllResponse.InsertErrors error : errors) {
+                allErrors.add(error);
+                if (error.getIndex() == null) {
+                  throw new IOException("Insert failed: " + allErrors);
+                }
+
+                int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
+                retryRows.add(rowsToPublish.get(errorIndex));
+                if (retryIds != null) {
+                  retryIds.add(idsToPublish.get(errorIndex));
+                }
+              }
+            }
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while inserting " + rowsToPublish);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(e.getCause());
+        }
+
+        if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
+          try {
+            Thread.sleep(backoff.nextBackOffMillis());
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                "Interrupted while waiting before retrying insert of " + retryRows);
+          }
+          LOG.info("Retrying failed inserts to BigQuery");
+          rowsToPublish = retryRows;
+          idsToPublish = retryIds;
+          allErrors.clear();
+        } else {
+          break;
+        }
+      }
+      if (!allErrors.isEmpty()) {
+        throw new IOException("Insert failed: " + allErrors);
+      } else {
+        return retTotalDataSize;
+      }
+    }
   }
 
   private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
index 00a4fa3..bf038f5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
@@ -17,15 +17,9 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
 
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
@@ -33,11 +27,8 @@ import com.google.api.client.util.ExponentialBackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableDataList;
 import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
@@ -46,13 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
@@ -63,26 +47,7 @@ import javax.annotation.Nullable;
 class BigQueryTableInserter {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
 
-  // Approximate amount of table data to upload per InsertAll request.
-  private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
-
-  // The maximum number of rows to upload per InsertAll request.
-  private static final long MAX_ROWS_PER_BATCH = 500;
-
-  // The maximum number of times to retry inserting rows into BigQuery.
-  private static final int MAX_INSERT_ATTEMPTS = 5;
-
-  // The initial backoff after a failure inserting rows into BigQuery.
-  private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
-
-  // Backoff time bounds for rate limit exceeded errors.
-  private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
-  private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
-
   private final Bigquery client;
-  private final long maxRowsPerBatch;
-
-  private ExecutorService executor;
 
   /**
    * Constructs a new row inserter.
@@ -92,163 +57,6 @@ class BigQueryTableInserter {
    */
   BigQueryTableInserter(Bigquery client, PipelineOptions options) {
     this.client = client;
-    this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
-    this.executor = options.as(GcsOptions.class).getExecutorService();
-  }
-
-  /**
-   * Constructs a new row inserter.
-   *
-   * @param client a BigQuery client
-   * @param options a PipelineOptions object
-   * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
-   */
-  BigQueryTableInserter(Bigquery client, PipelineOptions options,
-                               int maxRowsPerBatch) {
-    this.client = client;
-    this.maxRowsPerBatch = maxRowsPerBatch;
-    this.executor = options.as(GcsOptions.class).getExecutorService();
-  }
-
-  /**
-   * Insert all rows from the given list.
-   */
-  void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
-    insertAll(ref, rowList, null, null);
-  }
-
-  /**
-   * Insert all rows from the given list using specified insertIds if not null. Track count
of
-   * bytes written with the Aggregator.
-   */
-  void insertAll(TableReference ref, List<TableRow> rowList,
-      @Nullable List<String> insertIdList, @Nullable Aggregator<Long, Long> byteCountAggregator)
-      throws IOException {
-    checkNotNull(ref, "ref");
-    if (insertIdList != null && rowList.size() != insertIdList.size()) {
-      throw new AssertionError("If insertIdList is not null it needs to have at least "
-          + "as many elements as rowList");
-    }
-
-    AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
-        MAX_INSERT_ATTEMPTS,
-        INITIAL_INSERT_BACKOFF_INTERVAL_MS);
-
-    List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
-    // These lists contain the rows to publish. Initially the contain the entire list. If
there are
-    // failures, they will contain only the failed rows to be retried.
-    List<TableRow> rowsToPublish = rowList;
-    List<String> idsToPublish = insertIdList;
-    while (true) {
-      List<TableRow> retryRows = new ArrayList<>();
-      List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>()
: null;
-
-      int strideIndex = 0;
-      // Upload in batches.
-      List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
-      int dataSize = 0;
-
-      List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures
= new ArrayList<>();
-      List<Integer> strideIndices = new ArrayList<>();
-
-      for (int i = 0; i < rowsToPublish.size(); ++i) {
-        TableRow row = rowsToPublish.get(i);
-        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-        if (idsToPublish != null) {
-          out.setInsertId(idsToPublish.get(i));
-        }
-        out.setJson(row.getUnknownKeys());
-        rows.add(out);
-
-        dataSize += row.toString().length();
-        if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
-            || i == rowsToPublish.size() - 1) {
-          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
-          content.setRows(rows);
-
-          final Bigquery.Tabledata.InsertAll insert = client.tabledata()
-              .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
-                  content);
-
-          futures.add(
-              executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>()
{
-                @Override
-                public List<TableDataInsertAllResponse.InsertErrors> call() throws
IOException {
-                  BackOff backoff = new IntervalBoundedExponentialBackOff(
-                      MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
-                  while (true) {
-                    try {
-                      return insert.execute().getInsertErrors();
-                    } catch (IOException e) {
-                      if (new ApiErrorExtractor().rateLimited(e)) {
-                        LOG.info("BigQuery insertAll exceeded rate limit, retrying");
-                        try {
-                          Thread.sleep(backoff.nextBackOffMillis());
-                        } catch (InterruptedException interrupted) {
-                          throw new IOException(
-                              "Interrupted while waiting before retrying insertAll");
-                        }
-                      } else {
-                        throw e;
-                      }
-                    }
-                  }
-                }
-              }));
-          strideIndices.add(strideIndex);
-
-          if (byteCountAggregator != null) {
-            byteCountAggregator.addValue((long) dataSize);
-          }
-          dataSize = 0;
-          strideIndex = i + 1;
-          rows = new LinkedList<>();
-        }
-      }
-
-      try {
-        for (int i = 0; i < futures.size(); i++) {
-          List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
-          if (errors != null) {
-            for (TableDataInsertAllResponse.InsertErrors error : errors) {
-              allErrors.add(error);
-              if (error.getIndex() == null) {
-                throw new IOException("Insert failed: " + allErrors);
-              }
-
-              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
-              retryRows.add(rowsToPublish.get(errorIndex));
-              if (retryIds != null) {
-                retryIds.add(idsToPublish.get(errorIndex));
-              }
-            }
-          }
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Interrupted while inserting " + rowsToPublish);
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e.getCause());
-      }
-
-      if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
-        try {
-          Thread.sleep(backoff.nextBackOffMillis());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while waiting before retrying insert of " +
retryRows);
-        }
-        LOG.info("Retrying failed inserts to BigQuery");
-        rowsToPublish = retryRows;
-        idsToPublish = retryIds;
-        allErrors.clear();
-      } else {
-        break;
-      }
-    }
-    if (!allErrors.isEmpty()) {
-      throw new IOException("Insert failed: " + allErrors);
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 2cdf511..686685b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -18,11 +18,14 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
 import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
@@ -32,6 +35,7 @@ import org.apache.beam.sdk.util.Transport;
 import com.google.api.client.googleapis.json.GoogleJsonError;
 import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
 import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.http.LowLevelHttpResponse;
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.json.Json;
@@ -46,6 +50,9 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.collect.ImmutableList;
 
@@ -61,6 +68,8 @@ import org.mockito.MockitoAnnotations;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Tests for {@link BigQueryServicesImpl}.
@@ -267,6 +276,67 @@ public class BigQueryServicesImplTest {
     verify(response, times(1)).getContentType();
   }
 
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} retries quota rate limited attempts.
+   */
+  @Test
+  public void testInsertRetry() throws Exception {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = new ArrayList<>();
+    rows.add(new TableRow());
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    dataService.insertAll(ref, rows, null);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
+  }
+
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts.
+   */
+  @Test
+  public void testInsertDoesNotRetry() throws Throwable {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = new ArrayList<>();
+    rows.add(new TableRow());
+
+    // First response is 403 not-rate-limited, second response has valid payload but should
not
+    // be invoked.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    thrown.expect(GoogleJsonResponseException.class);
+    thrown.expectMessage("actually forbidden");
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    try {
+      dataService.insertAll(ref, rows, null);
+      fail();
+    } catch (RuntimeException e) {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+      throw e.getCause();
+    }
+  }
+
   /** A helper to wrap a {@link GenericJson} object in a content stream. */
   private static InputStream toStream(GenericJson content) throws IOException {
     return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
index c29da91..dac3911 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
@@ -48,9 +48,7 @@ import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.hadoop.util.RetryBoundedBackOff;
 import com.google.common.collect.ImmutableList;
 
@@ -67,8 +65,6 @@ import org.mockito.MockitoAnnotations;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Tests of {@link BigQueryTableInserter}.
@@ -250,64 +246,4 @@ public class BigQueryTableInserterTest {
       throw e;
     }
   }
-
-  /**
-   * Tests that {@link BigQueryTableInserter#insertAll} retries quota rate limited attempts.
-   */
-  @Test
-  public void testInsertRetry() throws IOException {
-    TableReference ref =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = new ArrayList<>();
-    rows.add(new TableRow());
-
-    // First response is 403 rate limited, second response has valid payload.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
-        .thenReturn(toStream(new TableDataInsertAllResponse()));
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-
-    inserter.insertAll(ref, rows);
-    verify(response, times(2)).getStatusCode();
-    verify(response, times(2)).getContent();
-    verify(response, times(2)).getContentType();
-    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts.
-   */
-  @Test
-  public void testInsertDoesNotRetry() throws Throwable {
-    TableReference ref =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = new ArrayList<>();
-    rows.add(new TableRow());
-
-    // First response is 403 not-rate-limited, second response has valid payload but should
not
-    // be invoked.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
-        .thenReturn(toStream(new TableDataInsertAllResponse()));
-
-    thrown.expect(GoogleJsonResponseException.class);
-    thrown.expectMessage("actually forbidden");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-
-    try {
-      inserter.insertAll(ref, rows);
-      fail();
-    } catch (RuntimeException e) {
-      verify(response, times(1)).getStatusCode();
-      verify(response, times(1)).getContent();
-      verify(response, times(1)).getContentType();
-      throw e.getCause();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf3af5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index e0c353b..89284df 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -30,11 +30,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
 
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Table;
@@ -435,7 +433,7 @@ public class BigQueryUtilTest {
 
     TableReference ref = BigQueryIO
         .parseTableSpec("project:dataset.table");
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5);
+    DatasetServiceImpl datasetService = new DatasetServiceImpl(mockClient, options, 5);
 
     List<TableRow> rows = new ArrayList<>();
     List<String> ids = new ArrayList<>();
@@ -444,41 +442,13 @@ public class BigQueryUtilTest {
       ids.add(new String());
     }
 
-    InMemoryLongSumAggregator byteCountAggregator = new InMemoryLongSumAggregator("ByteCount");
+    long totalBytes = 0;
     try {
-      inserter.insertAll(ref, rows, ids, byteCountAggregator);
+      totalBytes = datasetService.insertAll(ref, rows, ids);
     } finally {
       verifyInsertAll(5);
       // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
-      assertEquals("Incorrect byte count", 25L * 23L, byteCountAggregator.getSum());
-    }
-  }
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long>
{
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
-    }
-
-    public long getSum() {
-      return sum;
+      assertEquals("Incorrect byte count", 25L * 23L, totalBytes);
     }
   }
 }



Mime
View raw message