beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [38/50] [abbrv] beam git commit: Add client-side throttling.
Date Thu, 13 Jul 2017 03:06:50 GMT
Add client-side throttling.

The approach used is as described in
https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
. By backing off individual workers in response to high error rates, we relieve
pressure on the Datastore service, increasing the chance that the workload can
complete successfully.

The exported cumulativeThrottledSeconds could also be used as an autoscaling
signal in future.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1defd14
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1defd14
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1defd14

Branch: refs/heads/DSL_SQL
Commit: f1defd14c943d65e946cda081fe22a872ce6ce07
Parents: 7925a66
Author: Colin Phipps <fipsy@google.com>
Authored: Mon Jun 26 13:34:19 2017 +0000
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jul 12 20:01:02 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/datastore/AdaptiveThrottler.java | 103 +++++++++++++++++
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  25 ++++-
 .../io/gcp/datastore/AdaptiveThrottlerTest.java | 111 +++++++++++++++++++
 3 files changed, 238 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f1defd14/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
new file mode 100644
index 0000000..ce6ebe6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.MovingFunction;
+
+
+/**
+ * An implementation of client-side adaptive throttling. See
+ * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
+ * for a full discussion of the use case and algorithm applied.
+ */
+class AdaptiveThrottler {
+  private final MovingFunction successfulRequests;
+  private final MovingFunction allRequests;
+
+  /** The target ratio between requests sent and successful requests. This is "K" in the formula in
+   * https://landing.google.com/sre/book/chapters/handling-overload.html */
+  private final double overloadRatio;
+
+  /** The target minimum number of requests per samplePeriodMs, even if no requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be zero. (This is the +1
+   * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html */
+  private static final double MIN_REQUESTS = 1;
+  private final Random random;
+
+  /**
+   * @param samplePeriodMs the time window to keep of request history to inform throttling
+   * decisions.
+   * @param sampleUpdateMs the length of buckets within this time window.
+   * @param overloadRatio the target ratio between requests sent and successful requests. You should
+   * always set this to more than 1, otherwise the client would never try to send more requests than
+   * succeeded in the past - so it could never recover from temporary setbacks.
+   */
+  public AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
+      double overloadRatio) {
+    this(samplePeriodMs, sampleUpdateMs, overloadRatio, new Random());
+  }
+
+  @VisibleForTesting
+  AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
+      double overloadRatio, Random random) {
+    allRequests =
+        new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, Sum.ofLongs());
+    successfulRequests =
+        new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, Sum.ofLongs());
+    this.overloadRatio = overloadRatio;
+    this.random = random;
+  }
+
+  @VisibleForTesting
+  double throttlingProbability(long nowMsSinceEpoch) {
+    if (!allRequests.isSignificant()) {
+      return 0;
+    }
+    long allRequestsNow = allRequests.get(nowMsSinceEpoch);
+    long successfulRequestsNow = successfulRequests.get(nowMsSinceEpoch);
+    return Math.max(0,
+        (allRequestsNow - overloadRatio * successfulRequestsNow) / (allRequestsNow + MIN_REQUESTS));
+  }
+
+  /**
+   * Call this before sending a request to the remote service; if this returns true, drop the
+   * request (treating it as a failure or trying it again at a later time).
+   */
+  public boolean throttleRequest(long nowMsSinceEpoch) {
+    double delayProbability = throttlingProbability(nowMsSinceEpoch);
+    // Note that we increment the count of all requests here, even if we return true - so even if we
+    // tell the client not to send a request at all, it still counts as a failed request.
+    allRequests.add(nowMsSinceEpoch, 1);
+
+    return (random.nextDouble() < delayProbability);
+  }
+
+  /**
+   * Call this after {@link #throttleRequest} if your request was successful.
+   */
+  public void successfulRequest(long nowMsSinceEpoch) {
+    successfulRequests.add(nowMsSinceEpoch, 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f1defd14/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index e67f4b2..5f65428 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,6 +71,8 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -1209,6 +1211,13 @@ public class DatastoreV1 {
     private final List<Mutation> mutations = new ArrayList<>();
     private int mutationsSize = 0;  // Accumulated size of protos in mutations.
     private WriteBatcher writeBatcher;
+    private transient AdaptiveThrottler throttler;
+    private final Counter throttledSeconds =
+      Metrics.counter(DatastoreWriterFn.class, "cumulativeThrottlingSeconds");
+    private final Counter rpcErrors =
+      Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
+    private final Counter rpcSuccesses =
+      Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1237,6 +1246,10 @@ public class DatastoreV1 {
     public void startBundle(StartBundleContext c) {
       datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost);
       writeBatcher.start();
+      if (throttler == null) {
+        // Initialize throttler at first use, because it is not serializable.
+        throttler = new AdaptiveThrottler(120000, 10000, 1.25);
+      }
     }
 
     @ProcessElement
@@ -1284,11 +1297,20 @@ public class DatastoreV1 {
         commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
         long startTime = System.currentTimeMillis(), endTime;
 
+        if (throttler.throttleRequest(startTime)) {
+          LOG.info("Delaying request due to previous failures");
+          throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
+          sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
+          continue;
+        }
+
         try {
           datastore.commit(commitRequest.build());
           endTime = System.currentTimeMillis();
 
           writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
+          throttler.successfulRequest(startTime);
+          rpcSuccesses.inc();
 
           // Break if the commit threw no exception.
           break;
@@ -1300,11 +1322,12 @@ public class DatastoreV1 {
             endTime = System.currentTimeMillis();
             writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
           }
-
           // Only log the code and message for potentially-transient errors. The entire exception
           // will be propagated upon the last retry.
           LOG.error("Error writing batch of {} mutations to Datastore ({}): {}", mutations.size(),
               exception.getCode(), exception.getMessage());
+          rpcErrors.inc();
+
           if (!BackOffUtils.next(sleeper, backoff)) {
             LOG.error("Aborting after {} retries.", MAX_RETRIES);
             throw exception;

http://git-wip-us.apache.org/repos/asf/beam/blob/f1defd14/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
new file mode 100644
index 0000000..c12cf55
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import java.util.Random;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link AdaptiveThrottler}.
+ */
+@RunWith(JUnit4.class)
+public class AdaptiveThrottlerTest {
+
+  static final long START_TIME_MS = 0;
+  static final long SAMPLE_PERIOD_MS = 60000;
+  static final long SAMPLE_BUCKET_MS = 1000;
+  static final double OVERLOAD_RATIO = 2;
+
+  /** Returns a throttler configured with the standard parameters above. */
+  AdaptiveThrottler getThrottler() {
+    return new AdaptiveThrottler(SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO);
+  }
+
+  @Test
+  public void testNoInitialThrottling() throws Exception {
+    AdaptiveThrottler throttler = getThrottler();
+    assertThat(throttler.throttlingProbability(START_TIME_MS), equalTo(0.0));
+    assertThat("first request is not throttled",
+        throttler.throttleRequest(START_TIME_MS), equalTo(false));
+  }
+
+  @Test
+  public void testNoThrottlingIfNoErrors() throws Exception {
+    AdaptiveThrottler throttler = getThrottler();
+    long t = START_TIME_MS;
+    for (; t < START_TIME_MS + 20; t++) {
+      assertFalse(throttler.throttleRequest(t));
+      throttler.successfulRequest(t);
+    }
+    assertThat(throttler.throttlingProbability(t), equalTo(0.0));
+  }
+
+  @Test
+  public void testNoThrottlingAfterErrorsExpire() throws Exception {
+    AdaptiveThrottler throttler = getThrottler();
+    long t = START_TIME_MS;
+    for (; t < START_TIME_MS + SAMPLE_PERIOD_MS; t++) {
+      throttler.throttleRequest(t);
+      // and no successfulRequest.
+    }
+    assertThat("check that we set up a non-zero probability of throttling",
+        throttler.throttlingProbability(t), greaterThan(0.0));
+    for (; t < START_TIME_MS + 2 * SAMPLE_PERIOD_MS; t++) {
+      throttler.throttleRequest(t);
+      throttler.successfulRequest(t);
+    }
+    assertThat(throttler.throttlingProbability(t), equalTo(0.0));
+  }
+
+  @Test
+  public void testThrottlingAfterErrors() throws Exception {
+    Random mockRandom = Mockito.mock(Random.class);
+    Mockito.when(mockRandom.nextDouble()).thenReturn(
+        0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
+        0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9);
+    AdaptiveThrottler throttler = new AdaptiveThrottler(
+        SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO, mockRandom);
+    for (int i = 0; i < 20; i++) {
+      boolean throttled = throttler.throttleRequest(START_TIME_MS + i);
+      // 1/3rd of requests succeeding.
+      if (i % 3 == 1) {
+        throttler.successfulRequest(START_TIME_MS + i);
+      }
+
+      // Once we have some history in place, check what throttling happens.
+      if (i >= 10) {
+        // Expect 1/3rd of requests to be throttled. (So 1/3rd throttled, 1/3rd succeeding, 1/3rd
+        // tried and failing).
+        assertThat(String.format("for i=%d", i),
+            throttler.throttlingProbability(START_TIME_MS + i), closeTo(0.33, /*error=*/0.1));
+        // Requests 10..13 should be throttled, 14..19 not throttled given the mocked random numbers
+        // that we fed to throttler.
+        assertThat(String.format("for i=%d", i), throttled, equalTo(i < 14));
+      }
+    }
+  }
+}


Mime
View raw message