gobblin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/2] incubator-gobblin git commit: Allow disabling global throttling. Fix a race condition in BatchedPermitsRequester.
Date Sat, 29 Jul 2017 08:03:30 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 725a0829d -> a080ad843


Allow disabling global throttling. Fix a race condition in BatchedPermitsRequester.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/82b5b2d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/82b5b2d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/82b5b2d1

Branch: refs/heads/master
Commit: 82b5b2d12d86ca8819a678bf3403495bfd2683e7
Parents: 5459609
Author: ibuenros <issac.buenrostro@gmail.com>
Authored: Wed May 17 10:18:41 2017 -0700
Committer: ibuenros <issac.buenrostro@gmail.com>
Committed: Wed May 17 10:18:41 2017 -0700

----------------------------------------------------------------------
 .../util/limiter/BatchedPermitsRequester.java   |  11 +-
 .../RedirectAwareRestClientRequestSender.java   |   5 +-
 .../util/limiter/RestliLimiterFactoryTest.java  | 121 +++++++++++++++++++
 .../limiter/broker/SharedLimiterFactory.java    |  14 ++-
 4 files changed, 146 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
index 2e6e97e..9ea8c50 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/BatchedPermitsRequester.java
@@ -105,6 +105,7 @@ class BatchedPermitsRequester {
   private final RetryStatus retryStatus;
   private final SynchronizedAverager permitsOutstanding;
   private final long targetMillisBetweenRequests;
+  private final AtomicLong callbackCounter;
 
   @Builder
   private BatchedPermitsRequester(String resourceId, String requestorIdentifier,
@@ -131,6 +132,7 @@ class BatchedPermitsRequester {
 
     this.restRequestTimer = metricContext == null ? null : metricContext.timer(REST_REQUEST_TIMER);
     this.restRequestHistogram = metricContext == null ? null : metricContext.histogram(REST_REQUEST_PERMITS_HISTOGRAM);
+    this.callbackCounter = new AtomicLong();
   }
 
   /**
@@ -150,8 +152,13 @@ class BatchedPermitsRequester {
           return true;
         }
         if (this.retryStatus.canRetryWithinMillis(10000)) {
+          long callbackCounterSnap = this.callbackCounter.get();
           maybeSendNewPermitRequest();
-          this.newPermitsAvailable.await();
+          if (this.callbackCounter.get() == callbackCounterSnap) {
+            // If a callback has happened since we tried to send the new permit request, don't await
+            // Since some request senders may be synchronous, we would have missed the notification
+            this.newPermitsAvailable.await();
+          }
         } else {
           break;
         }
@@ -279,6 +286,7 @@ class BatchedPermitsRequester {
     @Override
     public void onSuccess(Response<PermitAllocation> result) {
       BatchedPermitsRequester.this.retries = 0;
+      BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
       BatchedPermitsRequester.this.lock.lock();
       try {
         PermitAllocation allocation = result.getEntity();
@@ -309,6 +317,7 @@ class BatchedPermitsRequester {
 
     private void nonRetriableFail(Throwable exc, String msg) {
       BatchedPermitsRequester.this.retryStatus.blockRetries(RETRY_DELAY_ON_NON_RETRIABLE_EXCEPTION, exc);
+      BatchedPermitsRequester.this.callbackCounter.incrementAndGet();
       BatchedPermitsRequester.this.requestSemaphore.release();
       log.error(msg, exc);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
index 752c933..f7bd631 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/main/java/gobblin/util/limiter/RedirectAwareRestClientRequestSender.java
@@ -61,14 +61,14 @@ public class RedirectAwareRestClientRequestSender extends RestClientRequestSende
    * A {@link SharedResourceFactory} that creates {@link RedirectAwareRestClientRequestSender}s.
    * @param <S>
    */
-  public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RedirectAwareRestClientRequestSender, SharedRestClientKey, S> {
+  public static class Factory<S extends ScopeType<S>> implements SharedResourceFactory<RequestSender, SharedRestClientKey, S> {
     @Override
     public String getName() {
       return SharedRestClientFactory.FACTORY_NAME;
     }
 
     @Override
-    public SharedResourceFactoryResponse<RedirectAwareRestClientRequestSender> createResource(
+    public SharedResourceFactoryResponse<RequestSender> createResource(
         SharedResourcesBroker<S> broker, ScopedConfigView<S, SharedRestClientKey> config)
         throws NotConfiguredException {
       try {
@@ -177,7 +177,6 @@ public class RedirectAwareRestClientRequestSender extends RestClientRequestSende
           if (this.retries > RedirectAwareRestClientRequestSender.this.connectionPrefixes.size()) {
             this.underlying.onError(new NonRetriableException("Failed to connect to all available connection prefixes."));
           }
-          log.info("Retries " + this.retries + " this " + hashCode());
           updateRestClient(getNextConnectionPrefix(), "Failed to communicate with " + getCurrentServerPrefix());
           this.exponentialBackoff.awaitNextRetry();
           sendRequest(this.originalRequest, this);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java
new file mode 100644
index 0000000..951612f
--- /dev/null
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/gobblin/util/limiter/RestliLimiterFactoryTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gobblin.util.limiter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.linkedin.common.callback.Callback;
+import com.linkedin.restli.client.Response;
+import com.typesafe.config.ConfigFactory;
+
+import gobblin.broker.BrokerConfigurationKeyGenerator;
+import gobblin.broker.SharedResourcesBrokerFactory;
+import gobblin.broker.SimpleScopeType;
+import gobblin.broker.iface.SharedResourcesBroker;
+import gobblin.restli.SharedRestClientKey;
+import gobblin.restli.throttling.PermitAllocation;
+import gobblin.restli.throttling.PermitRequest;
+import gobblin.util.limiter.broker.SharedLimiterFactory;
+import gobblin.util.limiter.broker.SharedLimiterKey;
+
+
+public class RestliLimiterFactoryTest {
+
+  @Test
+  public void testFactory() throws Exception {
+    SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    MyRequestSender requestSender = new MyRequestSender();
+
+    broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(),
+        new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender);
+    RestliServiceBasedLimiter limiter =
+        broker.getSharedResource(new RestliLimiterFactory<>(), new SharedLimiterKey("my/resource"));
+
+    Assert.assertNotNull(limiter.acquirePermits(10));
+    Assert.assertEquals(requestSender.requestList.size(), 1);
+
+    broker.close();
+  }
+
+  @Test
+  public void testRestliLimiterCalledByLimiterFactory() throws Exception {
+    SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        ConfigFactory.empty(), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    MyRequestSender requestSender = new MyRequestSender();
+
+    broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(),
+        new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender);
+    Limiter limiter =
+        broker.getSharedResource(new SharedLimiterFactory<>(), new SharedLimiterKey("my/resource"));
+
+    Assert.assertNotNull(limiter.acquirePermits(10));
+    Assert.assertEquals(requestSender.requestList.size(), 1);
+
+    broker.close();
+  }
+
+  @Test
+  public void testSkipGlobalLimiterOnLimiterFactory() throws Exception {
+    Map<String, String> configMap = ImmutableMap.of(
+        BrokerConfigurationKeyGenerator.generateKey(new SharedLimiterFactory(), null, null, SharedLimiterFactory.SKIP_GLOBAL_LIMITER_KEY), "true"
+    );
+
+    SharedResourcesBroker<SimpleScopeType> broker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        ConfigFactory.parseMap(configMap), SimpleScopeType.GLOBAL.defaultScopeInstance());
+
+    MyRequestSender requestSender = new MyRequestSender();
+
+    broker.bindSharedResourceAtScope(new RedirectAwareRestClientRequestSender.Factory<>(),
+        new SharedRestClientKey(RestliLimiterFactory.RESTLI_SERVICE_NAME), SimpleScopeType.GLOBAL, requestSender);
+    Limiter limiter =
+        broker.getSharedResource(new SharedLimiterFactory<>(), new SharedLimiterKey("my/resource"));
+
+    Assert.assertNotNull(limiter.acquirePermits(10));
+    Assert.assertEquals(requestSender.requestList.size(), 0);
+
+    broker.close();
+  }
+
+  public static class MyRequestSender implements RequestSender {
+    List<PermitRequest> requestList = Lists.newArrayList();
+
+    @Override
+    public void sendRequest(PermitRequest request, Callback<Response<PermitAllocation>> callback) {
+      this.requestList.add(request);
+
+      PermitAllocation permitAllocation = new PermitAllocation();
+      permitAllocation.setPermits(request.getPermits());
+      permitAllocation.setExpiration(Long.MAX_VALUE);
+
+      Response<PermitAllocation> response = Mockito.mock(Response.class);
+      Mockito.when(response.getEntity()).thenReturn(permitAllocation);
+      callback.onSuccess(response);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/82b5b2d1/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java b/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
index 0d0c5f1..0ae45a6 100644
--- a/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
+++ b/gobblin-utility/src/main/java/gobblin/util/limiter/broker/SharedLimiterFactory.java
@@ -32,6 +32,7 @@ import gobblin.broker.iface.SharedResourcesBroker;
 import gobblin.broker.iface.SharedResourceFactoryResponse;
 import gobblin.broker.ResourceCoordinate;
 import gobblin.util.ClassAliasResolver;
+import gobblin.util.ConfigUtils;
 import gobblin.util.limiter.Limiter;
 import gobblin.util.limiter.LimiterFactory;
 import gobblin.util.limiter.MultiLimiter;
@@ -57,6 +58,11 @@ public class SharedLimiterFactory<S extends ScopeType<S>> implements SharedResou
   public static final String NAME = "limiter";
   public static final String LIMITER_CLASS_KEY = "class";
   public static final String FAIL_IF_NO_GLOBAL_LIMITER_KEY = "failIfNoGlobalLimiter";
+  /**
+   * Skip use of global limiter. In general, this should not be used, but it is provided to easily disable global limiters
+   * in case of issues with the coordination server.
+   */
+  public static final String SKIP_GLOBAL_LIMITER_KEY = "skipGlobalLimiter";
   public static final String FAIL_ON_UNKNOWN_RESOURCE_ID = "faiOnUnknownResourceId";
 
   private static final ClassAliasResolver<LimiterFactory> RESOLVER = new ClassAliasResolver<>(LimiterFactory.class);
@@ -74,7 +80,13 @@ public class SharedLimiterFactory<S extends ScopeType<S>> implements SharedResou
     Config config = configView.getConfig();
     SharedLimiterKey.GlobalLimiterPolicy globalLimiterPolicy = configView.getKey().getGlobalLimiterPolicy();
 
-    if (config.hasPath(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && config.getBoolean(FAIL_IF_NO_GLOBAL_LIMITER_KEY) &&
+    if (ConfigUtils.getBoolean(config, SKIP_GLOBAL_LIMITER_KEY, false)) {
+      if (globalLimiterPolicy != SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY) {
+        SharedLimiterKey modifiedKey = new SharedLimiterKey(configView.getKey().getResourceLimitedPath(),
+            SharedLimiterKey.GlobalLimiterPolicy.LOCAL_ONLY);
+        return new ResourceCoordinate<>(this, modifiedKey, (S) configView.getScope());
+      }
+    } else if (config.hasPath(FAIL_IF_NO_GLOBAL_LIMITER_KEY) && config.getBoolean(FAIL_IF_NO_GLOBAL_LIMITER_KEY) &&
         globalLimiterPolicy != SharedLimiterKey.GlobalLimiterPolicy.USE_GLOBAL) {
       // if user has specified FAIL_IF_NO_GLOBAL_LIMITER_KEY, promote the policy from USE_GLOBAL_IF_CONFIGURED to USE_GLOBAL
       // e.g. fail if no GLOBAL configuration is present


Mime
View raw message