accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [accumulo] branch master updated: Give ThriftTransportPool higher concurrent granularity (#1374)
Date Mon, 02 Dec 2019 22:10:53 GMT
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e5182b  Give ThriftTransportPool higher concurrent granularity (#1374)
2e5182b is described below

commit 2e5182b131224c5b4fd692b61ee065b55c43bd5e
Author: Laura Schanno <lbschanno@gmail.com>
AuthorDate: Mon Dec 2 17:10:43 2019 -0500

    Give ThriftTransportPool higher concurrent granularity (#1374)
    
    Instead of locking the entire cache when performing an operation for a
    single ThriftTransportKey, make it possible to lock access to the cached
    connections for the key in question without blocking operations for
    other keys.
    
    In addition, refactor sections of the class to improve the clarity of
    intent for operations that obtain locks individually for each transport
    key, and add documentation to this effect.
    
    Work for #1122
---
 .../core/clientImpl/ThriftTransportPool.java       | 535 ++++++++++++++-------
 .../java/org/apache/accumulo/core/util/Once.java   |  41 ++
 2 files changed, 407 insertions(+), 169 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
index 29da40d..78ef155 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java
@@ -30,12 +30,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.singletons.SingletonManager;
 import org.apache.accumulo.core.singletons.SingletonService;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Once;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -44,7 +50,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
 public class ThriftTransportPool {
 
@@ -56,7 +63,6 @@ public class ThriftTransportPool {
     Map<CachedTTransport,CachedConnection> reserved = new HashMap<>();
 
     public CachedConnection reserveAny() {
-
       CachedConnection cachedConnection = unreserved.poll(); // safe pop
       if (cachedConnection != null) {
         cachedConnection.reserve();
@@ -67,14 +73,282 @@ public class ThriftTransportPool {
       }
       return cachedConnection;
     }
+
+    private void removeExpiredConnections(final ArrayList<CachedConnection> expired,
+        final long killTime) {
+      long currTime = System.currentTimeMillis();
+      while (isLastUnreservedExpired(currTime, killTime)) {
+        expired.add(unreserved.removeLast());
+      }
+    }
+
+    boolean isLastUnreservedExpired(final long currTime, final long killTime) {
+      return !unreserved.isEmpty() && (currTime - unreserved.peekLast().lastReturnTime)
> killTime;
+    }
+
+    void checkReservedForStuckIO() {
+      reserved.values().forEach(c -> c.transport.checkForStuckIO(STUCK_THRESHOLD));
+    }
+
+    void closeAllTransports() {
+      closeTransports(unreserved);
+      closeTransports(reserved.values());
+    }
+
+    void closeTransports(final Iterable<CachedConnection> stream) {
+      stream.forEach((connection) -> {
+        try {
+          connection.transport.close();
+        } catch (Exception e) {
+          log.debug("Error closing transport during shutdown", e);
+        }
+      });
+    }
+
+    CachedConnection removeReserved(CachedTTransport transport) {
+      return reserved.remove(transport);
+    }
   }
 
-  private Map<ThriftTransportKey,CachedConnections> cache = new HashMap<>();
+  private static class ConnectionPool {
+    final Lock[] locks;
+    final ConcurrentHashMap<ThriftTransportKey,CachedConnections> connections =
+        new ConcurrentHashMap<>();
+    private volatile boolean shutdown = false;
+
+    ConnectionPool() {
+      // intentionally using a prime number, don't use 31
+      locks = new Lock[37];
+      for (int i = 0; i < locks.length; i++) {
+        locks[i] = new ReentrantLock();
+      }
+    }
+
+    Set<ThriftTransportKey> getThriftTransportKeys() {
+      return connections.keySet();
+    }
+
+    /**
+     * Reserve and return a new {@link CachedConnection} from the {@link CachedConnections}
mapped
+     * to the specified transport key. If a {@link CachedConnections} is not found, one will
be
+     * created.
+     *
+     * <p>
+     *
+     * This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
+     * until the operation completes.
+     *
+     * @param key
+     *          the transport key
+     * @return the reserved {@link CachedConnection}
+     */
+    CachedConnection reserveAny(final ThriftTransportKey key) {
+      // It's possible that multiple locks from executeWithinLock will overlap with a single
lock
+      // inside the ConcurrentHashMap which can unnecessarily block threads. Access the
+      // ConcurrentHashMap outside of executeWithinLock to prevent this.
+      var connections = getOrCreateCachedConnections(key);
+      return executeWithinLock(key, connections::reserveAny);
+    }
+
+    /**
+     * Reserve and return a new {@link CachedConnection} from the {@link CachedConnections}
mapped
+     * to the specified transport key. If a {@link CachedConnections} is not found, null
will be
+     * returned.
+     *
+     * <p>
+     *
+     * This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
+     * until the operation completes.
+     *
+     * @param key
+     *          the transport key
+     * @return the reserved {@link CachedConnection}, or null if none were available.
+     */
+    CachedConnection reserveAnyIfPresent(final ThriftTransportKey key) {
+      // It's possible that multiple locks from executeWithinLock will overlap with a single
lock
+      // inside the ConcurrentHashMap which can unnecessarily block threads. Access the
+      // ConcurrentHashMap outside of executeWithinLock to prevent this.
+      var connections = getCachedConnections(key);
+      return connections == null ? null : executeWithinLock(key, connections::reserveAny);
+    }
+
+    /**
+     * Puts the specified connection into the reserved map of the {@link CachedConnections}
for the
+     * specified transport key. If a {@link CachedConnections} is not found, one will be
created.
+     *
+     * <p>
+     *
+     * This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
+     * until the operation completes.
+     *
+     * @param key
+     *          the transport key
+     * @param connection
+     *          the reserved connection
+     */
+    void putReserved(final ThriftTransportKey key, final CachedConnection connection) {
+      // It's possible that multiple locks from executeWithinLock will overlap with a single
lock
+      // inside the ConcurrentHashMap which can unnecessarily block threads. Access the
+      // ConcurrentHashMap outside of executeWithinLock to prevent this.
+      var connections = getOrCreateCachedConnections(key);
+      executeWithinLock(key, () -> connections.reserved.put(connection.transport, connection));
+    }
+
+    /**
+     * Returns the connection for the specified transport back to the queue of unreserved
+     * connections for the {@link CachedConnections} for the specified transport's key. If
a
+     * {@link CachedConnections} is not found, one will be created. If the transport saw
an error,
+     * the connection for the transport will be unreserved, and it and all other unreserved
+     * connections will be added to the specified toBeClosed list, and the connections' unreserved
+     * list will be cleared.
+     *
+     * <p>
+     *
+     * This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
+     * until the operation completes.
+     *
+     * @param transport
+     *          the transport
+     * @param toBeClosed
+     *          the list to add connections that must be closed after this operation finishes
+     * @return true if the connection for the transport existed and was initially reserved,
or false
+     *         otherwise
+     */
+    boolean returnTransport(final CachedTTransport transport,
+        final List<CachedConnection> toBeClosed) {
+      // It's possible that multiple locks from executeWithinLock will overlap with a single
lock
+      // inside the ConcurrentHashMap which can unnecessarily block threads. Access the
+      // ConcurrentHashMap outside of executeWithinLock to prevent this.
+      var connections = getOrCreateCachedConnections(transport.getCacheKey());
+      return executeWithinLock(transport.getCacheKey(),
+          () -> unreserveConnection(transport, connections, toBeClosed)); // inline
+    }
+
+    @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK",
+        justification = "FindBugs doesn't recognize that all locks in ConnectionPool.locks
are subsequently unlocked in the try-finally in ConnectionPool.shutdown()")
+    void shutdown() {
+      // Obtain all locks.
+      for (Lock lock : locks) {
+        lock.lock();
+      }
+
+      // All locks are now acquired, so nothing else should be able to run concurrently...
+      try {
+        // Check if a shutdown has already been initiated.
+        if (shutdown) {
+          return;
+        }
+        shutdown = true;
+        connections.values().forEach(CachedConnections::closeAllTransports);
+      } finally {
+        for (Lock lock : locks) {
+          lock.unlock();
+        }
+      }
+    }
+
+    <T> T executeWithinLock(final ThriftTransportKey key, Supplier<T> function)
{
+      Lock lock = getLock(key);
+      try {
+        return function.get();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    void executeWithinLock(final ThriftTransportKey key, Consumer<ThriftTransportKey>
consumer) {
+      Lock lock = getLock(key);
+      try {
+        consumer.accept(key);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    Lock getLock(final ThriftTransportKey key) {
+      Lock lock = locks[(key.hashCode() & Integer.MAX_VALUE) % locks.length];
+
+      lock.lock();
+
+      if (shutdown) {
+        lock.unlock();
+        throw new TransportPoolShutdownException(
+            "The Accumulo singleton for connection pooling is disabled.  This is likely caused
by "
+                + "all AccumuloClients being closed or garbage collected.");
+      }
+
+      return lock;
+    }
+
+    CachedConnections getCachedConnections(final ThriftTransportKey key) {
+      return connections.get(key);
+    }
+
+    CachedConnections getOrCreateCachedConnections(final ThriftTransportKey key) {
+      return connections.computeIfAbsent(key, k -> new CachedConnections());
+    }
+
+    boolean unreserveConnection(final CachedTTransport transport,
+        final CachedConnections connections, final List<CachedConnection> toBeClosed)
{
+      if (connections != null) {
+        CachedConnection connection = connections.removeReserved(transport);
+        if (connection != null) {
+          if (transport.sawError) {
+            unreserveConnectionAndClearUnreserved(connections, connection, toBeClosed);
+          } else {
+            returnConnectionToUnreserved(connections, connection);
+          }
+          return true;
+        }
+      }
+      return false;
+    }
+
+    void unreserveConnectionAndClearUnreserved(final CachedConnections connections,
+        final CachedConnection connection, final List<CachedConnection> toBeClosed)
{
+      toBeClosed.add(connection);
+      connection.unreserve();
+      // Remove all unreserved cached connections when a server has an error, not just the
+      // connection that was returned.
+      toBeClosed.addAll(connections.unreserved);
+      connections.unreserved.clear();
+    }
+
+    void returnConnectionToUnreserved(final CachedConnections connections,
+        final CachedConnection connection) {
+      log.trace("Returned connection {} ioCount: {}", connection.transport.getCacheKey(),
+          connection.transport.ioCount);
+      connection.lastReturnTime = System.currentTimeMillis();
+      connection.unreserve();
+      // Using LIFO ensures that when the number of pooled connections exceeds the working
+      // set size that the idle times at the end of the list grow. The connections with
+      // large idle times will be cleaned up. Using a FIFO could continually reset the idle
+      // times of all connections, even when there are more than the working set size.
+      connections.unreserved.push(connection);
+    }
+
+    List<CachedConnection> removeExpiredConnections(final long killTime) {
+      ArrayList<CachedConnection> expired = new ArrayList<>();
+      for (Entry<ThriftTransportKey,CachedConnections> entry : connections.entrySet())
{
+        CachedConnections connections = entry.getValue();
+        executeWithinLock(entry.getKey(), (key) -> {
+          connections.removeExpiredConnections(expired, killTime);
+          connections.checkReservedForStuckIO();
+        });
+      }
+      return expired;
+    }
+  }
+
+  private final ConnectionPool connectionPool = new ConnectionPool();
+
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
 
-  private Thread checkThread;
+  private Once checkStarter = new Once(() -> {
+    new Daemon(new Closer(instance), "Thrift Connection Pool Checker").start();
+  });
 
   private static final Logger log = LoggerFactory.getLogger(ThriftTransportPool.class);
 
@@ -119,42 +393,7 @@ public class ThriftTransportPool {
 
     private void closeConnections() throws InterruptedException {
       while (true) {
-
-        ArrayList<CachedConnection> connectionsToClose = new ArrayList<>();
-
-        synchronized (pool) {
-          for (CachedConnections cachedConns : pool.getCache().values()) {
-            Deque<CachedConnection> unres = cachedConns.unreserved;
-
-            long currTime = System.currentTimeMillis();
-
-            // The following code is structured to avoid removing from the middle of the
array
-            // deqeue which would be costly. It also assumes the oldest are at the end.
-            while (!unres.isEmpty() && currTime - unres.peekLast().lastReturnTime
> pool.killTime) {
-              connectionsToClose.add(unres.removeLast());
-            }
-
-            for (CachedConnection cachedConnection : cachedConns.reserved.values()) {
-              cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
-            }
-          }
-
-          Iterator<Entry<ThriftTransportKey,Long>> iter = pool.errorTime.entrySet().iterator();
-          while (iter.hasNext()) {
-            Entry<ThriftTransportKey,Long> entry = iter.next();
-            long delta = System.currentTimeMillis() - entry.getValue();
-            if (delta >= STUCK_THRESHOLD) {
-              pool.errorCount.remove(entry.getKey());
-              iter.remove();
-            }
-          }
-        }
-
-        // close connections outside of sync block
-        for (CachedConnection cachedConnection : connectionsToClose) {
-          cachedConnection.transport.close();
-        }
-
+        pool.closeExpiredConnections();
         Thread.sleep(500);
       }
     }
@@ -415,17 +654,16 @@ public class ThriftTransportPool {
   private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException
{
     // compute hash code outside of lock, this lowers the time the lock is held
     cacheKey.precomputeHashCode();
-    synchronized (this) {
-      // atomically reserve location if it exist in cache
-      CachedConnection cachedConnection =
-          getCache().computeIfAbsent(cacheKey, ck -> new CachedConnections()).reserveAny();
-      if (cachedConnection != null) {
-        log.trace("Using existing connection to {}", cacheKey.getServer());
-        return cachedConnection.transport;
-      }
-    }
 
-    return createNewTransport(cacheKey);
+    ConnectionPool pool = getConnectionPool();
+    CachedConnection connection = pool.reserveAny(cacheKey);
+
+    if (connection != null) {
+      log.trace("Using existing connection to {}", cacheKey.getServer());
+      return connection.transport;
+    } else {
+      return createNewTransport(cacheKey);
+    }
   }
 
   @VisibleForTesting
@@ -437,42 +675,38 @@ public class ThriftTransportPool {
     if (preferCachedConnection) {
       HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
 
-      synchronized (this) {
+      ConnectionPool pool = getConnectionPool();
 
-        // randomly pick a server from the connection cache
-        serversSet.retainAll(getCache().keySet());
+      // randomly pick a server from the connection cache
+      serversSet.retainAll(pool.getThriftTransportKeys());
 
-        if (serversSet.size() > 0) {
-          ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet);
-          Collections.shuffle(cachedServers, random);
+      if (serversSet.size() > 0) {
+        ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet);
+        Collections.shuffle(cachedServers, random);
 
-          for (ThriftTransportKey ttk : cachedServers) {
-            CachedConnection cachedConnection = getCache().get(ttk).reserveAny();
-            if (cachedConnection != null) {
-              final String serverAddr = ttk.getServer().toString();
-              log.trace("Using existing connection to {}", serverAddr);
-              return new Pair<>(serverAddr, cachedConnection.transport);
-            }
+        for (ThriftTransportKey ttk : cachedServers) {
+          CachedConnection connection = pool.reserveAny(ttk);
+          if (connection != null) {
+            final String serverAddr = ttk.getServer().toString();
+            log.trace("Using existing connection to {}", serverAddr);
+            return new Pair<>(serverAddr, connection.transport);
           }
+
         }
       }
     }
 
+    ConnectionPool pool = getConnectionPool();
     int retryCount = 0;
     while (servers.size() > 0 && retryCount < 10) {
+
       int index = random.nextInt(servers.size());
       ThriftTransportKey ttk = servers.get(index);
 
       if (preferCachedConnection) {
-        synchronized (this) {
-          CachedConnections cachedConns = getCache().get(ttk);
-          if (cachedConns != null) {
-            CachedConnection cachedConnection = cachedConns.reserveAny();
-            if (cachedConnection != null) {
-              final String serverAddr = ttk.getServer().toString();
-              return new Pair<>(serverAddr, cachedConnection.transport);
-            }
-          }
+        CachedConnection connection = pool.reserveAnyIfPresent(ttk);
+        if (connection != null) {
+          return new Pair<>(ttk.getServer().toString(), connection.transport);
         }
       }
 
@@ -496,91 +730,69 @@ public class ThriftTransportPool {
 
     CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 
-    CachedConnection cc = new CachedConnection(tsc);
-    cc.reserve();
+    CachedConnection connection = new CachedConnection(tsc);
+    connection.reserve();
 
     try {
-      synchronized (this) {
-        CachedConnections cachedConns =
-            getCache().computeIfAbsent(cacheKey, ck -> new CachedConnections());
-        cachedConns.reserved.put(cc.transport, cc);
-      }
+      ConnectionPool pool = getConnectionPool();
+      pool.putReserved(cacheKey, connection);
     } catch (TransportPoolShutdownException e) {
-      cc.transport.close();
+      connection.transport.close();
       throw e;
     }
-    return cc.transport;
+
+    return connection.transport;
   }
 
-  public void returnTransport(TTransport tsc) {
-    if (tsc == null) {
+  public void returnTransport(TTransport transport) {
+    if (transport == null) {
       return;
     }
 
-    boolean existInCache = false;
-    CachedTTransport ctsc = (CachedTTransport) tsc;
-
+    CachedTTransport cachedTransport = (CachedTTransport) transport;
     ArrayList<CachedConnection> closeList = new ArrayList<>();
+    ConnectionPool pool = getConnectionPool();
+    boolean existInCache = pool.returnTransport(cachedTransport, closeList);
 
-    synchronized (this) {
-      CachedConnections cachedConns = getCache().get(ctsc.getCacheKey());
-      if (cachedConns != null) {
-        CachedConnection cachedConnection = cachedConns.reserved.remove(ctsc);
-        if (cachedConnection != null) {
-          if (ctsc.sawError) {
-            closeList.add(cachedConnection);
-
-            log.trace("Returned connection had error {}", ctsc.getCacheKey());
+    // close outside of sync block
+    closeList.forEach((connection) -> {
+      try {
+        connection.transport.close();
+      } catch (Exception e) {
+        log.debug("Failed to close connection w/ errors", e);
+      }
+    });
 
-            Long ecount = errorCount.merge(ctsc.getCacheKey(), 1L, Long::sum);
+    if (cachedTransport.sawError) {
 
-            // logs the first time an error occurred
-            errorTime.computeIfAbsent(ctsc.getCacheKey(), k -> System.currentTimeMillis());
+      boolean shouldWarn = false;
+      Long ecount = null;
 
-            if (ecount >= ERROR_THRESHOLD && serversWarnedAbout.add(ctsc.getCacheKey()))
{
-              log.warn(
-                  "Server {} had {} failures in a short time period, will not complain anymore",
-                  ctsc.getCacheKey(), ecount);
-            }
+      synchronized (errorCount) {
 
-            cachedConnection.unreserve();
+        ecount = errorCount.merge(cachedTransport.getCacheKey(), 1L, Long::sum);
 
-            // remove all unreserved cached connection when a sever has an error, not just
the
-            // connection that was returned
-            closeList.addAll(cachedConns.unreserved);
-            cachedConns.unreserved.clear();
+        // logs the first time an error occurred
+        errorTime.computeIfAbsent(cachedTransport.getCacheKey(), k -> System.currentTimeMillis());
 
-          } else {
-            log.trace("Returned connection {} ioCount: {}", ctsc.getCacheKey(),
-                cachedConnection.transport.ioCount);
-
-            cachedConnection.lastReturnTime = System.currentTimeMillis();
-            cachedConnection.unreserve();
-            // Using LIFO ensures that when the #
-            // of pooled connections exceeds the working set size that the
-            // idle times at the end of the list grow. The connections with large idle times
will be
-            // cleaned up. Using a FIFO could continually reset the idle
-            // times of all connections, even when there are more than the working set size.
-            cachedConns.unreserved.push(cachedConnection);
-          }
-          existInCache = true;
+        if (ecount >= ERROR_THRESHOLD && serversWarnedAbout.add(cachedTransport.getCacheKey()))
{
+          // boolean facilitates logging outside of lock
+          shouldWarn = true;
         }
       }
-    }
 
-    // close outside of sync block
-    for (CachedConnection cachedConnection : closeList) {
-      try {
-        cachedConnection.transport.close();
-      } catch (Exception e) {
-        log.debug("Failed to close connection w/ errors", e);
+      log.trace("Returned connection had error {}", cachedTransport.getCacheKey());
+
+      if (shouldWarn) {
+        log.warn("Server {} had {} failures in a short time period, will not complain anymore",
+            cachedTransport.getCacheKey(), ecount);
       }
     }
 
     if (!existInCache) {
       log.warn("Returned tablet server connection to cache that did not come from cache");
       // close outside of sync block
-      tsc.close();
+      transport.close();
     }
   }
 
@@ -644,52 +856,37 @@ public class ThriftTransportPool {
     }
   }
 
-  public synchronized void startCheckerThread() {
-    if (cache != null && checkThread == null) {
-      checkThread = new Daemon(new Closer(instance), "Thrift Connection Pool Checker");
-      checkThread.start();
-    }
+  public void startCheckerThread() {
+    checkStarter.run();
   }
 
-  private void shutdown() {
-    Thread ctl;
-    synchronized (this) {
-      if (cache == null)
-        return;
-
-      // close any connections in the pool... even ones that are in use
-      for (CachedConnections cachedConn : getCache().values()) {
-        for (CachedConnection cc : Iterables.concat(cachedConn.reserved.values(),
-            cachedConn.unreserved)) {
-          try {
-            cc.transport.close();
-          } catch (Exception e) {
-            log.debug("Error closing transport during shutdown", e);
-          }
-        }
-      }
+  void closeExpiredConnections() {
+    List<CachedConnection> expiredConnections;
 
-      // this will render the pool unusable and cause the background thread to exit
-      this.cache = null;
+    ConnectionPool pool = getConnectionPool();
+    expiredConnections = pool.removeExpiredConnections(killTime);
 
-      ctl = checkThread;
-    }
-
-    if (ctl != null) {
-      try {
-        ctl.interrupt();
-        ctl.join();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+    synchronized (errorCount) {
+      Iterator<Entry<ThriftTransportKey,Long>> iter = errorTime.entrySet().iterator();
+      while (iter.hasNext()) {
+        Entry<ThriftTransportKey,Long> entry = iter.next();
+        long delta = System.currentTimeMillis() - entry.getValue();
+        if (delta >= STUCK_THRESHOLD) {
+          errorCount.remove(entry.getKey());
+          iter.remove();
+        }
       }
     }
+
+    // Close connections outside of sync block
+    expiredConnections.forEach((c) -> c.transport.close());
+  }
+
+  private void shutdown() {
+    connectionPool.shutdown();
   }
 
-  private Map<ThriftTransportKey,CachedConnections> getCache() {
-    if (cache == null)
-      throw new TransportPoolShutdownException(
-          "The Accumulo singleton for connection pooling is disabled.  This is likely caused
by "
-              + "all AccumuloClients being closed or garbage collected.");
-    return cache;
+  private ConnectionPool getConnectionPool() {
+    return connectionPool;
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Once.java b/core/src/main/java/org/apache/accumulo/core/util/Once.java
new file mode 100644
index 0000000..ffba095
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/Once.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class Once implements Runnable {
+
+  private final AtomicBoolean hasRun = new AtomicBoolean(false);
+  private final Runnable action;
+
+  public Once(Runnable action) {
+    this.action = action;
+  }
+
+  @Override
+  public void run() {
+    if (hasRun.get())
+      return;
+
+    if (hasRun.compareAndSet(false, true)) {
+      action.run();
+    }
+  }
+}


Mime
View raw message