zookeeper-commits mailing list archives

From h...@apache.org
Subject [zookeeper] branch master updated: ZOOKEEPER-3492: Add weights to server side connection throttling
Date Thu, 05 Sep 2019 21:13:02 GMT
This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new eecd9e7  ZOOKEEPER-3492: Add weights to server side connection throttling
eecd9e7 is described below

commit eecd9e7ce046083bd40cd6134bdb2b405d01fe67
Author: Jie Huang <jiehuang@fb.com>
AuthorDate: Thu Sep 5 14:12:50 2019 -0700

    ZOOKEEPER-3492: Add weights to server side connection throttling
    
    Author: Jie Huang <jiehuang@fb.com>
    
    Reviewers: Michael Han <hanm@apache.org>, Enrico Olivelli <eolivelli@gmail.com>
    
    Closes #1037 from jhuan31/ZOOKEEPER-3492
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  33 ++++-
 .../org/apache/zookeeper/server/BlueThrottle.java  | 119 ++++++++++++++++--
 .../apache/zookeeper/server/SessionTracker.java    |   1 +
 .../zookeeper/server/SessionTrackerImpl.java       |   4 +
 .../apache/zookeeper/server/ZooKeeperServer.java   |  34 ++++--
 .../server/quorum/LeaderSessionTracker.java        |   1 -
 .../server/quorum/UpgradeableSessionTracker.java   |   5 +
 .../apache/zookeeper/server/BlueThrottleTest.java  | 134 +++++++++++++++++++++
 .../zookeeper/server/PrepRequestProcessorTest.java |   4 +
 9 files changed, 311 insertions(+), 24 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index c4cc747..5113eaa 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -124,8 +124,8 @@ is no full support.
 
 #### Required Software
 
-ZooKeeper runs in Java, release 1.8 or greater 
-(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported). 
+ZooKeeper runs in Java, release 1.8 or greater
+(JDK 8 LTS, JDK 11 LTS, JDK 12 - Java 9 and 10 are not supported).
 It runs as an _ensemble_ of ZooKeeper servers. Three
 ZooKeeper servers is the minimum recommended size for an
 ensemble, and we also recommend that they run on separate
@@ -822,6 +822,27 @@ property, when available, is noted below.
     dropping. This parameter defines the threshold to decrease the dropping
     probability. The default is 0.
 
+* *zookeeper.connection_throttle_weight_enabled* :
+    (Java system property only)
+    **New in 3.6.0:**
+    Whether to consider connection weights when throttling. Only useful when connection throttle is enabled, that is, connectionMaxTokens is larger than 0. The default is false.
+
+* *zookeeper.connection_throttle_global_session_weight* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The weight of a global session. It is the number of tokens required for a global session request to get through the connection throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 3.
+
+* *zookeeper.connection_throttle_local_session_weight* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The weight of a local session. It is the number of tokens required for a local session request to get through the connection throttler. It has to be a positive integer no larger than the weight of a global session or a renew session. The default is 1.
+
+* *zookeeper.connection_throttle_renew_session_weight* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The weight of renewing a session. It is also the number of tokens required for a reconnect request to get through the throttler. It has to be a positive integer no smaller than the weight of a local session. The default is 2.
+
+
  * *clientPortListenBacklog* :
     **New in 3.4.14, 3.5.5, 3.6.0:**
     The socket backlog length for the ZooKeeper server socket. This controls
@@ -889,7 +910,7 @@ property, when available, is noted below.
 
 * *advancedFlowControlEnabled* :
     (Java system property: **zookeeper.netty.advancedFlowControl.enabled**)
-    Using accurate flow control in netty based on the status of ZooKeeper 
+    Using accurate flow control in netty based on the status of ZooKeeper
     pipeline to avoid direct buffer OOM. It will disable the AUTO_READ in
     Netty.
 
@@ -958,9 +979,9 @@ of servers -- that is, when deploying clusters of servers.
 * *connectToLearnerMasterLimit* :
     (Java system property: zookeeper.**connectToLearnerMasterLimit**)
     Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
-    connect to the leader after leader election. Defaults to the value of initLimit. 
+    connect to the leader after leader election. Defaults to the value of initLimit.
     Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
-        
+
 * *leaderServes* :
     (Java system property: zookeeper.**leaderServes**)
     Leader accepts client connections. Default value is "yes".
@@ -1568,7 +1589,7 @@ options are used to configure the [AdminServer](#sc_adminserver).
 
 ### Metrics Providers
 
-**New in 3.6.0:** The following options are used to configure metrics. 
+**New in 3.6.0:** The following options are used to configure metrics.
 
  By default ZooKeeper server exposes useful metrics using the [AdminServer](#sc_adminserver).
  and [Four Letter Words](#sc_4lw) interface.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
index 3895c2e..9f03e44 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
@@ -20,6 +20,8 @@ package org.apache.zookeeper.server;
 
 import java.util.Random;
 import org.apache.zookeeper.common.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements a token-bucket based rate limiting mechanism with optional
@@ -69,6 +71,7 @@ import org.apache.zookeeper.common.Time;
  **/
 
 public class BlueThrottle {
+    private static final Logger LOG = LoggerFactory.getLogger(BlueThrottle.class);
 
     private int maxTokens;
     private int fillTime;
@@ -86,35 +89,115 @@ public class BlueThrottle {
     Random rng;
 
     public static final String CONNECTION_THROTTLE_TOKENS = "zookeeper.connection_throttle_tokens";
-    public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
+    private static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
 
     public static final String CONNECTION_THROTTLE_FILL_TIME = "zookeeper.connection_throttle_fill_time";
-    public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+    private static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
 
     public static final String CONNECTION_THROTTLE_FILL_COUNT = "zookeeper.connection_throttle_fill_count";
-    public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+    private static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
 
     public static final String CONNECTION_THROTTLE_FREEZE_TIME = "zookeeper.connection_throttle_freeze_time";
-    public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+    private static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
 
     public static final String CONNECTION_THROTTLE_DROP_INCREASE = "zookeeper.connection_throttle_drop_increase";
-    public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+    private static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
 
     public static final String CONNECTION_THROTTLE_DROP_DECREASE = "zookeeper.connection_throttle_drop_decrease";
-    public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+    private static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
 
     public static final String CONNECTION_THROTTLE_DECREASE_RATIO = "zookeeper.connection_throttle_decrease_ratio";
-    public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+    private static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+
+    public static final String WEIGHED_CONNECTION_THROTTLE = "zookeeper.connection_throttle_weight_enabled";
+    private static boolean connectionWeightEnabled;
+
+    public static final String GLOBAL_SESSION_WEIGHT = "zookeeper.connection_throttle_global_session_weight";
+    private static final int DEFAULT_GLOBAL_SESSION_WEIGHT;
+
+    public static final String LOCAL_SESSION_WEIGHT = "zookeeper.connection_throttle_local_session_weight";
+    private static final int DEFAULT_LOCAL_SESSION_WEIGHT;
+
+    public static final String RENEW_SESSION_WEIGHT = "zookeeper.connection_throttle_renew_session_weight";
+    private static final int DEFAULT_RENEW_SESSION_WEIGHT;
+
+    // for unit tests only
+    protected  static void setConnectionWeightEnabled(boolean enabled) {
+        connectionWeightEnabled = enabled;
+        logWeighedThrottlingSetting();
+    }
+
+    private static void logWeighedThrottlingSetting() {
+        if (connectionWeightEnabled) {
+            LOG.info("Weighed connection throttling is enabled. "
+                    + "But it will only be effective if connection throttling is enabled");
+            LOG.info(
+                    "The weights for different session types are: global {} renew {} local {}",
+                    DEFAULT_GLOBAL_SESSION_WEIGHT,
+                    DEFAULT_RENEW_SESSION_WEIGHT,
+                    DEFAULT_LOCAL_SESSION_WEIGHT
+            );
+        } else {
+            LOG.info("Weighed connection throttling is disabled");
+        }
+    }
 
     static {
-        DEFAULT_CONNECTION_THROTTLE_TOKENS = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
-        DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
-        DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+        int tokens = Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
+        int fillCount = Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+
+        connectionWeightEnabled = Boolean.getBoolean(WEIGHED_CONNECTION_THROTTLE);
+
+        // if not specified, the weights for a global session, a local session, and a renew session
+        // are 3, 1, 2 respectively. The weight for a global session is 3 because in our connection benchmarking,
+        // the throughput of global sessions is about one third of that of local sessions. Renewing a session
+        // is more expensive than establishing a local session and cheaper than creating a global session so
+        // its default weight is set to 2.
+        int globalWeight = Integer.getInteger(GLOBAL_SESSION_WEIGHT, 3);
+        int localWeight = Integer.getInteger(LOCAL_SESSION_WEIGHT, 1);
+        int renewWeight = Integer.getInteger(RENEW_SESSION_WEIGHT, 2);
+
+        if (globalWeight <= 0) {
+            LOG.warn("Invalid global session weight {}. It should be larger than 0", globalWeight);
+            DEFAULT_GLOBAL_SESSION_WEIGHT = 3;
+        } else if (globalWeight < localWeight) {
+            LOG.warn("The global session weight {} is less than the local session weight {}. Use the local session weight.",
+                    globalWeight, localWeight);
+            DEFAULT_GLOBAL_SESSION_WEIGHT = localWeight;
+        } else {
+            DEFAULT_GLOBAL_SESSION_WEIGHT = globalWeight;
+        }
 
+        if (localWeight <= 0) {
+            LOG.warn("Invalid local session weight {}. It should be larger than 0", localWeight);
+            DEFAULT_LOCAL_SESSION_WEIGHT = 1;
+        } else {
+            DEFAULT_LOCAL_SESSION_WEIGHT = localWeight;
+        }
+
+        if (renewWeight <= 0) {
+            LOG.warn("Invalid renew session weight {}. It should be larger than 0", renewWeight);
+            DEFAULT_RENEW_SESSION_WEIGHT = 2;
+        } else if (renewWeight < localWeight) {
+            LOG.warn("The renew session weight {} is less than the local session weight {}. Use the local session weight.",
+                    renewWeight, localWeight);
+            DEFAULT_RENEW_SESSION_WEIGHT = localWeight;
+        } else {
+            DEFAULT_RENEW_SESSION_WEIGHT = renewWeight;
+        }
+
+        // This is based on the assumption that tokens set in config are for global sessions
+        DEFAULT_CONNECTION_THROTTLE_TOKENS = connectionWeightEnabled
+                ? DEFAULT_GLOBAL_SESSION_WEIGHT * tokens : tokens;
+        DEFAULT_CONNECTION_THROTTLE_FILL_TIME = Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
+        DEFAULT_CONNECTION_THROTTLE_FILL_COUNT = connectionWeightEnabled
+                ? DEFAULT_GLOBAL_SESSION_WEIGHT * fillCount : fillCount;
         DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME = Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1);
         DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02);
         DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE = getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002);
         DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO = getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0);
+
+        logWeighedThrottlingSetting();
     }
 
     /* Varation of Integer.getInteger for real number properties */
@@ -212,6 +295,22 @@ public class BlueThrottle {
         return maxTokens - tokens;
     }
 
+    public int getRequiredTokensForGlobal() {
+        return BlueThrottle.DEFAULT_GLOBAL_SESSION_WEIGHT;
+    }
+
+    public int getRequiredTokensForLocal() {
+        return BlueThrottle.DEFAULT_LOCAL_SESSION_WEIGHT;
+    }
+
+    public int getRequiredTokensForRenew() {
+        return BlueThrottle.DEFAULT_RENEW_SESSION_WEIGHT;
+    }
+
+    public boolean isConnectionWeightEnabled() {
+        return BlueThrottle.connectionWeightEnabled;
+    }
+
     public synchronized boolean checkLimit(int need) {
         // A maxTokens setting of zero disables throttling
         if (maxTokens == 0) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
index 8a3bb1e..9cf4774 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
@@ -137,4 +137,5 @@ public interface SessionTracker {
      */
     long getLocalSessionCount();
 
+    boolean isLocalSessionsEnabled();
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
index 07b3fae..755512e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
@@ -342,4 +342,8 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements Sessi
         return 0;
     }
 
+    @Override
+    public boolean isLocalSessionsEnabled() {
+        return false;
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 01748e6..95aaed3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -148,6 +148,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     protected String initialConfig;
     private final RequestPathMetricsCollector requestPathMetricsCollector;
 
+    private boolean localSessionEnabled = false;
     protected enum State {
         INITIAL,
         RUNNING,
@@ -598,7 +599,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         registerMetrics();
 
         setState(State.RUNNING);
+
         requestPathMetricsCollector.start();
+
+        localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
         notifyAll();
     }
 
@@ -1212,12 +1216,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return connThrottle.getDropChance();
     }
 
-    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException, ClientCnxnLimitException {
-
-        if (!connThrottle.checkLimit(1)) {
-            throw new ClientCnxnLimitException();
-        }
-        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
+    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
+        throws IOException, ClientCnxnLimitException {
 
         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
         ConnectRequest connReq = new ConnectRequest();
@@ -1226,7 +1227,27 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             LOG.debug("Session establishment request from client " + cnxn.getRemoteSocketAddress()
                       + " client's lastZxid is 0x" + Long.toHexString(connReq.getLastZxidSeen()));
         }
+        long sessionId = connReq.getSessionId();
+        int tokensNeeded = 1;
+        if (connThrottle.isConnectionWeightEnabled()) {
+            if (sessionId == 0) {
+                if (localSessionEnabled) {
+                    tokensNeeded = connThrottle.getRequiredTokensForLocal();
+                } else {
+                    tokensNeeded = connThrottle.getRequiredTokensForGlobal();
+                }
+            } else {
+                tokensNeeded = connThrottle.getRequiredTokensForRenew();
+            }
+        }
+
+        if (!connThrottle.checkLimit(tokensNeeded)) {
+            throw new ClientCnxnLimitException();
+        }
+        ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
         ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
+
         boolean readOnly = false;
         try {
             readOnly = bia.readBool("readOnly");
@@ -1269,7 +1290,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // We don't want to receive any packets until we are sure that the
         // session is setup
         cnxn.disableRecv();
-        long sessionId = connReq.getSessionId();
         if (sessionId == 0) {
             long id = createSession(cnxn, passwd, sessionTimeout);
             if (LOG.isDebugEnabled()) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
index f4eb92c..5ab732f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
@@ -38,7 +38,6 @@ public class LeaderSessionTracker extends UpgradeableSessionTracker {
 
     private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
 
-    private final boolean localSessionsEnabled;
     private final SessionTrackerImpl globalSessionTracker;
 
     /**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
index 9edb4f2..bc25e5d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
@@ -55,6 +55,11 @@ public abstract class UpgradeableSessionTracker implements SessionTracker {
         return localSessionTracker != null && localSessionTracker.isTrackingSession(sessionId);
     }
 
+    @Override
+    public boolean isLocalSessionsEnabled() {
+        return localSessionsEnabled;
+    }
+
     public boolean isUpgradingSession(long sessionId) {
         return upgradingSessions != null && upgradingSessions.containsKey(sessionId);
     }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
index c3d10bb..8b64c2b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
@@ -22,7 +22,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +35,7 @@ import org.slf4j.LoggerFactory;
 public class BlueThrottleTest extends ZKTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(BlueThrottleTest.class);
+    private static final int RAPID_TIMEOUT = 10000;
 
     class MockRandom extends Random {
 
@@ -162,4 +168,132 @@ public class BlueThrottleTest extends ZKTestCase {
         assertTrue("Later requests should have a chance", accepted > 0);
     }
 
+    private QuorumUtil quorumUtil = new QuorumUtil(1);
+    private ClientBase.CountdownWatcher[] watchers;
+    private ZooKeeper[] zks;
+
+    private int connect(int n) throws Exception {
+        String connStr = quorumUtil.getConnectionStringForServer(1);
+        int connected = 0;
+
+        zks = new ZooKeeper[n];
+        watchers = new ClientBase.CountdownWatcher[n];
+        for (int i = 0; i < n; i++){
+            watchers[i] = new ClientBase.CountdownWatcher();
+            zks[i] = new ZooKeeper(connStr, 3000, watchers[i]);
+            try {
+                watchers[i].waitForConnected(RAPID_TIMEOUT);
+                connected++;
+            } catch (TimeoutException e) {
+                LOG.info("Connection denied by the throttler due to insufficient tokens");
+                break;
+            }
+        }
+
+        return connected;
+    }
+
+    private void shutdownQuorum() throws Exception{
+        for (ZooKeeper zk : zks) {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+        quorumUtil.shutdownAll();
+    }
+
+    @Test
+    public void testNoThrottling() throws Exception {
+        quorumUtil.startAll();
+
+        //disable throttling
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(0);
+
+        int connected = connect(10);
+
+        Assert.assertEquals(10, connected);
+        shutdownQuorum();
+    }
+
+    @Test
+    public void testThrottling() throws Exception {
+        quorumUtil.enableLocalSession(true);
+        quorumUtil.startAll();
+
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
+        //no refill, makes testing easier
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+
+        int connected = connect(3);
+        Assert.assertEquals(2, connected);
+        shutdownQuorum();
+
+        quorumUtil.enableLocalSession(false);
+        quorumUtil.startAll();
+
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(2);
+        //no refill, makes testing easier
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+
+        connected = connect(3);
+        Assert.assertEquals(2, connected);
+        shutdownQuorum();
+    }
+
+    @Test
+    public void testWeighedThrottling() throws Exception {
+        // this test depends on the session weights set to the default values
+        // 3 for global session, 2 for renew sessions, 1 for local sessions
+        BlueThrottle.setConnectionWeightEnabled(true);
+
+        quorumUtil.enableLocalSession(true);
+        quorumUtil.startAll();
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+
+        //try to create 11 local sessions, 10 created, because we have only 10 tokens
+        int connected = connect(11);
+        Assert.assertEquals(10, connected);
+        shutdownQuorum();
+
+        quorumUtil.enableLocalSession(false);
+        quorumUtil.startAll();
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+        //try to create 11 global sessions, 3 created, because we have 10 tokens and each connection needs 3
+        connected = connect(11);
+        Assert.assertEquals(3, connected);
+        shutdownQuorum();
+
+        quorumUtil.startAll();
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(10);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+        connected = connect(2);
+        Assert.assertEquals(2, connected);
+
+        quorumUtil.shutdown(1);
+        watchers[0].waitForDisconnected(RAPID_TIMEOUT);
+        watchers[1].waitForDisconnected(RAPID_TIMEOUT);
+
+        quorumUtil.restart(1);
+        //client will try to reconnect
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setMaxTokens(3);
+        quorumUtil.getPeer(1).peer.getActiveServer().connThrottle().setFillCount(0);
+        int reconnected = 0;
+        for (int i = 0; i < 2; i++){
+            try {
+                watchers[i].waitForConnected(RAPID_TIMEOUT);
+                reconnected++;
+            } catch (TimeoutException e) {
+                LOG.info("One reconnect fails due to insufficient tokens");
+            }
+        }
+        //each reconnect takes two tokens, we have 3, so only one reconnects
+        LOG.info("reconnected {}", reconnected);
+        Assert.assertEquals(1, reconnected);
+        shutdownQuorum();
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 8aacaac..9724423 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -278,6 +278,10 @@ public class PrepRequestProcessorTest extends ClientBase {
             return 0;
         }
 
+        @Override
+        public boolean isLocalSessionsEnabled() {
+            return false;
+        }
     }
 
 }
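
Note on the weighting logic: the token selection added to processConnectRequest, together with the expectations in testWeighedThrottling, can be illustrated with the standalone sketch below. It is not part of the commit. The class name WeightedThrottleSketch and the method admitted are hypothetical, the weights are hard-coded to the documented defaults (global 3, renew 2, local 1), and the bucket is deliberately simplified: it assumes no refill and ignores freeze time and probabilistic dropping, which BlueThrottle also handles.

```java
// Hypothetical illustration only; not the ZooKeeper implementation.
public class WeightedThrottleSketch {

    // Default weights as documented in this commit (assumed unchanged).
    static final int GLOBAL_WEIGHT = 3;
    static final int RENEW_WEIGHT = 2;
    static final int LOCAL_WEIGHT = 1;

    // Mirrors the branch added to processConnectRequest: a session id of 0
    // means a new session, anything else is a reconnect (renew).
    static int tokensNeeded(boolean weightEnabled, long sessionId, boolean localSessionEnabled) {
        if (!weightEnabled) {
            return 1;
        }
        if (sessionId == 0) {
            return localSessionEnabled ? LOCAL_WEIGHT : GLOBAL_WEIGHT;
        }
        return RENEW_WEIGHT;
    }

    // Simplified bucket with no refill: counts how many of `attempts`
    // sequential requests, each costing `need` tokens, are admitted.
    static int admitted(int maxTokens, int attempts, int need) {
        int tokens = maxTokens;
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (tokens < need) {
                break; // insufficient tokens, request rejected
            }
            tokens -= need;
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 10 tokens, 11 new local sessions at 1 token each
        System.out.println(admitted(10, 11, tokensNeeded(true, 0L, true)));   // prints 10
        // 10 tokens, 11 new global sessions at 3 tokens each
        System.out.println(admitted(10, 11, tokensNeeded(true, 0L, false)));  // prints 3
        // 3 tokens, 2 reconnects at 2 tokens each (non-zero session id)
        System.out.println(admitted(3, 2, tokensNeeded(true, 42L, false)));   // prints 1
    }
}
```

The three printed values correspond to the three assertions in testWeighedThrottling (10, 3, and 1). With weighting disabled, every request costs one token, which matches the behaviour before this change.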

