beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Add backoff if insertAll hits rate limits in BQTableInserter
Date Mon, 04 Apr 2016 18:24:56 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master f236db027 -> fd049b52e


Add backoff if insertAll hits rate limits in BQTableInserter


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/991d6eb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/991d6eb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/991d6eb0

Branch: refs/heads/master
Commit: 991d6eb02fcf0691c33d92102821c0a322cd4e1a
Parents: f236db0
Author: Daniel Mills <millsd@google.com>
Authored: Thu Mar 31 11:10:53 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Apr 4 11:24:23 2016 -0700

----------------------------------------------------------------------
 .../sdk/util/BigQueryTableInserter.java         | 24 +++++++-
 .../util/IntervalBoundedExponentialBackOff.java |  2 +-
 .../sdk/util/BigQueryTableInserterTest.java     | 64 ++++++++++++++++++++
 3 files changed, 88 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/991d6eb0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
index 7f82e96..91a0c9e 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java
@@ -74,6 +74,10 @@ public class BigQueryTableInserter {
   // The initial backoff after a failure inserting rows into BigQuery.
   private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
 
+  // Backoff time bounds for rate limit exceeded errors.
+  private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
+  private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+
   private final Bigquery client;
   private final TableReference defaultRef;
   private final long maxRowsPerBatch;
@@ -216,7 +220,25 @@ public class BigQueryTableInserter {
               executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
                 @Override
                 public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
-                  return insert.execute().getInsertErrors();
+                  BackOff backoff = new IntervalBoundedExponentialBackOff(
+                      MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+                  while (true) {
+                    try {
+                      return insert.execute().getInsertErrors();
+                    } catch (IOException e) {
+                      if (new ApiErrorExtractor().rateLimited(e)) {
+                        LOG.info("BigQuery insertAll exceeded rate limit, retrying");
+                        try {
+                          Thread.sleep(backoff.nextBackOffMillis());
+                        } catch (InterruptedException interrupted) {
+                          throw new IOException(
+                              "Interrupted while waiting before retrying insertAll");
+                        }
+                      } else {
+                        throw e;
+                      }
+                    }
+                  }
                 }
               }));
           strideIndices.add(strideIndex);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/991d6eb0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
index 6fcc784..51ff9f2 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java
@@ -54,7 +54,7 @@ public class IntervalBoundedExponentialBackOff implements BackOff {
   private final long initialIntervalMillis;
   private int currentAttempt;
 
-  public IntervalBoundedExponentialBackOff(int maximumIntervalMillis, long initialIntervalMillis) {
+  public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) {
     Preconditions.checkArgument(
         maximumIntervalMillis > 0, "Maximum interval must be greater than zero.");
     Preconditions.checkArgument(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/991d6eb0/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java
index 954eaf9..ea29e5d 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java
@@ -41,7 +41,9 @@ import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
 import com.google.cloud.hadoop.util.RetryBoundedBackOff;
 import com.google.common.collect.ImmutableList;
@@ -59,6 +61,8 @@ import org.mockito.MockitoAnnotations;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Tests of {@link BigQueryTableInserter}.
@@ -237,4 +241,64 @@ public class BigQueryTableInserterTest {
       throw e;
     }
   }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#insertAll} retries quota rate limited attempts.
+   */
+  @Test
+  public void testInsertRetry() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = new ArrayList<>();
+    rows.add(new TableRow());
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+
+    inserter.insertAll(ref, rows);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts.
+   */
+  @Test
+  public void testInsertDoesNotRetry() throws Throwable {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = new ArrayList<>();
+    rows.add(new TableRow());
+
+    // First response is 403 not-rate-limited, second response has valid payload but should not
+    // be invoked.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    thrown.expect(GoogleJsonResponseException.class);
+    thrown.expectMessage("actually forbidden");
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
+
+    try {
+      inserter.insertAll(ref, rows);
+      fail();
+    } catch (RuntimeException e) {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+      throw e.getCause();
+    }
+  }
 }


Mime
View raw message