beam-commits mailing list archives

From lc...@apache.org
Subject [1/2] beam git commit: Add client-side throttling.
Date Tue, 11 Jul 2017 16:40:25 GMT
Repository: beam
Updated Branches:
  refs/heads/master 138641f14 -> a22f1a05a


Add client-side throttling.

The approach used is as described in the SRE book's "Handling Overload" chapter
(https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg).
By backing off individual workers in response to high error rates, we relieve
pressure on the Datastore service, increasing the chance that the workload can
complete successfully.

The exported cumulativeThrottlingSeconds counter could also be used as an autoscaling
signal in the future.
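
For context, the client-side throttling described in that chapter rejects requests locally
with probability max(0, (requests - K * accepts) / (requests + 1)), where K is the overload
ratio. A minimal standalone sketch of that formula follows (illustration only, not part of
this commit; the AdaptiveThrottler below tracks the two counts with MovingFunction over a
sliding window):

// Sketch of the rejection-probability formula; "K" corresponds to the
// overloadRatio parameter taken by AdaptiveThrottler below.
public class ThrottlingFormulaSketch {
  static double rejectionProbability(long requests, long accepts, double k) {
    // p = max(0, (requests - K * accepts) / (requests + 1))
    return Math.max(0, (requests - k * accepts) / (requests + 1));
  }

  public static void main(String[] args) {
    // A healthy client (every recent request accepted) never throttles itself...
    System.out.println(rejectionProbability(100, 100, 1.25)); // 0.0
    // ...while a client seeing mostly errors sheds most of its own traffic.
    System.out.println(rejectionProbability(100, 10, 1.25));  // ~0.87
  }
}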


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/435cbcfa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/435cbcfa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/435cbcfa

Branch: refs/heads/master
Commit: 435cbcfae67c3a2bc8a72437fbdf7350fe5ac10a
Parents: 138641f
Author: Colin Phipps <fipsy@google.com>
Authored: Mon Jun 26 13:34:19 2017 +0000
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue Jul 11 09:40:11 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/datastore/AdaptiveThrottler.java | 103 +++++++++++++++++
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  25 ++++-
 .../io/gcp/datastore/AdaptiveThrottlerTest.java | 111 +++++++++++++++++++
 3 files changed, 238 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/435cbcfa/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
new file mode 100644
index 0000000..ce6ebe6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.util.MovingFunction;
+
+
+/**
+ * An implementation of client-side adaptive throttling. See
+ * https://landing.google.com/sre/book/chapters/handling-overload.html#client-side-throttling-a7sYUg
+ * for a full discussion of the use case and algorithm applied.
+ */
+class AdaptiveThrottler {
+  private final MovingFunction successfulRequests;
+  private final MovingFunction allRequests;
+
+  /** The target ratio between requests sent and successful requests. This is "K" in the formula in
+   * https://landing.google.com/sre/book/chapters/handling-overload.html */
+  private final double overloadRatio;
+
+  /** The target minimum number of requests per samplePeriodMs, even if no requests succeed. Must be
+   * greater than 0, else we could throttle to zero. Because every decision is probabilistic, there
+   * is no guarantee that the request rate in any given interval will not be zero. (This is the +1
+   * from the formula in https://landing.google.com/sre/book/chapters/handling-overload.html */
+  private static final double MIN_REQUESTS = 1;
+  private final Random random;
+
+  /**
+   * @param samplePeriodMs the time window of request history to keep for informing throttling
+   * decisions.
+   * @param sampleUpdateMs the length of buckets within this time window.
+   * @param overloadRatio the target ratio between requests sent and successful requests. You should
+   * always set this to more than 1, otherwise the client would never try to send more requests than
+   * succeeded in the past - so it could never recover from temporary setbacks.
+   */
+  public AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
+      double overloadRatio) {
+    this(samplePeriodMs, sampleUpdateMs, overloadRatio, new Random());
+  }
+
+  @VisibleForTesting
+  AdaptiveThrottler(long samplePeriodMs, long sampleUpdateMs,
+      double overloadRatio, Random random) {
+    allRequests =
+        new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, Sum.ofLongs());
+    successfulRequests =
+        new MovingFunction(samplePeriodMs, sampleUpdateMs,
+        1 /* numSignificantBuckets */, 1 /* numSignificantSamples */, Sum.ofLongs());
+    this.overloadRatio = overloadRatio;
+    this.random = random;
+  }
+
+  @VisibleForTesting
+  double throttlingProbability(long nowMsSinceEpoch) {
+    if (!allRequests.isSignificant()) {
+      return 0;
+    }
+    long allRequestsNow = allRequests.get(nowMsSinceEpoch);
+    long successfulRequestsNow = successfulRequests.get(nowMsSinceEpoch);
+    return Math.max(0,
+        (allRequestsNow - overloadRatio * successfulRequestsNow) / (allRequestsNow + MIN_REQUESTS));
+  }
+
+  /**
+   * Call this before sending a request to the remote service; if this returns true, drop the
+   * request (treating it as a failure or trying it again at a later time).
+   */
+  public boolean throttleRequest(long nowMsSinceEpoch) {
+    double delayProbability = throttlingProbability(nowMsSinceEpoch);
+    // Note that we increment the count of all requests here, even if we return true - so even if we
+    // tell the client not to send a request at all, it still counts as a failed request.
+    allRequests.add(nowMsSinceEpoch, 1);
+
+    return (random.nextDouble() < delayProbability);
+  }
+
+  /**
+   * Call this after {@link #throttleRequest} if your request was successful.
+   */
+  public void successfulRequest(long nowMsSinceEpoch) {
+    successfulRequests.add(nowMsSinceEpoch, 1);
+  }
+}
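
For illustration only (not part of this commit), a minimal sketch of how a caller might drive
AdaptiveThrottler around a remote call. The class is package-private, so the sketch sits in the
same package; sendRpc(), BACKOFF_MS and MAX_ATTEMPTS are hypothetical placeholders.

package org.apache.beam.sdk.io.gcp.datastore;

final class AdaptiveThrottlerUsageSketch {
  private static final long BACKOFF_MS = 5000; // hypothetical pause when throttled locally
  private static final int MAX_ATTEMPTS = 10;  // hypothetical retry cap

  // Placeholder for the real remote call, which may throw when the service is overloaded.
  private static void sendRpc() throws Exception {}

  static void callWithThrottling(AdaptiveThrottler throttler) throws Exception {
    for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
      long now = System.currentTimeMillis();
      if (throttler.throttleRequest(now)) {
        // Locally rejected: back off without sending anything to the service.
        Thread.sleep(BACKOFF_MS);
        continue;
      }
      try {
        sendRpc();
        throttler.successfulRequest(now); // only successes are reported back
        return;
      } catch (Exception e) {
        // No explicit failure call is needed: the attempt was already counted in
        // throttleRequest() and never marked successful, so the success ratio drops.
      }
    }
    throw new Exception("gave up after " + MAX_ATTEMPTS + " attempts");
  }
}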

http://git-wip-us.apache.org/repos/asf/beam/blob/435cbcfa/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index e67f4b2..5f65428 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -71,6 +71,8 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -1209,6 +1211,13 @@ public class DatastoreV1 {
     private final List<Mutation> mutations = new ArrayList<>();
     private int mutationsSize = 0;  // Accumulated size of protos in mutations.
     private WriteBatcher writeBatcher;
+    private transient AdaptiveThrottler throttler;
+    private final Counter throttledSeconds =
+      Metrics.counter(DatastoreWriterFn.class, "cumulativeThrottlingSeconds");
+    private final Counter rpcErrors =
+      Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
+    private final Counter rpcSuccesses =
+      Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
 
     private static final int MAX_RETRIES = 5;
     private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
@@ -1237,6 +1246,10 @@ public class DatastoreV1 {
     public void startBundle(StartBundleContext c) {
       datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId.get(), localhost);
       writeBatcher.start();
+      if (throttler == null) {
+        // Initialize throttler at first use, because it is not serializable.
+        throttler = new AdaptiveThrottler(120000, 10000, 1.25);
+      }
     }
 
     @ProcessElement
@@ -1284,11 +1297,20 @@ public class DatastoreV1 {
         commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
         long startTime = System.currentTimeMillis(), endTime;
 
+        if (throttler.throttleRequest(startTime)) {
+          LOG.info("Delaying request due to previous failures");
+          throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
+          sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
+          continue;
+        }
+
         try {
           datastore.commit(commitRequest.build());
           endTime = System.currentTimeMillis();
 
           writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
+          throttler.successfulRequest(startTime);
+          rpcSuccesses.inc();
 
           // Break if the commit threw no exception.
           break;
@@ -1300,11 +1322,12 @@ public class DatastoreV1 {
             endTime = System.currentTimeMillis();
             writeBatcher.addRequestLatency(endTime, endTime - startTime, mutations.size());
           }
-
           // Only log the code and message for potentially-transient errors. The entire exception
           // will be propagated upon the last retry.
           LOG.error("Error writing batch of {} mutations to Datastore ({}): {}", mutations.size(),
               exception.getCode(), exception.getMessage());
+          rpcErrors.inc();
+
           if (!BackOffUtils.next(sleeper, backoff)) {
             LOG.error("Aborting after {} retries.", MAX_RETRIES);
             throw exception;
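
One detail worth noting from the DatastoreWriterFn change above: AdaptiveThrottler is not
serializable, so it is held in a transient field and created on first use in startBundle. A
generic sketch of that pattern follows (illustration only; the class name is hypothetical, and
it sits in the same package because AdaptiveThrottler is package-private):

package org.apache.beam.sdk.io.gcp.datastore;

import org.apache.beam.sdk.transforms.DoFn;

class ThrottledWriteSketchFn extends DoFn<String, String> {
  // Skipped during Java serialization; always null when the fn arrives on a worker.
  private transient AdaptiveThrottler throttler;

  @StartBundle
  public void startBundle() {
    if (throttler == null) {
      // Same parameters as the commit: 120s sample window, 10s buckets, overload ratio 1.25.
      throttler = new AdaptiveThrottler(120000, 10000, 1.25);
    }
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // A real fn would consult throttler.throttleRequest(...) before each RPC,
    // as DatastoreWriterFn does above; this sketch just passes elements through.
    c.output(c.element());
  }
}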

http://git-wip-us.apache.org/repos/asf/beam/blob/435cbcfa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
new file mode 100644
index 0000000..c12cf55
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/AdaptiveThrottlerTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+import java.util.Random;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link AdaptiveThrottler}.
+ */
+@RunWith(JUnit4.class)
+public class AdaptiveThrottlerTest {
+
+  static final long START_TIME_MS = 0;
+  static final long SAMPLE_PERIOD_MS = 60000;
+  static final long SAMPLE_BUCKET_MS = 1000;
+  static final double OVERLOAD_RATIO = 2;
+
+  /** Returns a throttler configured with the standard parameters above. */
+  AdaptiveThrottler getThrottler() {
+    return new AdaptiveThrottler(SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO);
+  }
+
+  @Test
+  public void testNoInitialThrottling() throws Exception {
+    AdaptiveThrottler throttler = getThrottler();
+    assertThat(throttler.throttlingProbability(START_TIME_MS), equalTo(0.0));
+    assertThat("first request is not throttled",
+        throttler.throttleRequest(START_TIME_MS), equalTo(false));
+  }
+
+  @Test
+  public void testNoThrottlingIfNoErrors() throws Exception {
+    AdaptiveThrottler throttler = getThrottler();
+    long t = START_TIME_MS;
+    for (; t < START_TIME_MS + 20; t++) {
+      assertFalse(throttler.throttleRequest(t));
+      throttler.successfulRequest(t);
+    }
+    assertThat(throttler.throttlingProbability(t), equalTo(0.0));
+  }
+
+  @Test
+  public void testNoThrottlingAfterErrorsExpire() throws Exception {
+    AdaptiveThrottler throttler = getThrottler();
+    long t = START_TIME_MS;
+    for (; t < START_TIME_MS + SAMPLE_PERIOD_MS; t++) {
+      throttler.throttleRequest(t);
+      // and no successfulRequest.
+    }
+    assertThat("check that we set up a non-zero probability of throttling",
+        throttler.throttlingProbability(t), greaterThan(0.0));
+    for (; t < START_TIME_MS + 2 * SAMPLE_PERIOD_MS; t++) {
+      throttler.throttleRequest(t);
+      throttler.successfulRequest(t);
+    }
+    assertThat(throttler.throttlingProbability(t), equalTo(0.0));
+  }
+
+  @Test
+  public void testThrottlingAfterErrors() throws Exception {
+    Random mockRandom = Mockito.mock(Random.class);
+    Mockito.when(mockRandom.nextDouble()).thenReturn(
+        0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9,
+        0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9);
+    AdaptiveThrottler throttler = new AdaptiveThrottler(
+        SAMPLE_PERIOD_MS, SAMPLE_BUCKET_MS, OVERLOAD_RATIO, mockRandom);
+    for (int i = 0; i < 20; i++) {
+      boolean throttled = throttler.throttleRequest(START_TIME_MS + i);
+      // 1/3rd of requests succeeding.
+      if (i % 3 == 1) {
+        throttler.successfulRequest(START_TIME_MS + i);
+      }
+
+      // Once we have some history in place, check what throttling happens.
+      if (i >= 10) {
+        // Expect 1/3rd of requests to be throttled. (So 1/3rd throttled, 1/3rd succeeding, 1/3rd
+        // tried and failing).
+        assertThat(String.format("for i=%d", i),
+            throttler.throttlingProbability(START_TIME_MS + i), closeTo(0.33, /*error=*/ 0.1));
+        // Requests 10..13 should be throttled, 14..19 not throttled, given the mocked random
+        // numbers that we fed to the throttler.
+        assertThat(String.format("for i=%d", i), throttled, equalTo(i < 14));
+      }
+    }
+  }
+}

