geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From heyba...@apache.org
Subject [geode] branch develop updated: GEODE-6515: refactor ConnectionManagerImpl (#3304)
Date Mon, 01 Apr 2019 17:01:05 GMT
This is an automated email from the ASF dual-hosted git repository.

heybales pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e87e720  GEODE-6515: refactor ConnectionManagerImpl (#3304)
e87e720 is described below

commit e87e720b331f164723c36ec82cd8d01c559b1ae3
Author: Helena Bales <hbales@pivotal.io>
AuthorDate: Mon Apr 1 10:00:49 2019 -0700

    GEODE-6515: refactor ConnectionManagerImpl (#3304)
    
    Refactors the ConnectionManagerImpl to a non-locking implementation to allow gets to scale with more threads. The previous implementation locked around all logic for getting, creating, or returning a connection to the pool, which resulted in a high degree of contention for that lock.
    
    Additionally, much of the logic for accounting for the number of total connections, and the dequeue of available connections have been extracted to ConnectionAccounting and AvailableConnectionManager respectively. This was done in order to add unit and concurrent tests for that logic.
    
    * Refactor ConnectionManagerImpl to a non-locking implementation
    * add unit tests for ConnectionManagerImpl
    * update ConnectionManagerImpl Javadocs
    * extract ConnectionAccounting from ConnectionManagerImpl
    * add unit test for ConnectionAccounting
    * add concurrency tests for ConnectionAccounting
    * extract AvailableConnectionManager from ConnectionManagerImpl
    * add unit tests for AvailableConnectionManager
    * add concurrency tests for AvailableConnectionManager
    * add unit test for ConcurrentTestRunner
    * add javadocs to AvailableConnectionManager and improved the method names
    * activate returns false if the connection has been destroyed instead of throwing ConnectionDestroyedException
    * start background prefill if under the minimum number of connections in ConnectionManagerImpl#borrowConnection when create fails
    * add generic <ServerLocation> to Set in ConnectionManagerImpl
    * Correct invalidateServer logic in ConnectionManagerImpl
    * make NOT_WAITING private in ConnectionManagerImpl
    * made createLifetimeReplacementConnection private since it is only
    used by ConnectionMap
    
    Signed-off-by: Helena Bales <hbales@pivotal.io>
    Signed-off-by: Jacob Barrett <jbarrett@pivotal.io>
    Signed-off-by: Darrel Schneider <dschneider@pivotal.io>
---
 geode-concurrency-test/build.gradle                |   8 +-
 .../test/concurrency/ConcurrentTestRunner.java     |   4 +-
 .../geode/test/concurrency/ParallelExecutor.java   |  11 +
 .../{loop/LoopRunnerConfig.java => Utilities.java} |  28 +-
 .../geode/test/concurrency/loop/LoopRunner.java    |  28 +-
 .../test/concurrency/loop/LoopRunnerConfig.java    |   7 +
 .../test/concurrency/ConcurrentTestRunnerTest.java |  58 ++
 .../AvailableConnectionManagerConcurrentTest.java  | 189 +++++
 .../ConnectionAccountingConcurrentTest.java        | 211 +++++
 .../pooling/ConnectionManagerImplTest.java         | 559 +++++++++++++
 .../pooling/ConnectionManagerJUnitTest.java        |   2 +-
 .../cache/client/internal/OpExecutorImpl.java      |  12 +-
 .../pooling/AvailableConnectionManager.java        | 143 ++++
 .../internal/pooling/ConnectionAccounting.java     | 148 ++++
 .../client/internal/pooling/ConnectionManager.java |  42 +-
 .../internal/pooling/ConnectionManagerImpl.java    | 864 ++++++++-------------
 .../client/internal/pooling/PooledConnection.java  |  21 +-
 .../client/internal/OpExecutorImplJUnitTest.java   |   7 +-
 .../pooling/AvailableConnectionManagerTest.java    | 230 ++++++
 .../internal/pooling/ConnectionAccountingTest.java | 223 ++++++
 20 files changed, 2228 insertions(+), 567 deletions(-)

diff --git a/geode-concurrency-test/build.gradle b/geode-concurrency-test/build.gradle
index 7d422c8..3ab10bb 100644
--- a/geode-concurrency-test/build.gradle
+++ b/geode-concurrency-test/build.gradle
@@ -14,11 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-apply from: "${project.projectDir}/../gradle/publish.gradle"
 
+apply from: "${project.projectDir}/../gradle/publish.gradle"
 
 dependencies {
   compile(platform(project(':boms:geode-all-bom')))
   compile('junit:junit')
   compile('org.apache.logging.log4j:log4j-api')
+  testCompile('org.assertj:assertj-core')
+}
+
+test {
+  // Some tests have inner tests that should be ignored
+  exclude "**/*\$*.class"
 }
diff --git a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ConcurrentTestRunner.java b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ConcurrentTestRunner.java
index 177bc55..5652e1e 100644
--- a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ConcurrentTestRunner.java
+++ b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ConcurrentTestRunner.java
@@ -38,9 +38,7 @@ import org.apache.geode.test.concurrency.loop.LoopRunner;
  * test can use to invoke code in parallel.
  *
  * This test run will try to exercise the test method to flush out any concurrent bugs in the
- * parallel execution. Currently this runner is using Java PathFinder to run the test with *all*
- * possible thread interleavings, but other methods such as invoking the method multiple times in a
- * normal JVM may be supported in the feature.
+ * parallel execution.
  *
  * All test logic and state *must* be encapsulated in the individual test methods. This is because
  * the concurrency testing logic may need to invoke the test body multiple times, possibly in
diff --git a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ParallelExecutor.java b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ParallelExecutor.java
index ee46ed8..011c123 100644
--- a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ParallelExecutor.java
+++ b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/ParallelExecutor.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.test.concurrency;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -40,9 +42,18 @@ public interface ParallelExecutor {
     });
   }
 
+  default <T> Collection<Future<T>> inParallel(RunnableWithException runnable, int count) {
+    ArrayList<Future<T>> futures = new ArrayList<>(count);
+    for (; count > 0; count--) {
+      futures.add(inParallel(runnable));
+    }
+    return futures;
+  }
+
   /**
    * Execute all tasks in parallel, wait for them to complete and throw an exception if any of the
    * tasks throw an exception.
    */
   void execute() throws ExecutionException, InterruptedException;
+
 }
diff --git a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/Utilities.java
similarity index 61%
copy from geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java
copy to geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/Utilities.java
index 7e58a4e..bc274f7 100644
--- a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java
+++ b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/Utilities.java
@@ -12,11 +12,27 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.test.concurrency.loop;
 
-/**
- * Configuration for the LoopRunner class
- */
-public @interface LoopRunnerConfig {
-  int count();
+package org.apache.geode.test.concurrency;
+
+public interface Utilities {
+
+  /**
+   * @return Result of {@code Runtime.getRuntime().availableProcessors()}.
+   */
+  static int availableProcessors() {
+    return Runtime.getRuntime().availableProcessors();
+  }
+
+  /**
+   * Repeat {@code task} for {@code count} times serially.
+   *
+   * @param task to repeat
+   * @param count number of times
+   */
+  static void repeat(Runnable task, int count) {
+    for (; count > 0; count--) {
+      task.run();
+    }
+  }
 }
diff --git a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunner.java b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunner.java
index 31d87f9..0cc0297 100644
--- a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunner.java
+++ b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunner.java
@@ -12,17 +12,21 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+
 package org.apache.geode.test.concurrency.loop;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.geode.test.concurrency.ParallelExecutor;
 import org.apache.geode.test.concurrency.Runner;
@@ -44,6 +48,12 @@ public class LoopRunner implements Runner {
         try {
           Object test = child.getDeclaringClass().newInstance();
           child.invoke(test, executor);
+        } catch (InvocationTargetException ex) {
+          Throwable exceptionToReturn = ex.getCause();
+          if (exceptionToReturn == null) {
+            exceptionToReturn = ex;
+          }
+          return Collections.singletonList(ex.getCause());
         } catch (Exception e) {
           return Collections.singletonList(e);
         }
@@ -66,25 +76,37 @@ public class LoopRunner implements Runner {
 
   private static class DelegatingExecutor implements ParallelExecutor {
     private final ExecutorService executorService;
-    List<Future<?>> futures;
+    private List<Future<?>> futures;
+    private final AtomicInteger callablesStarting = new AtomicInteger(0);
+    private final CountDownLatch start = new CountDownLatch(1);
 
     public DelegatingExecutor(ExecutorService executorService) {
       this.executorService = executorService;
-      futures = new ArrayList<Future<?>>();
+      futures = new ArrayList<>();
     }
 
     @Override
     public <T> Future<T> inParallel(Callable<T> callable) {
-      Future<T> future = executorService.submit(callable);
+      callablesStarting.getAndIncrement();
+      Future<T> future = executorService.submit(() -> {
+        callablesStarting.getAndDecrement();
+        start.await();
+        return callable.call();
+      });
       futures.add(future);
       return future;
     }
 
     @Override
     public void execute() throws ExecutionException, InterruptedException {
+      while (callablesStarting.get() > 0);
+
+      start.countDown();
       for (Future future : futures) {
         future.get();
       }
+      futures.clear();
     }
+
   }
 }
diff --git a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java
index 7e58a4e..ee294d72 100644
--- a/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java
+++ b/geode-concurrency-test/src/main/java/org/apache/geode/test/concurrency/loop/LoopRunnerConfig.java
@@ -14,9 +14,16 @@
  */
 package org.apache.geode.test.concurrency.loop;
 
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
 /**
  * Configuration for the LoopRunner class
  */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
 public @interface LoopRunnerConfig {
   int count();
 }
diff --git a/geode-concurrency-test/src/test/java/org/apache/geode/test/concurrency/ConcurrentTestRunnerTest.java b/geode-concurrency-test/src/test/java/org/apache/geode/test/concurrency/ConcurrentTestRunnerTest.java
new file mode 100644
index 0000000..76e8d01
--- /dev/null
+++ b/geode-concurrency-test/src/test/java/org/apache/geode/test/concurrency/ConcurrentTestRunnerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.test.concurrency;
+
+import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.junit.runner.JUnitCore;
+import org.junit.runner.RunWith;
+
+public class ConcurrentTestRunnerTest {
+  @Test
+  public void confirmThatInParallelRunsConcurrently() {
+    // We only need FailingTest to fail once for the following
+    // assertion to pass. ConcurrentTestRunner runs FailingTest
+    // 1000 times by default. It will stop running it once it
+    // sees it fail, which is what we want to see because it
+    // confirms that running inParallel actually runs concurrently.
+    assertThat(JUnitCore.runClasses(CheckForConcurrency.class).wasSuccessful()).isFalse();
+  }
+
+  /**
+   * This "test" is only meant to be run by confirmThatInParallelRunsConcurrently.
+   * If you run this "test" directly you can expect to see if fail.
+   */
+  @RunWith(ConcurrentTestRunner.class)
+  public static class CheckForConcurrency {
+    @Test
+    public void validateConcurrentExecution(ParallelExecutor executor)
+        throws ExecutionException, InterruptedException {
+      final AtomicInteger atomicInteger = new AtomicInteger(0);
+      executor.inParallel(() -> {
+        int oldValue = atomicInteger.get();
+        // We want to see the following assertion fail because that indicates
+        // that another thread currently modified the atomic.
+        assertThat(atomicInteger.compareAndSet(oldValue, oldValue + 1)).isTrue();
+      }, availableProcessors());
+      executor.execute();
+    }
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManagerConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManagerConcurrentTest.java
new file mode 100644
index 0000000..de86b96
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManagerConcurrentTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.client.internal.pooling;
+
+import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
+import static org.apache.geode.test.concurrency.Utilities.repeat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.test.concurrency.ConcurrentTestRunner;
+import org.apache.geode.test.concurrency.ParallelExecutor;
+
+@RunWith(ConcurrentTestRunner.class)
+public class AvailableConnectionManagerConcurrentTest {
+  private final int parallelCount = availableProcessors();
+  private final int iterationCount = 250;
+  private final AvailableConnectionManager instance = new AvailableConnectionManager();
+
+  @Test
+  public void useFirstAddFirstDoesNotLoseConnections(ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    repeat(() -> instance.addFirst(createConnection(), false), parallelCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection used = instance.useFirst();
+        instance.addFirst(used, true);
+      }, iterationCount);
+    }, parallelCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).hasSize(parallelCount);
+  }
+
+  @Test
+  public void useFirstWithPredicateAddFirstDoesNotLoseConnections(ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    repeat(() -> instance.addFirst(createConnection(), false), parallelCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection used = instance.useFirst(c -> true);
+        instance.addFirst(used, true);
+      }, iterationCount);
+    }, parallelCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).hasSize(parallelCount);
+  }
+
+  @Test
+  public void useFirstAddLastDoesNotLoseConnections(ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    repeat(() -> instance.addFirst(createConnection(), false), parallelCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection used = instance.useFirst();
+        instance.addLast(used, true);
+      }, iterationCount);
+    }, parallelCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).hasSize(parallelCount);
+  }
+
+  @Test
+  public void useFirstAddFirstDoesNotLoseConnectionsEvenWhenUseFirstReturnsNull(
+      ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    int connectionCount = 2;
+    int threadCount = connectionCount * 5;
+    repeat(() -> instance.addFirst(createConnection(), false), connectionCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection used = instance.useFirst();
+        if (used != null) {
+          Thread.yield();
+          instance.addFirst(used, true);
+        }
+      }, iterationCount);
+    }, threadCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).hasSize(connectionCount);
+  }
+
+  @Test
+  public void useFirstWithPredicateAddFirstDoesNotLoseConnectionsEvenWhenUseFirstReturnsNull(
+      ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    int connectionCount = 2;
+    int threadCount = connectionCount * 5;
+    repeat(() -> instance.addFirst(createConnection(), false), connectionCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection used = instance.useFirst(c -> true);
+        if (used != null) {
+          Thread.yield();
+          instance.addFirst(used, true);
+        }
+      }, iterationCount);
+    }, threadCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).hasSize(connectionCount);
+  }
+
+  @Test
+  public void useFirstAddLastWithPredicateThatDoesNotAlwaysMatchDoesNotLoseConnectionsEvenWhenUseFirstReturnsNull(
+      ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    int connectionCount = 2;
+    int threadCount = connectionCount * 5;
+    repeat(() -> instance.addFirst(createConnection(), false), connectionCount);
+    // now add a bunch of connections that will not match the predicate
+    repeat(() -> {
+      PooledConnection nonMatchingConnection = createConnection();
+      when(nonMatchingConnection.getBirthDate()).thenReturn(1L);
+      instance.addFirst(nonMatchingConnection, false);
+    }, connectionCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection used = instance.useFirst(c -> c.getBirthDate() == 0L);
+        if (used != null) {
+          Thread.yield();
+          assertThat(used.getBirthDate()).isEqualTo(0L);
+          instance.addLast(used, true);
+        }
+      }, iterationCount);
+    }, threadCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).hasSize(connectionCount * 2);
+  }
+
+  @Test
+  public void addLastRemoveDoesNotRemoveOtherConnections(ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    int originalCount = 7;
+    ArrayList<PooledConnection> originalConnections = new ArrayList<>();
+    repeat(() -> {
+      PooledConnection original = createConnection();
+      originalConnections.add(original);
+      instance.addFirst(original, false);
+    }, originalCount);
+
+    executor.inParallel(() -> {
+      repeat(() -> {
+        PooledConnection removed = createConnection();
+        instance.addLast(removed, true);
+        assertThat(instance.remove(removed)).isTrue();
+      }, iterationCount);
+    }, parallelCount);
+    executor.execute();
+
+    assertThat(instance.getDeque()).containsExactlyInAnyOrderElementsOf(originalConnections);
+  }
+
+  private PooledConnection createConnection() {
+    PooledConnection result = mock(PooledConnection.class);
+    when(result.activate()).thenReturn(true);
+    when(result.isActive()).thenReturn(true);
+    return result;
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccountingConcurrentTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccountingConcurrentTest.java
new file mode 100644
index 0000000..75ead32
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccountingConcurrentTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.client.internal.pooling;
+
+import static org.apache.geode.test.concurrency.Utilities.availableProcessors;
+import static org.apache.geode.test.concurrency.Utilities.repeat;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.test.concurrency.ConcurrentTestRunner;
+import org.apache.geode.test.concurrency.ParallelExecutor;
+
+@RunWith(ConcurrentTestRunner.class)
+public class ConnectionAccountingConcurrentTest {
+  private final int count = availableProcessors() * 2;
+
+  @Test
+  public void tryPrefillStaysBelowOrAtMin(ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    final int min = count - 1;
+    ConnectionAccounting accountant = new ConnectionAccounting(min, min + 4);
+
+    executor.inParallel(() -> {
+      accountant.tryPrefill();
+      assertThat(accountant.getCount()).isGreaterThan(0).isLessThanOrEqualTo(min);
+    }, count);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(min);
+  }
+
+  @Test
+  public void cancelTryPrefillStaysUnderMin(ParallelExecutor executor)
+      throws ExecutionException, InterruptedException {
+    final int min = count - 1;
+    ConnectionAccounting accountant = new ConnectionAccounting(min, min + 4);
+
+    executor.inParallel(() -> {
+      if (accountant.tryPrefill()) {
+        accountant.cancelTryPrefill();
+        assertThat(accountant.getCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(min);
+      }
+    }, count);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void creates(ParallelExecutor executor) throws Exception {
+    ConnectionAccounting accountant = new ConnectionAccounting(0, 1);
+
+    executor.inParallel(() -> {
+      accountant.create();
+      assertThat(accountant.getCount()).isGreaterThan(0).isLessThanOrEqualTo(count);
+    }, count);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(count);
+  }
+
+  @Test
+  public void tryCreateStaysWithinMax(ParallelExecutor executor) throws Exception {
+    ConnectionAccounting accountant = new ConnectionAccounting(1, count);
+
+    executor.inParallel(() -> {
+      if (accountant.tryCreate()) {
+        assertThat(accountant.getCount()).isGreaterThan(0).isLessThanOrEqualTo(count);
+      }
+    }, count + 1);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(count);
+  }
+
+  @Test
+  public void cancelTryCreateStaysWithinMax(ParallelExecutor executor) throws Exception {
+    ConnectionAccounting accountant = new ConnectionAccounting(1, count);
+
+    executor.inParallel(() -> {
+      if (accountant.tryCreate()) {
+        accountant.cancelTryCreate();
+        assertThat(accountant.getCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(count);
+      }
+    }, count + 1);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void destroyAndIsUnderMinimum(ParallelExecutor executor) throws Exception {
+    ConnectionAccounting accountant = new ConnectionAccounting(2, 4);
+    repeat(() -> accountant.create(), count);
+
+    executor.inParallel(() -> {
+      if (accountant.destroyAndIsUnderMinimum(1)) {
+        assertThat(accountant.getCount()).isGreaterThanOrEqualTo(0).isLessThan(2);
+      } else {
+        assertThat(accountant.getCount()).isGreaterThanOrEqualTo(0).isLessThan(count);
+      }
+    }, count);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void tryDestroyNeverGoesBelowMax(ParallelExecutor executor) throws Exception {
+    final int overfillMax = Math.max(count, 4);
+    final int max = overfillMax / 2;
+    ConnectionAccounting accountant = new ConnectionAccounting(1, max);
+    repeat(() -> accountant.create(), overfillMax);
+
+    executor.inParallel(() -> {
+      if (accountant.tryDestroy()) {
+        assertThat(accountant.getCount()).isGreaterThanOrEqualTo(max).isLessThan(overfillMax);
+      }
+    }, overfillMax + 1);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(max);
+  }
+
+  @Test
+  public void cancelTryDestroyStaysAboveMax(ParallelExecutor executor) throws Exception {
+    final int overfillMax = Math.max(count, 4);
+    final int max = overfillMax / 2;
+    ConnectionAccounting accountant = new ConnectionAccounting(1, max);
+    repeat(() -> accountant.create(), overfillMax);
+
+    executor.inParallel(() -> {
+      if (accountant.tryDestroy()) {
+        accountant.cancelTryDestroy();
+        assertThat(accountant.getCount()).isGreaterThanOrEqualTo(max)
+            .isLessThanOrEqualTo(overfillMax);
+      }
+    }, max + 1);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isEqualTo(overfillMax);
+  }
+
+
+  @Test
+  public void mixItUp(ParallelExecutor executor) throws Exception {
+    final int overfill = Math.max(count, 8);
+    final int max = overfill / 4;
+    final int overfillMax = overfill + max;
+    ConnectionAccounting accountant = new ConnectionAccounting(1, max);
+    repeat(() -> accountant.create(), overfill);
+
+    executor.inParallel(() -> {
+      if (accountant.tryDestroy()) {
+        accountant.cancelTryDestroy();
+      }
+    }, max);
+
+    executor.inParallel(() -> {
+      if (accountant.tryCreate()) {
+        accountant.cancelTryCreate();
+      }
+    }, max);
+
+    executor.inParallel(() -> {
+      accountant.create();
+    }, max);
+
+    executor.inParallel(() -> {
+      if (accountant.tryCreate()) {
+        accountant.destroyAndIsUnderMinimum(1);
+      }
+    }, max);
+
+    executor.inParallel(() -> {
+      if (accountant.tryPrefill()) {
+        accountant.cancelTryPrefill();
+      }
+    }, max);
+
+    executor.execute();
+
+    assertThat(accountant.getCount()).isGreaterThanOrEqualTo(0).isLessThanOrEqualTo(overfillMax);
+  }
+
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
new file mode 100644
index 0000000..35afea3
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImplTest.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal.pooling;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.ConnectionFactory;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.EndpointManager;
+import org.apache.geode.distributed.PoolCancelledException;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.PoolStats;
+import org.apache.geode.internal.logging.InternalLogWriter;
+
+public class ConnectionManagerImplTest {
+  ConnectionManagerImpl connectionManager;
+  public final String poolName = "poolName";
+  public final ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+  public final EndpointManager endpointManager = mock(EndpointManager.class);
+  public final InternalLogWriter securityLogger = mock(InternalLogWriter.class);
+  public final CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+  public final PoolStats poolStats = mock(PoolStats.class);
+  public final ScheduledExecutorService backgroundProcessor = mock(ScheduledExecutorService.class);
+  public int maxConnections = 800;
+  public int minConnections = 10;
+  public long idleTimeout = 1000;
+  public long timeout = 1000;
+  public int lifetimeTimeout = 1000;
+  public long pingInterval = 10;
+
+  private ConnectionManagerImpl createDefaultConnectionManager() {
+    return new ConnectionManagerImpl(poolName, connectionFactory, endpointManager, maxConnections,
+        minConnections, idleTimeout, lifetimeTimeout, securityLogger, pingInterval, cancelCriterion,
+        poolStats);
+  }
+
+  @Test
+  public void startExecutedPrefillConnectionsOnce() {
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+    connectionManager.start(backgroundProcessor);
+    verify(backgroundProcessor, times(1)).execute(any());
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void startShouldEatRejectedExecutionException() {
+    doThrow(RejectedExecutionException.class).when(backgroundProcessor).execute(any());
+
+    connectionManager = createDefaultConnectionManager();
+    assertThatCode(() -> connectionManager.start(backgroundProcessor)).doesNotThrowAnyException();
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionThrowsWhenUsingExistingConnectionsAndNoConnectionsExist() {
+    ServerLocation serverLocation = mock(ServerLocation.class);
+
+    connectionManager = createDefaultConnectionManager();
+    assertThatThrownBy(() -> connectionManager.borrowConnection(serverLocation, timeout, true))
+        .isInstanceOf(AllConnectionsInUseException.class);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionCreatesAConnectionOnSpecifiedServerWhenNoneExist() {
+    Connection connection = mock(Connection.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation, false))
+        .thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    assertThat(connectionManager.borrowConnection(serverLocation, timeout, false))
+        .isInstanceOf(PooledConnection.class);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(1);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionCreatesAConnectionWhenNoneExist() {
+    Connection connection = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    assertThat(connectionManager.borrowConnection(timeout)).isInstanceOf(PooledConnection.class);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(1);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionReturnsAnActiveConnection() {
+    Connection connection = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    PooledConnection heldConnection =
+        (PooledConnection) connectionManager.borrowConnection(timeout);
+    assertThatThrownBy(() -> heldConnection.activate()).isInstanceOf(InternalGemFireException.class)
+        .hasMessageContaining("Connection already active");
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionReturnsAConnectionWhenOneExists() {
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    Endpoint endpoint = mock(Endpoint.class);
+    Connection connection = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection);
+    when(connection.getServer()).thenReturn(serverLocation);
+    when(connection.getEndpoint()).thenReturn(endpoint);
+    when(endpoint.getLocation()).thenReturn(serverLocation);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(timeout);
+    connectionManager.returnConnection(heldConnection);
+    heldConnection = connectionManager.borrowConnection(timeout);
+    assertThat(heldConnection.getServer()).isEqualTo(connection.getServer());
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(1);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionThrowsExceptionWhenUnableToCreateConnection() {
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(null);
+    doNothing().when(backgroundProcessor).execute(any());
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+    assertThatThrownBy(() -> connectionManager.borrowConnection(timeout))
+        .isInstanceOf(NoAvailableServersException.class);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(0);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionWillSchedulePrefillIfUnderMinimumConnections() {
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(null);
+    doNothing().when(backgroundProcessor).execute(any());
+
+    pingInterval = 20000000; // set it high to prevent prefill retry
+    connectionManager = spy(createDefaultConnectionManager());
+    connectionManager.start(backgroundProcessor);
+    assertThatThrownBy(() -> connectionManager.borrowConnection(timeout))
+        .isInstanceOf(NoAvailableServersException.class);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(0);
+
+    verify(connectionManager, times(2)).startBackgroundPrefill();
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionGivesUpWhenShuttingDown() {
+    int maxConnections = 1;
+    Connection connection = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection);
+
+    connectionManager = new ConnectionManagerImpl(poolName, connectionFactory, endpointManager,
+        maxConnections, 1, idleTimeout, lifetimeTimeout, securityLogger, pingInterval,
+        cancelCriterion, poolStats);
+    connectionManager.start(backgroundProcessor);
+    connectionManager.shuttingDown.set(true);
+
+    // reach max connection count so we can't create a new connection and end up in the wait loop
+    connectionManager.borrowConnection(timeout);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(maxConnections);
+
+    assertThatThrownBy(() -> connectionManager.borrowConnection(timeout))
+        .isInstanceOf(PoolCancelledException.class);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowConnectionTimesOutWithException() {
+    int maxConnections = 1;
+    Connection connection = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection);
+
+    connectionManager = new ConnectionManagerImpl(poolName, connectionFactory, endpointManager,
+        maxConnections, 1, idleTimeout, lifetimeTimeout, securityLogger, pingInterval,
+        cancelCriterion, poolStats);
+    connectionManager.start(backgroundProcessor);
+
+    // reach max connection count so we can't create a new connection and end up in the wait loop
+    connectionManager.borrowConnection(timeout);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(maxConnections);
+
+    assertThatThrownBy(() -> connectionManager.borrowConnection(10))
+        .isInstanceOf(AllConnectionsInUseException.class);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void borrowWithServerLocationBreaksMaxConnectionContract() {
+    int maxConnections = 2;
+
+    ServerLocation serverLocation1 = mock(ServerLocation.class);
+    Connection connection1 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation1, false))
+        .thenReturn(connection1);
+
+    ServerLocation serverLocation2 = mock(ServerLocation.class);
+    Connection connection2 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation2, false))
+        .thenReturn(connection2);
+
+    ServerLocation serverLocation3 = mock(ServerLocation.class);
+    Connection connection3 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation3, false))
+        .thenReturn(connection3);
+
+    connectionManager = new ConnectionManagerImpl(poolName, connectionFactory, endpointManager,
+        maxConnections, 1, idleTimeout, lifetimeTimeout, securityLogger, pingInterval,
+        cancelCriterion, poolStats);
+    connectionManager.start(backgroundProcessor);
+
+    connectionManager.borrowConnection(serverLocation1, timeout, false);
+    connectionManager.borrowConnection(serverLocation2, timeout, false);
+    connectionManager.borrowConnection(serverLocation3, timeout, false);
+
+    assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void returnConnectionReturnsToHead() {
+    ServerLocation serverLocation1 = mock(ServerLocation.class);
+    Connection connection1 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation1, false))
+        .thenReturn(connection1);
+    when(connection1.getServer()).thenReturn(serverLocation1);
+
+    ServerLocation serverLocation2 = mock(ServerLocation.class);
+    Connection connection2 = mock(Connection.class);
+    Endpoint endpoint2 = mock(Endpoint.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation2, false))
+        .thenReturn(connection2);
+    when(connection2.getServer()).thenReturn(serverLocation2);
+    when(connection2.getEndpoint()).thenReturn(endpoint2);
+    when(endpoint2.getLocation()).thenReturn(serverLocation2);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+    Connection heldConnection1 =
+        connectionManager.borrowConnection(serverLocation1, timeout, false);
+    Connection heldConnection2 =
+        connectionManager.borrowConnection(serverLocation2, timeout, false);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(2);
+
+    connectionManager.returnConnection(heldConnection1, true);
+    connectionManager.returnConnection(heldConnection2, true);
+
+    assertThat(connectionManager.borrowConnection(timeout).getServer())
+        .isEqualTo(connection2.getServer());
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void shouldDestroyConnectionsDoNotGetReturnedToPool() {
+    Connection connection = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(any())).thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(timeout);
+    heldConnection.destroy();
+    connectionManager.returnConnection(heldConnection, true);
+
+    assertThat(connectionManager.borrowConnection(timeout)).isNotEqualTo(connection);
+    verify(connectionFactory, times(2)).createClientToServerConnection(any());
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void connectionGetsDestroyedWhenReturningToPoolAndOverMaxConnections() {
+    int maxConnections = 2;
+
+    ServerLocation serverLocation1 = mock(ServerLocation.class);
+    Connection connection1 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation1, false))
+        .thenReturn(connection1);
+
+    ServerLocation serverLocation2 = mock(ServerLocation.class);
+    Connection connection2 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation2, false))
+        .thenReturn(connection2);
+
+    ServerLocation serverLocation3 = mock(ServerLocation.class);
+    Connection connection3 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation3, false))
+        .thenReturn(connection3);
+
+    connectionManager = new ConnectionManagerImpl(poolName, connectionFactory, endpointManager,
+        maxConnections, 1, idleTimeout, lifetimeTimeout, securityLogger, pingInterval,
+        cancelCriterion, poolStats);
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection1 =
+        connectionManager.borrowConnection(serverLocation1, timeout, false);
+    Connection heldConnection2 =
+        connectionManager.borrowConnection(serverLocation2, timeout, false);
+    Connection heldConnection3 =
+        connectionManager.borrowConnection(serverLocation3, timeout, false);
+
+    assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
+
+    connectionManager.returnConnection(heldConnection3);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(maxConnections);
+
+    connectionManager.returnConnection(heldConnection1);
+    connectionManager.returnConnection(heldConnection2);
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(maxConnections);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void exchangeCreatesNewConnectionIfNoneAreAvailable() {
+    Set<ServerLocation> excluded = Collections.emptySet();
+
+    ServerLocation serverLocation1 = mock(ServerLocation.class);
+    Connection connection1 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation1, false))
+        .thenReturn(connection1);
+
+    ServerLocation serverLocation2 = mock(ServerLocation.class);
+    Endpoint endpoint2 = mock(Endpoint.class);
+    Connection connection2 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(eq(Collections.EMPTY_SET)))
+        .thenReturn(connection2);
+    when(connection2.getServer()).thenReturn(serverLocation2);
+    when(connection2.getEndpoint()).thenReturn(endpoint2);
+    when(endpoint2.getLocation()).thenReturn(serverLocation2);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(serverLocation1, timeout, false);
+    heldConnection = connectionManager.exchangeConnection(heldConnection, excluded, timeout);
+
+    assertThat(heldConnection.getServer()).isEqualTo(connection2.getServer());
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(2);
+    verify(connectionFactory, times(1)).createClientToServerConnection(Collections.EMPTY_SET);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void exchangeBreaksMaxConnectionContractWhenNoConnectionsAreAvailable() {
+    int maxConnections = 2;
+    Set<ServerLocation> excluded = Collections.emptySet();
+
+    ServerLocation serverLocation1 = mock(ServerLocation.class);
+    Connection connection1 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation1, false))
+        .thenReturn(connection1);
+
+    ServerLocation serverLocation2 = mock(ServerLocation.class);
+    Connection connection2 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation2, false))
+        .thenReturn(connection2);
+
+    ServerLocation serverLocation3 = mock(ServerLocation.class);
+    Connection connection3 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation3, false))
+        .thenReturn(connection3);
+
+    ServerLocation serverLocation4 = mock(ServerLocation.class);
+    Endpoint endpoint4 = mock(Endpoint.class);
+    Connection connection4 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(eq(Collections.EMPTY_SET)))
+        .thenReturn(connection4);
+    when(connection4.getServer()).thenReturn(serverLocation4);
+    when(connection4.getEndpoint()).thenReturn(endpoint4);
+    when(endpoint4.getLocation()).thenReturn(serverLocation4);
+
+    connectionManager = new ConnectionManagerImpl(poolName, connectionFactory, endpointManager,
+        maxConnections, 1, idleTimeout, lifetimeTimeout, securityLogger, pingInterval,
+        cancelCriterion, poolStats);
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(serverLocation1, timeout, false);
+    connectionManager.borrowConnection(serverLocation2, timeout, false);
+    connectionManager.borrowConnection(serverLocation3, timeout, false);
+    assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
+
+    heldConnection = connectionManager.exchangeConnection(heldConnection, excluded, timeout);
+    assertThat(connectionManager.getConnectionCount()).isGreaterThan(maxConnections);
+    assertThat(heldConnection.getServer()).isEqualTo(connection4.getServer());
+    verify(connectionFactory, times(1)).createClientToServerConnection(Collections.EMPTY_SET);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void exchangeReturnsExistingConnectionIfOneExists() {
+    Set<ServerLocation> excluded = Collections.emptySet();
+
+    ServerLocation serverLocation1 = mock(ServerLocation.class);
+    Connection connection1 = mock(Connection.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation1, false))
+        .thenReturn(connection1);
+
+    ServerLocation serverLocation2 = mock(ServerLocation.class);
+    Connection connection2 = mock(Connection.class);
+    Endpoint endpoint2 = mock(Endpoint.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation2, false))
+        .thenReturn(connection2);
+    when(connection2.getServer()).thenReturn(serverLocation2);
+    when(connection2.getEndpoint()).thenReturn(endpoint2);
+    when(endpoint2.getLocation()).thenReturn(serverLocation2);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection1 =
+        connectionManager.borrowConnection(serverLocation1, timeout, false);
+    Connection heldConnection2 =
+        connectionManager.borrowConnection(serverLocation2, timeout, false);
+
+    connectionManager.returnConnection(heldConnection2);
+    heldConnection2 = connectionManager.exchangeConnection(heldConnection1, excluded, timeout);
+    assertThat(heldConnection2.getServer()).isEqualTo(connection2.getServer());
+    assertThat(connectionManager.getConnectionCount()).isEqualTo(2);
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void activateActivatesConnection() {
+    Connection connection = mock(Connection.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation, false))
+        .thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
+    connectionManager.passivate(heldConnection, false);
+    connectionManager.activate(heldConnection);
+
+    assertThat(((PooledConnection) heldConnection).isActive()).isTrue();
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void activateThrowsIfConnectionIsAlreadyActive() {
+    Connection connection = mock(Connection.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation, false))
+        .thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
+
+    assertThatThrownBy(() -> connectionManager.activate(heldConnection))
+        .isInstanceOf(InternalGemFireException.class)
+        .hasMessageContaining("Connection already active");
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void passivatePassivatesConnection() {
+    Connection connection = mock(Connection.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation, false))
+        .thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
+    connectionManager.passivate(heldConnection, false);
+
+    assertThat(((PooledConnection) heldConnection).isActive()).isFalse();
+
+    connectionManager.close(false);
+  }
+
+  @Test
+  public void passivateThrowsWhenConnectionIsNotActive() {
+    Connection connection = mock(Connection.class);
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    when(connectionFactory.createClientToServerConnection(serverLocation, false))
+        .thenReturn(connection);
+
+    connectionManager = createDefaultConnectionManager();
+    connectionManager.start(backgroundProcessor);
+
+    Connection heldConnection = connectionManager.borrowConnection(serverLocation, timeout, false);
+    connectionManager.passivate(heldConnection, false);
+    assertThatThrownBy(() -> connectionManager.passivate(heldConnection, false))
+        .isInstanceOf(InternalGemFireException.class)
+        .hasMessageContaining("Connection not active");
+
+    connectionManager.close(false);
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
index 15d25cf..9c3b616 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerJUnitTest.java
@@ -570,7 +570,7 @@ public class ConnectionManagerJUnitTest {
     Assert.assertEquals(0, factory.destroys);
     Assert.assertEquals(2, manager.getConnectionCount());
 
-    Connection conn3 = manager.exchangeConnection(conn1, Collections.EMPTY_SET, 10);
+    Connection conn3 = manager.exchangeConnection(conn1, Collections.emptySet(), 10);
 
     Assert.assertEquals(3, factory.creates);
     Assert.assertEquals(1, factory.destroys);
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index c07493f..cf128fd 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -153,14 +153,12 @@ public class OpExecutorImpl implements ExecutablePool {
       // while we're performing the op. It will be reset
       // if the op succeeds.
       localConnection.set(null);
-      try {
-        this.connectionManager.activate(conn);
-      } catch (ConnectionDestroyedException ex) {
+      if (!this.connectionManager.activate(conn)) {
         conn = connectionManager.borrowConnection(serverTimeout);
       }
     }
     try {
-      Set attemptedServers = null;
+      Set<ServerLocation> attemptedServers = null;
 
       for (int attempt = 0; true; attempt++) {
         // when an op is retried we may need to try to recover the previous
@@ -183,7 +181,7 @@ public class OpExecutorImpl implements ExecutablePool {
           handleException(e, conn, attempt, attempt >= retries && retries != -1);
           if (null == attemptedServers) {
             // don't allocate this until we need it
-            attemptedServers = new HashSet();
+            attemptedServers = new HashSet<>();
           }
           attemptedServers.add(conn.getServer());
           try {
@@ -445,15 +443,13 @@ public class OpExecutorImpl implements ExecutablePool {
     }
     boolean borrow = true;
     if (conn != null) {
-      try {
-        this.connectionManager.activate(conn);
+      if (this.connectionManager.activate(conn)) {
         borrow = false;
         if (!conn.getServer().equals(server)) {
           // poolLoadConditioningMonitor can replace the connection's
           // endpoint from underneath us. fixes bug 45151
           borrow = true;
         }
-      } catch (ConnectionDestroyedException e) {
       }
     }
     if (conn == null || borrow) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManager.java
new file mode 100644
index 0000000..6628266
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal.pooling;
+
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.function.Predicate;
+
+/**
+ * This manager maintains a collection of PooledConnection instances.
+ * PooledConnections can be added to this manager using one of the add* methods.
+ * PooledConnections can be taken out of this manager to be used by calling one of the use* methods.
+ * Once a PooledConnection has reached the end of its life it can be permanently removed from this
+ * manager using the remove method.
+ * The manager has a concept of "first" which identifies the PooledConnections that should be
+ * preferred to be used by the next use* method, and "last" which identifies PooledConnections that
+ * should only be used as a last resort.
+ *
+ */
+public class AvailableConnectionManager {
+  private final Deque<PooledConnection> connections =
+      new ConcurrentLinkedDeque<>();
+
+  /**
+   * Remove, activate, and return the first connection.
+   * Connections that can not be activated will be removed from the manager but not returned.
+   *
+   * @return the activated connection or null if none found
+   */
+  public PooledConnection useFirst() {
+    PooledConnection connection;
+    while (null != (connection = connections.pollFirst())) {
+      if (connection.activate()) {
+        return connection;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Removes the first connection equal to the given connection from this manager.
+   *
+   * @param connection the connection to remove
+   * @return true if a connection was removed; otherwise false
+   */
+  public boolean remove(PooledConnection connection) {
+    return connections.remove(connection);
+  }
+
+  /**
+   * Remove, activate, and return the first connection that matches the predicate.
+   * Connections that can not be activated will be removed from the manager but not returned.
+   *
+   * @param predicate that the connections are matched against
+   * @return the activated connection or null if none found
+   */
+  public PooledConnection useFirst(Predicate<PooledConnection> predicate) {
+    final EqualsWithPredicate equalsWithPredicate = new EqualsWithPredicate(predicate);
+    while (connections.removeFirstOccurrence(equalsWithPredicate)) {
+      PooledConnection connection = equalsWithPredicate.getConnectionThatMatched();
+      if (connection.activate()) {
+        return connection;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Passivate and add the given connection to this manager as its first, highest priority
+   * connection.
+   *
+   * @param connection the connection to passivate and add
+   * @param accessed true if the connection was used by the caller, false otherwise
+   */
+  public void addFirst(PooledConnection connection, boolean accessed) {
+    passivate(connection, accessed);
+    connections.addFirst(connection);
+  }
+
+  /**
+   * Passivate and add the given connection to this manager as its last, lowest priority connection.
+   *
+   * @param connection the connection to passivate and add
+   * @param accessed true if the connection was used by the caller, false otherwise
+   */
+  public void addLast(PooledConnection connection, boolean accessed) {
+    passivate(connection, accessed);
+    connections.addLast(connection);
+  }
+
+  private void passivate(PooledConnection connection, boolean accessed) {
+    // thread local connections are already passive at this point
+    if (connection.isActive()) {
+      connection.passivate(accessed);
+    }
+  }
+
+  // used by unit tests
+  Deque<PooledConnection> getDeque() {
+    return connections;
+  }
+
+  /**
+   * This class exists so that we can use ConcurrentLinkedDeque removeFirstOccurrence.
+   * We want to efficiently scan the Deque for an item that matches the predicate without changing
+   * the position of items that do not match. We also need to know the identity of the first item
+   * that did match.
+   */
+  private static class EqualsWithPredicate {
+    private final Predicate<PooledConnection> predicate;
+    private PooledConnection connectionThatMatched;
+
+    EqualsWithPredicate(Predicate<PooledConnection> predicate) {
+      this.predicate = predicate;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      PooledConnection pooledConnection = (PooledConnection) o;
+      if (predicate.test(pooledConnection)) {
+        this.connectionThatMatched = pooledConnection;
+        return true;
+      }
+      return false;
+    }
+
+    public PooledConnection getConnectionThatMatched() {
+      return this.connectionThatMatched;
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccounting.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccounting.java
new file mode 100644
index 0000000..3965f79
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccounting.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.client.internal.pooling;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Responsible for counting connections.
+ * The count maintained by this class will eventually be consistent with the actual number of
+ * connections. Since the count is changed before and after the actual connections are created and
+ * destroyed, and not changed while holding a lock, the count should be treated as an estimate of
+ * the current number of connections.
+ */
+public class ConnectionAccounting {
+  private final int minimum;
+  private final int maximum;
+  private final AtomicInteger count = new AtomicInteger();
+
+  public ConnectionAccounting(int min, int max) {
+    this.minimum = min;
+    this.maximum = max;
+  }
+
+  public int getMinimum() {
+    return minimum;
+  }
+
+  public int getMaximum() {
+    return maximum;
+  }
+
+  public int getCount() {
+    return count.get();
+  }
+
+  /**
+   * Should be called when prefilling connections to reach minimum connections. Caller should only
+   * create a connection if this method returns {@code true}. If connection creation fails then
+   * {@link #cancelTryPrefill} must be called to revert the count increase.
+   *
+   * @return {@code true} if count was under minimum and we increased it, otherwise {@code false}.
+   */
+  public boolean tryPrefill() {
+    return tryReserve(minimum);
+  }
+
+  /**
+   * Should only be called if connection creation failed after calling {@link #tryPrefill()} ()}.
+   */
+  public void cancelTryPrefill() {
+    count.getAndDecrement();
+  }
+
+  /**
+   * Should be called when a new connection would be nice to have when count is under maximum.
+   * Caller should only create a connection if this method returns {@code true}. If connection
+   * creation fails then {@link #cancelTryCreate} must be called to revert the count increase.
+   *
+   * @return {@code true} if count was under maximum and we increased it, otherwise {@code false}.
+   */
+  public boolean tryCreate() {
+    return tryReserve(maximum);
+  }
+
+  /**
+   * Should only be called if connection creation failed after calling {@link #tryCreate()}.
+   */
+  public void cancelTryCreate() {
+    count.decrementAndGet();
+  }
+
+  /**
+   * Count a created connection regardless of maximum. Should not be called after
+   * {@link #tryCreate()}.
+   */
+  public void create() {
+    count.getAndIncrement();
+  }
+
+  /**
+   * Should be called when a connection is being returned and the caller should destroy the
+   * connection if {@code true} is returned. If connection destroy fails then
+   * {@link #cancelTryDestroy()} must be called.
+   *
+   * @return {@code true} if count was over maximum and we decreased it, otherwise {@code false}.
+   */
+  public boolean tryDestroy() {
+    int currentCount;
+    while ((currentCount = count.get()) > maximum) {
+      if (count.compareAndSet(currentCount, currentCount - 1)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Should only be called if connection destroy failed after calling {@link #tryDestroy()}.
+   */
+  public void cancelTryDestroy() {
+    count.getAndIncrement();
+  }
+
+  /**
+   * Should be called after any connection destroys are done. Should not be called
+   * after {@link #tryDestroy()}.
+   *
+   * @param destroyCount number of connections being destroyed.
+   *
+   * @return {@code true} if after decreasing count it is under the minimum, otherwise
+   *         {@code false}.
+   */
+  public boolean destroyAndIsUnderMinimum(int destroyCount) {
+    int newCount = count.addAndGet(-destroyCount);
+    return newCount < minimum;
+  }
+
+  public boolean isUnderMinimum() {
+    return count.get() < minimum;
+  }
+
+  public boolean isOverMinimum() {
+    return count.get() > minimum;
+  }
+
+  private boolean tryReserve(int upperBound) {
+    int currentCount;
+    while ((currentCount = count.get()) < upperBound) {
+      if (count.compareAndSet(currentCount, currentCount + 1)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
index 55ddac0..ea221f0 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManager.java
@@ -17,9 +17,13 @@ package org.apache.geode.cache.client.internal.pooling;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.geode.InternalGemFireException;
 import org.apache.geode.cache.client.AllConnectionsInUseException;
 import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.distributed.PoolCancelledException;
 import org.apache.geode.distributed.internal.ServerLocation;
 
 /**
@@ -32,26 +36,35 @@ import org.apache.geode.distributed.internal.ServerLocation;
 public interface ConnectionManager {
 
   /**
-   * Borrow an existing idle connection or create a new one.
+   * Borrow an existing idle connection or create a new one. Fails after one failed attempt to
+   * create a new connection.
    *
    * @param aquireTimeout The amount of time to wait for a connection to become available.
    * @return A connection to use.
-   * @throws AllConnectionsInUseException If the maximum number of connections are already in use
+   * @throws AllConnectionsInUseException If the maximum number of connections are already, in use
    *         and no connection becomes free within the aquireTimeout.
    * @throws NoAvailableServersException if we can't connect to any server
+   * @throws ServerOperationException if there is an issue with security or connecting to a gateway
+   * @throws PoolCancelledException if the pool is being shut down
    */
   Connection borrowConnection(long aquireTimeout)
       throws AllConnectionsInUseException, NoAvailableServersException;
 
   /**
-   * Borrow an existing idle connection or create a new one to a specific server.
+   * Borrow an existing idle connection or create a new one to a specific server. Fails after one
+   * failed attempt to create a new connection. May cause pool to exceed maxConnections by one, if
+   * no connection is available.
    *
    * @param server The server the connection needs to be to.
    * @param aquireTimeout The amount of time to wait for a connection to become available.
+   * @param onlyUseExistingCnx if true, will not create a new connection if none are available.
    * @return A connection to use.
-   * @throws AllConnectionsInUseException If the maximum number of connections are already in use
-   *         and no connection becomes free within the aquireTimeout.
-   * @throws NoAvailableServersException if we can't connect to any server
+   * @throws AllConnectionsInUseException If there is no available connection on the desired server,
+   *         and onlyUseExistingCnx is set.
+   * @throws ServerOperationException If there is an issue creating the connection due to security
+   * @throws NoAvailableServersException If we can't connect to any server
+   * @throws ServerConnectivityException If finding a connection and creating a connection both fail
+   *         to return a connection
    *
    */
   Connection borrowConnection(ServerLocation server, long aquireTimeout, boolean onlyUseExistingCnx)
@@ -85,14 +98,20 @@ public interface ConnectionManager {
   void close(boolean keepAlive);
 
   /**
-   * Exchange one connection for a new connection to a different server.
+   * Exchange one connection for a new connection to a different server. This method can break the
+   * max connection contract if there is no available connection and maxConnections has already been
+   * reached.
    *
    * @param conn connection to exchange. It will be returned to the pool (or destroyed if it has
    *        been invalidated).
    * @param excludedServers servers to exclude when looking for a new connection
+   * @param aquireTimeout The amount of time to wait for a connection to be available
+   * @throws InternalGemFireException if the found connection is already active
+   * @throws NoAvailableServersException if no servers are available to connect to
+   * @throws ServerOperationException if creating a connection fails due to authentication issues
    * @return a new connection to the pool to a server that is not in the list of excluded servers
    */
-  Connection exchangeConnection(Connection conn, Set/* <ServerLocation> */ excludedServers,
+  Connection exchangeConnection(Connection conn, Set<ServerLocation> excludedServers,
       long aquireTimeout) throws AllConnectionsInUseException;
 
   /**
@@ -104,11 +123,16 @@ public interface ConnectionManager {
 
   /**
    * Used to active a thread local connection
+   *
+   * @return true if connection activated, false if could not be activated because it is destroyed
+   * @throws InternalGemFireException when the connection is already active
    */
-  void activate(Connection conn);
+  boolean activate(Connection conn);
 
   /**
    * Used to passivate a thread local connection
+   *
+   * @throws InternalGemFireException when the connection is already passive
    */
   void passivate(Connection conn, boolean accessed);
 
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index ae0b481..fff52e3 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -14,8 +14,9 @@
  */
 package org.apache.geode.cache.client.internal.pooling;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 import java.net.SocketException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -24,13 +25,13 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SplittableRandom;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.logging.log4j.Logger;
 
@@ -38,7 +39,6 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.GatewayConfigurationException;
 import org.apache.geode.cache.client.AllConnectionsInUseException;
 import org.apache.geode.cache.client.NoAvailableServersException;
 import org.apache.geode.cache.client.ServerConnectivityException;
@@ -70,15 +70,15 @@ import org.apache.geode.security.GemFireSecurityException;
  */
 public class ConnectionManagerImpl implements ConnectionManager {
   private static final Logger logger = LogService.getLogger();
+  private static final int NOT_WAITING = -1;
 
   private final String poolName;
   private final PoolStats poolStats;
   protected final long prefillRetry; // ms
-  private final ArrayDeque<PooledConnection> availableConnections = new ArrayDeque<>();
+  private final AvailableConnectionManager availableConnectionManager =
+      new AvailableConnectionManager();
   protected final ConnectionMap allConnectionsMap = new ConnectionMap();
   private final EndpointManager endpointManager;
-  private final int maxConnections;
-  protected final int minConnections;
   private final long idleTimeout; // make this an int
   protected final long idleTimeoutNanos;
   final int lifetimeTimeout;
@@ -86,21 +86,17 @@ public class ConnectionManagerImpl implements ConnectionManager {
   private final InternalLogWriter securityLogWriter;
   protected final CancelCriterion cancelCriterion;
 
-  protected volatile int connectionCount;
+  private final ConnectionAccounting connectionAccounting;
   protected ScheduledExecutorService backgroundProcessor;
   protected ScheduledExecutorService loadConditioningProcessor;
 
-  protected ReentrantLock lock = new ReentrantLock();
-  protected Condition freeConnection = lock.newCondition();
   private ConnectionFactory connectionFactory;
   protected boolean haveIdleExpireConnectionsTask;
-  protected boolean havePrefillTask;
+  protected final AtomicBoolean havePrefillTask = new AtomicBoolean(false);
   private boolean keepAlive = false;
-  protected volatile boolean shuttingDown;
+  protected final AtomicBoolean shuttingDown = new AtomicBoolean(false);
   private EndpointManager.EndpointListenerAdapter endpointListener;
 
-  private static final long NANOS_PER_MS = 1000000L;
-
   /**
    * Adds an arbitrary variance to a positive temporal interval. Where possible, 10% of the interval
    * is added or subtracted from the interval. Otherwise, 1 is added or subtracted from the
@@ -153,10 +149,10 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
     this.connectionFactory = factory;
     this.endpointManager = endpointManager;
-    this.maxConnections = maxConnections == -1 ? Integer.MAX_VALUE : maxConnections;
-    this.minConnections = minConnections;
+    this.connectionAccounting = new ConnectionAccounting(minConnections,
+        maxConnections == -1 ? Integer.MAX_VALUE : maxConnections);
     this.lifetimeTimeout = addVarianceToInterval(lifetimeTimeout);
-    this.lifetimeTimeoutNanos = this.lifetimeTimeout * NANOS_PER_MS;
+    this.lifetimeTimeoutNanos = MILLISECONDS.toNanos(this.lifetimeTimeout);
     if (this.lifetimeTimeout != -1) {
       if (idleTimeout > this.lifetimeTimeout || idleTimeout == -1) {
         // lifetimeTimeout takes precedence over longer idle timeouts
@@ -164,7 +160,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
       }
     }
     this.idleTimeout = idleTimeout;
-    this.idleTimeoutNanos = this.idleTimeout * NANOS_PER_MS;
+    this.idleTimeoutNanos = MILLISECONDS.toNanos(this.idleTimeout);
     this.securityLogWriter = securityLogger;
     this.prefillRetry = pingInterval;
     this.cancelCriterion = cancelCriterion;
@@ -176,96 +172,135 @@ public class ConnectionManagerImpl implements ConnectionManager {
     };
   }
 
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.geode.cache.client.internal.pooling.ConnectionManager#borrowConnection(long)
-   */
-  @Override
-  public Connection borrowConnection(long acquireTimeout)
-      throws AllConnectionsInUseException, NoAvailableServersException {
+  private void destroyAndMaybePrefill() {
+    destroyAndMaybePrefill(1);
+  }
+
+  private void destroyAndMaybePrefill(int count) {
+    if (connectionAccounting.destroyAndIsUnderMinimum(count)) {
+      startBackgroundPrefill();
+    }
+  }
 
-    long startTime = System.currentTimeMillis();
-    long remainingTime = acquireTimeout;
+  private PooledConnection createPooledConnection()
+      throws NoAvailableServersException, ServerOperationException {
+    return createPooledConnection(Collections.emptySet());
+  }
 
-    // wait for a connection to become free
-    lock.lock();
+  private PooledConnection createPooledConnection(Set<ServerLocation> excludedServers)
+      throws NoAvailableServersException, ServerOperationException {
     try {
-      while (connectionCount >= maxConnections && availableConnections.isEmpty()
-          && remainingTime > 0 && !shuttingDown) {
-        final long start = getPoolStats().beginConnectionWait();
-        boolean interrupted = false;
-        try {
-          freeConnection.await(remainingTime, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-          interrupted = true;
-          cancelCriterion.checkCancelInProgress(e);
-          throw new AllConnectionsInUseException();
-        } finally {
-          if (interrupted) {
-            Thread.currentThread().interrupt();
-          }
-          getPoolStats().endConnectionWait(start);
-        }
-        remainingTime = acquireTimeout - (System.currentTimeMillis() - startTime);
-      }
-      if (shuttingDown) {
-        throw new PoolCancelledException();
-      }
+      return addConnection(connectionFactory.createClientToServerConnection(excludedServers));
+    } catch (GemFireSecurityException e) {
+      throw new ServerOperationException(e);
+    } catch (ServerRefusedConnectionException e) {
+      throw new NoAvailableServersException(e);
+    }
+  }
 
-      while (!availableConnections.isEmpty()) {
-        PooledConnection connection = availableConnections.removeFirst();
-        try {
-          connection.activate();
-          return connection;
-        } catch (ConnectionDestroyedException ex) {
-          // whoever destroyed it already decremented connectionCount
-        }
-      }
-      if (connectionCount >= maxConnections) {
-        throw new AllConnectionsInUseException();
-      } else {
-        // We need to create a connection. Reserve space for it.
-        connectionCount++;
-      }
+  private PooledConnection createPooledConnection(ServerLocation serverLocation)
+      throws ServerRefusedConnectionException, GemFireSecurityException {
+    return addConnection(connectionFactory.createClientToServerConnection(serverLocation, false));
+  }
 
-    } finally {
-      lock.unlock();
+  /**
+   * Always creates a connection and may cause {@link #connectionCount} to exceed
+   * {@link #maxConnections}.
+   */
+  private PooledConnection forceCreateConnection(ServerLocation serverLocation)
+      throws ServerRefusedConnectionException, ServerOperationException {
+    connectionAccounting.create();
+    try {
+      return createPooledConnection(serverLocation);
+    } catch (GemFireSecurityException e) {
+      throw new ServerOperationException(e);
+    }
+  }
+
+  /**
+   * Always creates a connection and may cause {@link #connectionCount} to exceed
+   * {@link #maxConnections}.
+   */
+  private PooledConnection forceCreateConnection(Set<ServerLocation> excludedServers)
+      throws NoAvailableServersException, ServerOperationException {
+    connectionAccounting.create();
+    return createPooledConnection(excludedServers);
+  }
+
+  private boolean checkShutdownInterruptedOrTimeout(final long timeout)
+      throws PoolCancelledException {
+    if (shuttingDown.get()) {
+      throw new PoolCancelledException();
+    }
+
+    if (Thread.currentThread().isInterrupted()) {
+      return true;
+    }
+
+    if (timeout < System.nanoTime()) {
+      return true;
     }
 
-    PooledConnection connection = null;
+    return false;
+  }
+
+  private long beginConnectionWaitStatIfNotStarted(final long waitStart) {
+    if (NOT_WAITING == waitStart) {
+      return getPoolStats().beginConnectionWait();
+    }
+
+    return waitStart;
+  }
+
+  private void endConnectionWaitStatIfStarted(final long waitStart) {
+    if (NOT_WAITING != waitStart) {
+      getPoolStats().endConnectionWait(waitStart);
+    }
+  }
+
+  @Override
+  public Connection borrowConnection(long acquireTimeout)
+      throws AllConnectionsInUseException, NoAvailableServersException, ServerOperationException {
+    long waitStart = NOT_WAITING;
     try {
-      Connection plainConnection =
-          connectionFactory.createClientToServerConnection(Collections.EMPTY_SET);
+      long timeout = System.nanoTime() + MILLISECONDS.toNanos(acquireTimeout);
+      while (true) {
+        PooledConnection connection = availableConnectionManager.useFirst();
+        if (null != connection) {
+          return connection;
+        }
 
-      connection = addConnection(plainConnection);
-    } catch (GemFireSecurityException e) {
-      throw new ServerOperationException(e);
-    } catch (GatewayConfigurationException e) {
-      throw new ServerOperationException(e);
-    } catch (ServerRefusedConnectionException srce) {
-      throw new NoAvailableServersException(srce);
-    } finally {
-      // if we failed, release the space we reserved for our connection
-      if (connection == null) {
-        lock.lock();
-        try {
-          --connectionCount;
-          if (connectionCount < minConnections) {
-            startBackgroundPrefill();
+        if (connectionAccounting.tryCreate()) {
+          try {
+            connection = createPooledConnection();
+            if (null != connection) {
+              return connection;
+            }
+            throw new NoAvailableServersException();
+          } finally {
+            if (connection == null) {
+              connectionAccounting.cancelTryCreate();
+              if (connectionAccounting.isUnderMinimum()) {
+                startBackgroundPrefill();
+              }
+            }
           }
-        } finally {
-          lock.unlock();
         }
-      }
-    }
 
-    if (connection == null) {
-      this.cancelCriterion.checkCancelInProgress(null);
-      throw new NoAvailableServersException();
+        if (checkShutdownInterruptedOrTimeout(timeout)) {
+          break;
+        }
+
+        waitStart = beginConnectionWaitStatIfNotStarted(waitStart);
+
+        Thread.yield();
+      }
+    } finally {
+      endConnectionWaitStatIfStarted(waitStart);
     }
 
-    return connection;
+    this.cancelCriterion.checkCancelInProgress(null);
+    throw new AllConnectionsInUseException();
   }
 
   /**
@@ -277,151 +312,49 @@ public class ConnectionManagerImpl implements ConnectionManager {
   @Override
   public Connection borrowConnection(ServerLocation server, long acquireTimeout,
       boolean onlyUseExistingCnx) throws AllConnectionsInUseException, NoAvailableServersException {
-    lock.lock();
-    try {
-      if (shuttingDown) {
-        throw new PoolCancelledException();
-      }
-      for (Iterator<PooledConnection> itr = availableConnections.iterator(); itr.hasNext();) {
-        PooledConnection nextConnection = itr.next();
-        try {
-          nextConnection.activate();
-          if (nextConnection.getServer().equals(server)) {
-            itr.remove();
-            return nextConnection;
-          }
-          nextConnection.passivate(false);
-        } catch (ConnectionDestroyedException ex) {
-          // someone else already destroyed this connection so ignore it
-          // but remove it from availableConnections
-        }
-        // Fix for 41516. Before we let this method exceed the max connections
-        // by creating a new connection, we need to make sure that they're
-        // aren't bogus connections sitting in the available connection list
-        // otherwise, the length of that list might exceed max connections,
-        // but with some bad connections. That can cause members to
-        // get a bad connection but have no permits to create a new connection.
-        if (nextConnection.shouldDestroy()) {
-          itr.remove();
-        }
-      }
-
-      if (onlyUseExistingCnx) {
-        throw new AllConnectionsInUseException();
-      }
-
-      // We need to create a connection. Reserve space for it.
-      connectionCount++;
-    } finally {
-      lock.unlock();
+    PooledConnection connection =
+        availableConnectionManager.useFirst((c) -> c.getServer().equals(server));
+    if (null != connection) {
+      return connection;
     }
 
-    PooledConnection connection = null;
-    try {
-      Connection plainConnection = connectionFactory.createClientToServerConnection(server, false);
-      connection = addConnection(plainConnection);
-    } catch (GemFireSecurityException e) {
-      throw new ServerOperationException(e);
-    } finally {
-      // if we failed, release the space we reserved for our connection
-      if (connection == null) {
-        lock.lock();
-        try {
-          --connectionCount;
-          if (connectionCount < minConnections) {
-            startBackgroundPrefill();
-          }
-        } finally {
-          lock.unlock();
-        }
-      }
+    if (onlyUseExistingCnx) {
+      throw new AllConnectionsInUseException();
     }
-    if (connection == null) {
-      throw new ServerConnectivityException(
-          "Could not create a new connection to server " + server);
+
+    connection = forceCreateConnection(server);
+    if (null != connection) {
+      return connection;
     }
-    return connection;
+
+    throw new ServerConnectivityException(
+        "Could not create a new connection to server " + server);
   }
 
   @Override
-  public Connection exchangeConnection(Connection oldConnection,
-      Set/* <ServerLocation> */ excludedServers, long acquireTimeout)
+  public Connection exchangeConnection(final Connection oldConnection,
+      final Set<ServerLocation> excludedServers, final long acquireTimeout)
       throws AllConnectionsInUseException {
-    assert oldConnection instanceof PooledConnection;
-    PooledConnection newConnection = null;
-    PooledConnection oldPC = (PooledConnection) oldConnection;
 
-    boolean needToUndoEstimate = false;
-    lock.lock();
     try {
-      if (shuttingDown) {
-        throw new PoolCancelledException();
+      PooledConnection connection = availableConnectionManager
+          .useFirst((c) -> !excludedServers.contains(c.getServer()));
+      if (null != connection) {
+        return connection;
       }
-      for (Iterator<PooledConnection> itr = availableConnections.iterator(); itr.hasNext();) {
-        PooledConnection nextConnection = itr.next();
-        if (!excludedServers.contains(nextConnection.getServer())) {
-          itr.remove();
-          try {
-            nextConnection.activate();
-            newConnection = nextConnection;
-            if (allConnectionsMap.removeConnection(oldPC)) {
-              --connectionCount;
-              if (connectionCount < minConnections) {
-                startBackgroundPrefill();
-              }
-            }
-            break;
-          } catch (ConnectionDestroyedException ex) {
-            // someone else already destroyed this connection so ignore it
-            // but remove it from availableConnections
-          }
-        }
-      }
-      if (newConnection == null) {
-        if (!allConnectionsMap.removeConnection(oldPC)) {
-          // We need to create a connection. Reserve space for it.
-          needToUndoEstimate = true;
-          connectionCount++;
-        }
-      }
-    } finally {
-      lock.unlock();
-    }
 
-    if (newConnection == null) {
-      try {
-        Connection plainConnection =
-            connectionFactory.createClientToServerConnection(excludedServers);
-        newConnection = addConnection(plainConnection);
-      } catch (GemFireSecurityException e) {
-        throw new ServerOperationException(e);
-      } catch (ServerRefusedConnectionException srce) {
-        throw new NoAvailableServersException(srce);
-      } finally {
-        if (needToUndoEstimate && newConnection == null) {
-          lock.lock();
-          try {
-            --connectionCount;
-            if (connectionCount < minConnections) {
-              startBackgroundPrefill();
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
+      connection = forceCreateConnection(excludedServers);
+      if (null != connection) {
+        return connection;
       }
-    }
 
-    if (newConnection == null) {
       throw new NoAvailableServersException();
+    } finally {
+      returnConnection(oldConnection, true, true);
     }
-
-    oldPC.internalDestroy();
-
-    return newConnection;
   }
 
-  protected/* GemStoneAddition */ String getPoolName() {
+  protected String getPoolName() {
     return this.poolName;
   }
 
@@ -437,92 +370,47 @@ public class ConnectionManagerImpl implements ConnectionManager {
     allConnectionsMap.addConnection(pooledConn);
     if (logger.isDebugEnabled()) {
       logger.debug("Created a new connection. {} Connection count is now {}", pooledConn,
-          connectionCount);
+          connectionAccounting.getCount());
     }
     return pooledConn;
   }
 
   private void destroyConnection(PooledConnection connection) {
-    lock.lock();
-    try {
-      if (allConnectionsMap.removeConnection(connection)) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Invalidating connection {} connection count is now {}", connection,
-              connectionCount);
-        }
-
-        if (connectionCount < minConnections) {
-          startBackgroundPrefill();
-        }
-        freeConnection.signalAll();
+    if (allConnectionsMap.removeConnection(connection)) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Invalidating connection {} connection count is now {}", connection,
+            connectionAccounting.getCount());
       }
-      --connectionCount; // fix for bug #50333
-    } finally {
-      lock.unlock();
+
+      destroyAndMaybePrefill();
     }
 
     connection.internalDestroy();
   }
 
 
-  /*
-   * (non-Javadoc)
-   *
-   * @see
-   * org.apache.geode.cache.client.internal.pooling.ConnectionManager#invalidateServer(org.apache.
-   * geode.distributed.internal.ServerLocation)
-   */
   protected void invalidateServer(Endpoint endpoint) {
-    Set badConnections = allConnectionsMap.removeEndpoint(endpoint);
+    Set<PooledConnection> badConnections = allConnectionsMap.removeEndpoint(endpoint);
     if (badConnections == null) {
       return;
     }
 
-    lock.lock();
-    try {
-      if (shuttingDown) {
-        return;
-      }
-      if (logger.isDebugEnabled()) {
-        logger.debug("Invalidating {} connections to server {}", badConnections.size(), endpoint);
-      }
-
-      // mark connections for destruction now, so if anyone tries
-      // to return a connection they'll get an exception
-      for (Iterator itr = badConnections.iterator(); itr.hasNext();) {
-        PooledConnection conn = (PooledConnection) itr.next();
-        if (!conn.setShouldDestroy()) {
-        }
-      }
-
-      availableConnections.removeIf(badConnections::contains);
-
-      connectionCount -= badConnections.size();
-
-      if (connectionCount < minConnections) {
-        startBackgroundPrefill();
-      }
+    if (shuttingDown.get()) {
+      return;
+    }
 
-      for (Iterator itr = badConnections.iterator(); itr.hasNext();) {
-        PooledConnection conn = (PooledConnection) itr.next();
-        conn.internalDestroy();
-      }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Invalidating {} connections to server {}", badConnections.size(), endpoint);
+    }
 
-      if (connectionCount < maxConnections) {
-        freeConnection.signalAll();
+    for (PooledConnection conn : badConnections) {
+      if (conn.internalDestroy()) {
+        destroyAndMaybePrefill();
+        availableConnectionManager.remove(conn);
       }
-    } finally {
-      lock.unlock();
     }
   }
 
-  /*
-   * (non-Javadoc)
-   *
-   * @see
-   * org.apache.geode.cache.client.internal.pooling.ConnectionManager#returnConnection(org.apache.
-   * geode.cache.client.internal.Connection)
-   */
   @Override
   public void returnConnection(Connection connection) {
     returnConnection(connection, true);
@@ -530,83 +418,69 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
   @Override
   public void returnConnection(Connection connection, boolean accessed) {
+    returnConnection(connection, accessed, false);
+  }
 
+  private void returnConnection(Connection connection, boolean accessed, boolean addLast) {
     assert connection instanceof PooledConnection;
     PooledConnection pooledConn = (PooledConnection) connection;
 
-    boolean shouldClose = false;
-
-    lock.lock();
-    try {
-      if (pooledConn.isDestroyed()) {
-        return;
-      }
+    if (pooledConn.isDestroyed()) {
+      return;
+    }
 
-      if (pooledConn.shouldDestroy()) {
-        destroyConnection(pooledConn);
+    if (pooledConn.shouldDestroy()) {
+      destroyConnection(pooledConn);
+    } else if (!destroyIfOverLimit(pooledConn)) {
+      if (addLast) {
+        availableConnectionManager.addLast(pooledConn, accessed);
       } else {
-        // thread local connections are already passive at this point
-        if (pooledConn.isActive()) {
-          pooledConn.passivate(accessed);
-        }
-
-        // borrowConnection(ServerLocation, long) allows us to break the
-        // connection limit in order to get a connection to a server. So we need
-        // to get our pool back to size if we're above the limit
-        if (connectionCount > maxConnections) {
-          if (allConnectionsMap.removeConnection(pooledConn)) {
-            shouldClose = true;
-            // getPoolStats().incConCount(-1);
-            --connectionCount;
-          }
-        } else {
-          availableConnections.addFirst(pooledConn);
-          freeConnection.signalAll();
-        }
+        availableConnectionManager.addFirst(pooledConn, accessed);
       }
-    } finally {
-      lock.unlock();
     }
+  }
 
-    if (shouldClose) {
-      try {
-        PoolImpl localpool = (PoolImpl) PoolManagerImpl.getPMI().find(poolName);
-        boolean durable = false;
-        if (localpool != null) {
-          durable = localpool.isDurableClient();
+  /**
+   * Destroys connection if and only if {@link #connectionCount} exceeds {@link #maxConnections}.
+   *
+   * @return true if connection is destroyed, otherwise false.
+   */
+  private boolean destroyIfOverLimit(PooledConnection connection) {
+    if (connectionAccounting.tryDestroy()) {
+      if (allConnectionsMap.removeConnection(connection)) {
+        try {
+          PoolImpl localpool = (PoolImpl) PoolManagerImpl.getPMI().find(poolName);
+          boolean durable = false;
+          if (localpool != null) {
+            durable = localpool.isDurableClient();
+          }
+          connection.internalClose(durable || this.keepAlive);
+        } catch (Exception e) {
+          logger.warn(String.format("Error closing connection %s", connection), e);
         }
-        pooledConn.internalClose(durable || this.keepAlive);
-      } catch (Exception e) {
-        logger.warn(String.format("Error closing connection %s", pooledConn), e);
+      } else {
+        // Not a pooled connection so undo the decrement.
+        connectionAccounting.cancelTryDestroy();
       }
+
+      return true;
     }
+
+    return false;
   }
 
-  /*
-   * (non-Javadoc)
-   */
   @Override
   public void start(ScheduledExecutorService backgroundProcessor) {
     this.backgroundProcessor = backgroundProcessor;
     String name = "poolLoadConditioningMonitor-" + getPoolName();
     this.loadConditioningProcessor =
-        LoggingExecutors.newScheduledThreadPool(name, 1/* why not 0? */, false);
+        LoggingExecutors.newScheduledThreadPool(name, 1, false);
 
     endpointManager.addListener(endpointListener);
 
-    lock.lock();
-    try {
-      startBackgroundPrefill();
-    } finally {
-      lock.unlock();
-    }
+    startBackgroundPrefill();
   }
 
-  /*
-   * (non-Javadoc)
-   *
-   * @see org.apache.geode.cache.client.internal.pooling.ConnectionManager#close(boolean, long)
-   */
   @Override
   public void close(boolean keepAlive) {
     if (logger.isDebugEnabled()) {
@@ -615,21 +489,15 @@ public class ConnectionManagerImpl implements ConnectionManager {
     this.keepAlive = keepAlive;
     endpointManager.removeListener(endpointListener);
 
-    lock.lock();
-    try {
-      if (shuttingDown) {
-        return;
-      }
-      shuttingDown = true;
-    } finally {
-      lock.unlock();
+    if (!shuttingDown.compareAndSet(false, true)) {
+      return;
     }
 
     try {
       if (this.loadConditioningProcessor != null) {
         this.loadConditioningProcessor.shutdown();
         if (!this.loadConditioningProcessor.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT,
-            TimeUnit.MILLISECONDS)) {
+            MILLISECONDS)) {
           logger.warn("Timeout waiting for load conditioning tasks to complete");
         }
       }
@@ -645,7 +513,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
   @Override
   public void emergencyClose() {
-    shuttingDown = true;
+    shuttingDown.set(true);
     if (this.loadConditioningProcessor != null) {
       this.loadConditioningProcessor.shutdown();
     }
@@ -659,7 +527,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
           haveIdleExpireConnectionsTask = true;
           try {
             backgroundProcessor.schedule(new IdleExpireConnectionsTask(), idleTimeout,
-                TimeUnit.MILLISECONDS);
+                MILLISECONDS);
           } catch (RejectedExecutionException e) {
             // ignore, the timer has been cancelled, which means we're shutting
             // down.
@@ -669,10 +537,8 @@ public class ConnectionManagerImpl implements ConnectionManager {
     }
   }
 
-  /** Always called with lock held */
   protected void startBackgroundPrefill() {
-    if (!havePrefillTask) {
-      havePrefillTask = true;
+    if (havePrefillTask.compareAndSet(false, true)) {
       try {
         backgroundProcessor.execute(new PrefillConnectionsTask());
       } catch (RejectedExecutionException e) {
@@ -684,7 +550,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
   protected boolean prefill() {
     try {
-      while (connectionCount < minConnections) {
+      while (connectionAccounting.isUnderMinimum()) {
         if (cancelCriterion.isCancelInProgress()) {
           return true;
         }
@@ -707,7 +573,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
   @Override
   public int getConnectionCount() {
-    return this.connectionCount;
+    return connectionAccounting.getCount();
   }
 
   protected PoolStats getPoolStats() {
@@ -725,61 +591,43 @@ public class ConnectionManagerImpl implements ConnectionManager {
     }
   }
 
+
   private boolean prefillConnection() {
-    boolean createConnection = false;
-    lock.lock();
-    try {
-      if (shuttingDown) {
-        return false;
-      }
-      if (connectionCount < minConnections) {
-        connectionCount++;
-        createConnection = true;
-      }
-    } finally {
-      lock.unlock();
+    if (shuttingDown.get()) {
+      return false;
     }
 
-    if (createConnection) {
+    if (connectionAccounting.tryPrefill()) {
       PooledConnection connection = null;
       try {
-        Connection plainConnection =
-            connectionFactory.createClientToServerConnection(Collections.EMPTY_SET);
-        if (plainConnection == null) {
+        connection = createPooledConnection();
+        if (connection == null) {
           return false;
         }
-        connection = addConnection(plainConnection);
-        connection.passivate(false);
         getPoolStats().incPrefillConnect();
+        availableConnectionManager.addLast(connection, false);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Prefilled connection {} connection count is now {}", connection,
+              connectionAccounting.getCount());
+        }
+        return true;
       } catch (ServerConnectivityException ex) {
-        logger
-            .info(String.format("Unable to prefill pool to minimum because: %s",
-                ex.getMessage()));
+        logger.info(
+            String.format("Unable to prefill pool to minimum because: %s", ex.getMessage()));
         return false;
       } finally {
-        lock.lock();
-        try {
-          if (connection == null) {
-            connectionCount--;
-            if (logger.isDebugEnabled()) {
-              logger.debug("Unable to prefill pool to minimum, connection count is now {}",
-                  connectionCount);
-            }
-          } else {
-            availableConnections.addFirst(connection);
-            freeConnection.signalAll();
-            if (logger.isDebugEnabled()) {
-              logger.debug("Prefilled connection {} connection count is now {}", connection,
-                  connectionCount);
-            }
+        if (connection == null) {
+          connectionAccounting.cancelTryPrefill();
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("Unable to prefill pool to minimum, connection count is now {}",
+                this::getConnectionCount);
           }
-        } finally {
-          lock.unlock();
         }
       }
     }
 
-    return true;
+    return false;
   }
 
   public static void loadEmergencyClasses() {
@@ -810,7 +658,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
     public void run() {
       try {
         getPoolStats().incIdleCheck();
-        allConnectionsMap.checkIdleExpiration();
+        allConnectionsMap.expireIdleConnections();
       } catch (CancelException ignore) {
       } catch (VirtualMachineError e) {
         SystemFailure.initiateFailure(e);
@@ -827,28 +675,21 @@ public class ConnectionManagerImpl implements ConnectionManager {
   }
 
   protected class PrefillConnectionsTask extends PoolTask {
-
     @Override
     public void run2() {
       if (logger.isTraceEnabled()) {
         logger.trace("Prefill Connections task running");
       }
-
       prefill();
-      lock.lock();
-      try {
-        if (connectionCount < minConnections && !cancelCriterion.isCancelInProgress()) {
-          try {
-            backgroundProcessor.schedule(new PrefillConnectionsTask(), prefillRetry,
-                TimeUnit.MILLISECONDS);
-          } catch (RejectedExecutionException e) {
-            // ignore, the timer has been cancelled, which means we're shutting down.
-          }
-        } else {
-          havePrefillTask = false;
+      if (connectionAccounting.isUnderMinimum() && !cancelCriterion.isCancelInProgress()) {
+        try {
+          backgroundProcessor.schedule(new PrefillConnectionsTask(), prefillRetry,
+              MILLISECONDS);
+        } catch (RejectedExecutionException ignored) {
+          // ignore, the timer has been cancelled, which means we're shutting down.
         }
-      } finally {
-        lock.unlock();
+      } else {
+        havePrefillTask.set(false);
       }
     }
   }
@@ -902,66 +743,54 @@ public class ConnectionManagerImpl implements ConnectionManager {
    * @return true if caller should recheck for expired lifetimes; false if a background check was
    *         scheduled or no expirations are possible.
    */
-  public boolean createLifetimeReplacementConnection(ServerLocation currentServer,
+  private boolean createLifetimeReplacementConnection(ServerLocation currentServer,
       boolean idlePossible) {
-    HashSet excludedServers = new HashSet();
-    ServerLocation sl = this.connectionFactory.findBestServer(currentServer, excludedServers);
-
-    while (sl != null) {
-      if (sl.equals(currentServer)) {
-        this.allConnectionsMap.extendLifeOfCnxToServer(currentServer);
+    HashSet<ServerLocation> excludedServers = new HashSet<>();
+    while (true) {
+      ServerLocation sl = connectionFactory.findBestServer(currentServer, excludedServers);
+      if (sl == null || sl.equals(currentServer)) {
+        // we didn't find a server to create a replacement cnx on so
+        // extends the currentServers life
+        allConnectionsMap.extendLifeOfCnxToServer(currentServer);
         break;
-      } else {
-        if (!this.allConnectionsMap.hasExpiredCnxToServer(currentServer)) {
-          break;
-        }
-        Connection con = null;
-        try {
-          con = this.connectionFactory.createClientToServerConnection(sl, false);
-        } catch (GemFireSecurityException e) {
-          securityLogWriter.warning(
-              String.format("Security exception connecting to server '%s': %s",
-                  new Object[] {sl, e}));
-        } catch (ServerRefusedConnectionException srce) {
-          logger.warn("Server '{}' refused new connection: {}",
-              new Object[] {sl, srce});
-        }
-        if (con == null) {
-          excludedServers.add(sl);
-          sl = this.connectionFactory.findBestServer(currentServer, excludedServers);
-        } else {
+      }
+      if (!allConnectionsMap.hasExpiredCnxToServer(currentServer)) {
+        break;
+      }
+      Connection con = null;
+      try {
+        con = connectionFactory.createClientToServerConnection(sl, false);
+        if (con != null) {
           getPoolStats().incLoadConditioningConnect();
-          if (!this.allConnectionsMap.hasExpiredCnxToServer(currentServer)) {
+          if (allConnectionsMap.hasExpiredCnxToServer(currentServer)) {
+            offerReplacementConnection(con, currentServer);
+          } else {
             getPoolStats().incLoadConditioningReplaceTimeouts();
             con.destroy();
-            break;
           }
-          offerReplacementConnection(con, currentServer);
           break;
         }
+      } catch (GemFireSecurityException e) {
+        securityLogWriter.warning(
+            String.format("Security exception connecting to server '%s': %s",
+                new Object[] {sl, e}));
+      } catch (ServerRefusedConnectionException srce) {
+        logger.warn("Server '{}' refused new connection: {}",
+            new Object[] {sl, srce});
       }
+      excludedServers.add(sl);
     }
-    if (sl == null) {
-      // we didn't find a server to create a replacement cnx on so
-      // extends the currentServers life
-      this.allConnectionsMap.extendLifeOfCnxToServer(currentServer);
-    }
-    return this.allConnectionsMap.checkForReschedule(true);
+    return allConnectionsMap.checkForReschedule(true);
   }
 
   protected class ConnectionMap {
-    private final HashMap/* <Endpoint, HashSet<PooledConnection> */ map = new HashMap();
-    private List/* <PooledConnection> */ allConnections = new LinkedList/* <PooledConnection> */(); // in
-                                                                                                    // the
-                                                                                                    // order
-                                                                                                    // they
-                                                                                                    // were
-                                                                                                    // created
+    private final Map<Endpoint, Set<PooledConnection>> map = new HashMap<>();
+    private List<PooledConnection> allConnections = new LinkedList<>();
     private boolean haveLifetimeExpireConnectionsTask;
     volatile boolean closing;
 
     public synchronized boolean isIdleExpirePossible() {
-      return this.allConnections.size() > minConnections;
+      return this.allConnections.size() > connectionAccounting.getMinimum();
     }
 
     @Override
@@ -991,25 +820,23 @@ public class ConnectionManagerImpl implements ConnectionManager {
       if (this.closing) {
         throw new CacheClosedException("This pool is closing");
       }
-      synchronized (this) {
 
-        getPoolStats().incPoolConnections(1);
+      getPoolStats().incPoolConnections(1);
 
-        // we want the smallest birthDate (e.g. oldest cnx) at the front of the list
-        this.allConnections.add(connection);
+      // we want the smallest birthDate (e.g. oldest cnx) at the front of the list
+      this.allConnections.add(connection);
 
-        addToEndpointMap(connection);
+      addToEndpointMap(connection);
 
-        if (isIdleExpirePossible()) {
-          startBackgroundExpiration();
-        }
-        if (lifetimeTimeout != -1 && !haveLifetimeExpireConnectionsTask) {
-          if (checkForReschedule(true)) {
-            // something has already expired so start processing with no delay
-            startBackgroundLifetimeExpiration(0);
-          } else {
-            // either no possible lifetime expires or we scheduled one
-          }
+      if (isIdleExpirePossible()) {
+        startBackgroundExpiration();
+      }
+      if (lifetimeTimeout != -1 && !haveLifetimeExpireConnectionsTask) {
+        if (checkForReschedule(true)) {
+          // something has already expired so start processing with no delay
+          startBackgroundLifetimeExpiration(0);
+        } else {
+          // either no possible lifetime expires or we scheduled one
         }
       }
     }
@@ -1029,11 +856,11 @@ public class ConnectionManagerImpl implements ConnectionManager {
       }
     }
 
-    public synchronized Set removeEndpoint(Endpoint endpoint) {
-      final Set endpointConnections = (Set) this.map.remove(endpoint);
+    public synchronized Set<PooledConnection> removeEndpoint(Endpoint endpoint) {
+      final Set<PooledConnection> endpointConnections = this.map.remove(endpoint);
       if (endpointConnections != null) {
         int count = 0;
-        for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
+        for (Iterator<PooledConnection> it = this.allConnections.iterator(); it.hasNext();) {
           if (endpointConnections.contains(it.next())) {
             count++;
             it.remove();
@@ -1046,10 +873,6 @@ public class ConnectionManagerImpl implements ConnectionManager {
       return endpointConnections;
     }
 
-    public synchronized boolean containsConnection(PooledConnection connection) {
-      return this.allConnections.contains(connection);
-    }
-
     public synchronized boolean removeConnection(PooledConnection connection) {
       boolean result = this.allConnections.remove(connection);
       if (result) {
@@ -1061,9 +884,9 @@ public class ConnectionManagerImpl implements ConnectionManager {
     }
 
     private synchronized void addToEndpointMap(PooledConnection connection) {
-      Set endpointConnections = (Set) map.get(connection.getEndpoint());
+      Set<PooledConnection> endpointConnections = map.get(connection.getEndpoint());
       if (endpointConnections == null) {
-        endpointConnections = new HashSet();
+        endpointConnections = new HashSet<>();
         map.put(connection.getEndpoint(), endpointConnections);
       }
       endpointConnections.add(connection);
@@ -1075,7 +898,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
     private synchronized void removeFromEndpointMap(Endpoint endpoint,
         PooledConnection connection) {
-      Set endpointConnections = (Set) this.map.get(endpoint);
+      Set<PooledConnection> endpointConnections = this.map.get(endpoint);
       if (endpointConnections != null) {
         endpointConnections.remove(connection);
         if (endpointConnections.size() == 0) {
@@ -1136,8 +959,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
      */
     public synchronized PooledConnection findReplacementTarget(ServerLocation currentServer) {
       final long now = System.nanoTime();
-      for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
-        PooledConnection pc = (PooledConnection) it.next();
+      for (PooledConnection pc : allConnections) {
         if (currentServer.equals(pc.getServer())) {
           if (!pc.shouldDestroy() && pc.remainingLife(now, lifetimeTimeoutNanos) <= 0) {
             removeFromEndpointMap(pc);
@@ -1155,8 +977,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
     public synchronized boolean hasExpiredCnxToServer(ServerLocation currentServer) {
       if (!this.allConnections.isEmpty()) {
         final long now = System.nanoTime();
-        for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
-          PooledConnection pc = (PooledConnection) it.next();
+        for (PooledConnection pc : allConnections) {
           if (pc.shouldDestroy()) {
             // this con has already been destroyed so ignore it
             continue;
@@ -1180,8 +1001,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
     public synchronized boolean checkForReschedule(boolean rescheduleOk) {
       if (!this.allConnections.isEmpty()) {
         final long now = System.nanoTime();
-        for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
-          PooledConnection pc = (PooledConnection) it.next();
+        for (PooledConnection pc : allConnections) {
           if (pc.hasIdleExpired(now, idleTimeoutNanos)) {
             // this con has already idle expired so ignore it
             continue;
@@ -1212,8 +1032,8 @@ public class ConnectionManagerImpl implements ConnectionManager {
     public synchronized void extendLifeOfCnxToServer(ServerLocation sl) {
       if (!this.allConnections.isEmpty()) {
         final long now = System.nanoTime();
-        for (Iterator it = this.allConnections.iterator(); it.hasNext();) {
-          PooledConnection pc = (PooledConnection) it.next();
+        for (Iterator<PooledConnection> it = this.allConnections.iterator(); it.hasNext();) {
+          PooledConnection pc = it.next();
           if (pc.remainingLife(now, lifetimeTimeoutNanos) > 0) {
             // no more connections whose lifetime could have expired
             break;
@@ -1247,36 +1067,36 @@ public class ConnectionManagerImpl implements ConnectionManager {
       }
     }
 
-    public void checkIdleExpiration() {
+    public void expireIdleConnections() {
       int expireCount = 0;
       List<PooledConnection> toClose = null;
       synchronized (this) {
         haveIdleExpireConnectionsTask = false;
-        if (shuttingDown) {
+        if (shuttingDown.get()) {
           return;
         }
         if (logger.isTraceEnabled()) {
           logger.trace("Looking for connections to expire");
         }
 
-        // because we expire thread local connections we need to scan allConnections
-
         // find connections which have idle expired
-        int conCount = this.allConnections.size();
-        if (conCount <= minConnections) {
+        if (!connectionAccounting.isOverMinimum()) {
           return;
         }
-        final long now = System.nanoTime();
+
         long minRemainingIdle = Long.MAX_VALUE;
-        toClose = new ArrayList<PooledConnection>(conCount - minConnections);
-        for (Iterator it = this.allConnections.iterator(); it.hasNext()
-            && conCount > minConnections;) {
-          PooledConnection pc = (PooledConnection) it.next();
+        int conCount = connectionAccounting.getCount();
+        toClose = new ArrayList<>(conCount - connectionAccounting.getMinimum());
+
+        // because we expire thread local connections we need to scan allConnections
+        for (Iterator<PooledConnection> it = allConnections.iterator(); it.hasNext()
+            && conCount > connectionAccounting.getMinimum();) {
+          PooledConnection pc = it.next();
           if (pc.shouldDestroy()) {
             // ignore these connections
             conCount--;
           } else {
-            long remainingIdle = pc.doIdleTimeout(now, idleTimeoutNanos);
+            long remainingIdle = pc.doIdleTimeout(System.nanoTime(), idleTimeoutNanos);
             if (remainingIdle >= 0) {
               if (remainingIdle == 0) {
                 // someone else already destroyed pc so ignore it
@@ -1294,7 +1114,8 @@ public class ConnectionManagerImpl implements ConnectionManager {
             }
           }
         }
-        if (conCount > minConnections && minRemainingIdle < Long.MAX_VALUE) {
+        if (conCount > connectionAccounting.getMinimum()
+            && minRemainingIdle < Long.MAX_VALUE) {
           try {
             backgroundProcessor.schedule(new IdleExpireConnectionsTask(), minRemainingIdle,
                 TimeUnit.NANOSECONDS);
@@ -1308,32 +1129,19 @@ public class ConnectionManagerImpl implements ConnectionManager {
       if (expireCount > 0) {
         getPoolStats().incIdleExpire(expireCount);
         getPoolStats().incPoolConnections(-expireCount);
-        // do this outside the above sync
-        lock.lock();
-        try {
-          connectionCount -= expireCount;
-          freeConnection.signalAll();
-          if (connectionCount < minConnections) {
-            startBackgroundPrefill();
-          }
-        } finally {
-          lock.unlock();
-        }
+        destroyAndMaybePrefill(expireCount);
       }
+
       // now destroy all of the connections, outside the sync
-      // if (toClose != null) (cannot be null)
       final boolean isDebugEnabled = logger.isDebugEnabled();
-      {
-        for (Iterator itr = toClose.iterator(); itr.hasNext();) {
-          PooledConnection connection = (PooledConnection) itr.next();
-          if (isDebugEnabled) {
-            logger.debug("Idle connection detected. Expiring connection {}", connection);
-          }
-          try {
-            connection.internalClose(false);
-          } catch (Exception e) {
-            logger.warn("Error expiring connection {}", connection);
-          }
+      for (PooledConnection connection : toClose) {
+        if (isDebugEnabled) {
+          logger.debug("Idle connection detected. Expiring connection {}", connection);
+        }
+        try {
+          connection.internalClose(false);
+        } catch (Exception e) {
+          logger.warn("Error expiring connection {}", connection);
         }
       }
     }
@@ -1342,19 +1150,18 @@ public class ConnectionManagerImpl implements ConnectionManager {
       boolean done;
       synchronized (this) {
         this.haveLifetimeExpireConnectionsTask = false;
-        if (shuttingDown) {
+        if (shuttingDown.get()) {
           return;
         }
       }
       do {
         getPoolStats().incLoadConditioningCheck();
         long firstLife = -1;
-        done = true;
         ServerLocation candidate = null;
         boolean idlePossible = true;
 
         synchronized (this) {
-          if (shuttingDown) {
+          if (shuttingDown.get()) {
             return;
           }
           // find a connection whose lifetime has expired
@@ -1362,9 +1169,10 @@ public class ConnectionManagerImpl implements ConnectionManager {
           long now = System.nanoTime();
           long life = 0;
           idlePossible = isIdleExpirePossible();
-          for (Iterator it = this.allConnections.iterator(); it.hasNext() && life <= 0
+          for (Iterator<PooledConnection> it = this.allConnections.iterator(); it.hasNext()
+              && life <= 0
               && (candidate == null);) {
-            PooledConnection pc = (PooledConnection) it.next();
+            PooledConnection pc = it.next();
             // skip over idle expired and destroyed
             life = pc.remainingLife(now, lifetimeTimeoutNanos);
             if (life <= 0) {
@@ -1386,7 +1194,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
             // reschedule
             startBackgroundLifetimeExpiration(firstLife);
           }
-          done = true; // just to be clear
+          done = true;
         }
       } while (!done);
       // If a lifetimeExpire task is not scheduled at this point then
@@ -1416,9 +1224,9 @@ public class ConnectionManagerImpl implements ConnectionManager {
   }
 
   @Override
-  public void activate(Connection conn) {
+  public boolean activate(Connection conn) {
     assert conn instanceof PooledConnection;
-    ((PooledConnection) conn).activate();
+    return ((PooledConnection) conn).activate();
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
index d47333b..f0d3491 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/PooledConnection.java
@@ -69,7 +69,11 @@ public class PooledConnection implements Connection {
     }
   }
 
-  public void internalDestroy() {
+  /**
+   * @return true if internal connection was destroyed by this call; false if already destroyed
+   */
+  public boolean internalDestroy() {
+    boolean result = false;
     this.shouldDestroy.set(true); // probably already set but make sure
     synchronized (this) {
       this.active = false;
@@ -78,8 +82,10 @@ public class PooledConnection implements Connection {
       if (myCon != null) {
         myCon.destroy();
         connection = null;
+        result = true;
       }
     }
+    return result;
   }
 
   /**
@@ -202,7 +208,10 @@ public class PooledConnection implements Connection {
     return true;
   }
 
-  public void activate() {
+  /**
+   * @return true if connection activated, false if could not be activated because it is destroyed
+   */
+  public boolean activate() {
     synchronized (this) {
       try {
         while (this.waitingToSwitch) {
@@ -211,14 +220,14 @@ public class PooledConnection implements Connection {
       } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
       }
-      getConnection(); // it checks if we are destroyed
+      if (isDestroyed() || shouldDestroy()) {
+        return false;
+      }
       if (active) {
         throw new InternalGemFireException("Connection already active");
       }
-      if (shouldDestroy()) {
-        throw new ConnectionDestroyedException();
-      }
       active = true;
+      return true;
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
index 098cf53..4b5b74d 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/OpExecutorImplJUnitTest.java
@@ -589,7 +589,8 @@ public class OpExecutorImplJUnitTest {
     public void start(ScheduledExecutorService backgroundProcessor) {}
 
     @Override
-    public Connection exchangeConnection(Connection conn, Set excludedServers, long aquireTimeout) {
+    public Connection exchangeConnection(Connection conn, Set<ServerLocation> excludedServers,
+        long aquireTimeout) {
       if (excludedServers.size() >= numServers) {
         throw new NoAvailableServersException();
       }
@@ -608,7 +609,9 @@ public class OpExecutorImplJUnitTest {
     }
 
     @Override
-    public void activate(Connection conn) {}
+    public boolean activate(Connection conn) {
+      return true;
+    }
 
     @Override
     public void passivate(Connection conn, boolean accessed) {}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManagerTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManagerTest.java
new file mode 100644
index 0000000..1cc1c98
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/AvailableConnectionManagerTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.cache.client.internal.pooling;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+public class AvailableConnectionManagerTest {
+
+  private final AvailableConnectionManager instance = new AvailableConnectionManager();
+
+  private PooledConnection createConnection() {
+    PooledConnection result = mock(PooledConnection.class);
+    when(result.activate()).thenReturn(true);
+    when(result.isActive()).thenReturn(true);
+    return result;
+  }
+
+  @Test
+  public void useFirstReturnsNullGivenEmptyManager() {
+    instance.getDeque().clear();
+
+    PooledConnection result = instance.useFirst();
+
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void useFirstReturnsExpectedConnectionGivenManagerWithOneItem() {
+    PooledConnection expected = createConnection();
+    instance.getDeque().addFirst(expected);
+
+    PooledConnection result = instance.useFirst();
+
+    assertThat(result).isSameAs(expected);
+    assertThat(instance.getDeque()).isEmpty();
+    verify(expected).activate();
+  }
+
+  @Test
+  public void useFirstReturnsNullGivenManagerWithOneItemThatCantBeActivated() {
+    PooledConnection expected = createConnection();
+    when(expected.activate()).thenReturn(false);
+    instance.getDeque().addFirst(expected);
+
+    PooledConnection result = instance.useFirst();
+
+    assertThat(result).isNull();
+    assertThat(instance.getDeque()).isEmpty();
+    verify(expected).activate();
+  }
+
+  @Test
+  public void useFirstWithPredicateReturnsNullGivenEmptyManager() {
+    instance.getDeque().clear();
+
+    PooledConnection result = instance.useFirst(c -> true);
+
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void useFirstWithPredicateReturnsExpectedGivenManagerWithOneItem() {
+    PooledConnection expected = createConnection();
+    instance.getDeque().addFirst(expected);
+
+    PooledConnection result = instance.useFirst(c -> c == expected);
+
+    assertThat(result).isSameAs(expected);
+    assertThat(instance.getDeque()).isEmpty();
+    verify(expected).activate();
+  }
+
+  @Test
+  public void useFirstWithPredicateReturnsNullGivenManagerWithOneItemThatDoesNotMatch() {
+    PooledConnection expected = createConnection();
+    instance.getDeque().addFirst(expected);
+
+    PooledConnection result = instance.useFirst(c -> false);
+
+    assertThat(result).isNull();
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(expected, never()).activate();
+  }
+
+  @Test
+  public void useFirstWithPredicateReturnsNullGivenManagerWithOneItemThatCantBeActivated() {
+    PooledConnection expected = createConnection();
+    when(expected.activate()).thenReturn(false);
+    instance.getDeque().addFirst(expected);
+
+    PooledConnection result = instance.useFirst(c -> c == expected);
+
+    assertThat(result).isNull();
+    assertThat(instance.getDeque()).isEmpty();
+    verify(expected).activate();
+  }
+
+  @Test
+  public void removeReturnsFalseGivenConnectionNotInManager() {
+    instance.getDeque().clear();
+
+    boolean result = instance.remove(createConnection());
+
+    assertThat(result).isFalse();
+  }
+
+  @Test
+  public void removeReturnsTrueGivenConnectionInManager() {
+    PooledConnection connection = createConnection();
+    instance.getDeque().addFirst(connection);
+
+    boolean result = instance.remove(connection);
+
+    assertThat(result).isTrue();
+  }
+
+  @Test
+  public void removeEmptiesDequeGivenConnectionInManager() {
+    PooledConnection connection = createConnection();
+    instance.getDeque().addFirst(connection);
+
+    instance.remove(connection);
+
+    assertThat(instance.getDeque()).isEmpty();
+  }
+
+  @Test
+  public void addFirstWithTrueAddsActiveConnectionToManager() {
+    PooledConnection connection = createConnection();
+
+    instance.addFirst(connection, true);
+
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(connection).isActive();
+    verify(connection).passivate(true);
+  }
+
+  @Test
+  public void addFirstWithFalseAddsActiveConnectionToManager() {
+    PooledConnection connection = createConnection();
+
+    instance.addFirst(connection, false);
+
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(connection).isActive();
+    verify(connection).passivate(false);
+  }
+
+  @Test
+  public void addFirstAddsInactiveConnectionToManager() {
+    PooledConnection connection = createConnection();
+    when(connection.isActive()).thenReturn(false);
+
+    instance.addFirst(connection, true);
+
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(connection).isActive();
+    verify(connection, never()).passivate(anyBoolean());
+  }
+
+
+  @Test
+  public void addLastWithTrueAddsActiveConnectionToManager() {
+    PooledConnection connection = createConnection();
+
+    instance.addLast(connection, true);
+
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(connection).isActive();
+    verify(connection).passivate(true);
+  }
+
+  @Test
+  public void addLastWithFalseAddsActiveConnectionToManager() {
+    PooledConnection connection = createConnection();
+
+    instance.addLast(connection, false);
+
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(connection).isActive();
+    verify(connection).passivate(false);
+  }
+
+  @Test
+  public void addLastAddsInactiveConnectionToManager() {
+    PooledConnection connection = createConnection();
+    when(connection.isActive()).thenReturn(false);
+
+    instance.addLast(connection, true);
+
+    assertThat(instance.getDeque()).hasSize(1);
+    verify(connection).isActive();
+    verify(connection, never()).passivate(anyBoolean());
+  }
+
+  @Test
+  public void addFirstTakesPrecedenceOverAddLast() {
+    PooledConnection expected = createConnection();
+
+    instance.addLast(createConnection(), true);
+    instance.addFirst(expected, true);
+    instance.addLast(createConnection(), true);
+    PooledConnection connection = instance.useFirst();
+
+    assertThat(instance.getDeque()).hasSize(2);
+    assertThat(connection).isSameAs(expected);
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccountingTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccountingTest.java
new file mode 100644
index 0000000..35bf3dc
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/pooling/ConnectionAccountingTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.client.internal.pooling;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+public class ConnectionAccountingTest {
+  @Test
+  public void constructorSetsMinMax() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    assertThat(a.getMinimum()).isEqualTo(1);
+    assertThat(a.getMaximum()).isEqualTo(2);
+    assertThat(a.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void canPrefillWhenUnderMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    assertThat(a.tryPrefill()).isTrue();
+    assertThat(a.getCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void cantPrefillWhenAtMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    assertThat(a.tryPrefill()).isFalse();
+    assertThat(a.getCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void cantPrefillWhenAboveMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    a.create();
+    assertThat(a.getCount()).isEqualTo(2);
+
+    assertThat(a.tryPrefill()).isFalse();
+    assertThat(a.getCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void cancelPrefillDecrements() {
+    ConnectionAccounting a = new ConnectionAccounting(2, 3);
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    assertThat(a.tryPrefill()).isTrue();
+    assertThat(a.getCount()).isEqualTo(2);
+
+    a.cancelTryPrefill();
+    assertThat(a.getCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void canCreateWhenUnderMax() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    assertThat(a.getCount()).isEqualTo(0);
+
+    assertThat(a.tryCreate()).isTrue();
+    assertThat(a.getCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void cantCreateWhenAtMax() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    assertThat(a.tryCreate()).isFalse();
+    assertThat(a.getCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void createRegardlessOfMax() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    a.create();
+    assertThat(a.getCount()).isEqualTo(2);
+    assertThat(a.getMaximum()).isEqualTo(1);
+  }
+
+  @Test
+  public void cancelCreateDecrementsCount() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    a.tryCreate();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    a.cancelTryCreate();
+    assertThat(a.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void tryDestroyDestroysAConnectionOverMax() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    a.create();
+    a.create();
+    assertThat(a.getCount()).isEqualTo(2);
+
+    assertThat(a.tryDestroy()).isTrue();
+  }
+
+  @Test
+  public void tryDoesNotDestroyAtMax() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    assertThat(a.tryDestroy()).isFalse();
+  }
+
+  @Test
+  public void cancelTryDestroyIncrementsCount() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 1);
+    a.create();
+    a.create();
+    a.tryDestroy();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    a.cancelTryDestroy();
+    assertThat(a.getCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void destroyAndIsUnderMinimumReturnsTrueGoingBelowMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+
+    assertThat(a.destroyAndIsUnderMinimum(1)).isTrue();
+    assertThat(a.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void destroyAndIsUnderMinimumReturnsFalseGoingToMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    a.create();
+    assertThat(a.getCount()).isEqualTo(2);
+
+    assertThat(a.destroyAndIsUnderMinimum(1)).isFalse();
+    assertThat(a.getCount()).isEqualTo(1);
+  }
+
+  @Test
+  public void destroyAndIsUnderMinimumReturnsFalseStayingAboveMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    a.create();
+    a.create();
+    assertThat(a.getCount()).isEqualTo(3);
+
+    assertThat(a.destroyAndIsUnderMinimum(1)).isFalse();
+    assertThat(a.getCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void destroyAndIsUnderMinimumDecrementsByMultiple() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    a.create();
+    a.create();
+    assertThat(a.getCount()).isEqualTo(3);
+
+    a.destroyAndIsUnderMinimum(3);
+    assertThat(a.getCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void isUnderMinTrueWhenUnderMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    assertThat(a.isUnderMinimum()).isTrue();
+  }
+
+  @Test
+  public void isUnderMinFalseWhenAtOrOverMin() {
+    ConnectionAccounting a = new ConnectionAccounting(0, 2);
+    assertThat(a.isUnderMinimum()).isFalse();
+
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+    assertThat(a.isUnderMinimum()).isFalse();
+  }
+
+  @Test
+  public void isOverMinFalseWhenUnderOrAtMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    assertThat(a.isOverMinimum()).isFalse();
+
+    a.create();
+    assertThat(a.getCount()).isEqualTo(1);
+    assertThat(a.isOverMinimum()).isFalse();
+  }
+
+  @Test
+  public void isOverMinTrueWhenOverMin() {
+    ConnectionAccounting a = new ConnectionAccounting(1, 2);
+    a.create();
+    a.create();
+    assertThat(a.getCount()).isEqualTo(2);
+
+    assertThat(a.isOverMinimum()).isTrue();
+  }
+}


Mime
View raw message