geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [2/2] incubator-geode git commit: refactored the async close code into a new SocketCloser class
Date Thu, 17 Sep 2015 23:47:59 GMT
refactored the async close code into a new SocketCloser class


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/16a6bc1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/16a6bc1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/16a6bc1a

Branch: refs/heads/feature/GEODE-332
Commit: 16a6bc1a60735fd94fae5bd5a180d5523b51617f
Parents: 4c320a4
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Thu Sep 17 16:41:07 2015 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Thu Sep 17 16:41:07 2015 -0700

----------------------------------------------------------------------
 .../gemstone/gemfire/internal/SocketCloser.java | 231 +++++++++++++++++++
 .../gemfire/internal/SocketCreator.java         | 156 -------------
 .../cache/tier/sockets/CacheClientNotifier.java |  10 +
 .../cache/tier/sockets/CacheClientProxy.java    |   6 +-
 .../gemfire/internal/tcp/Connection.java        |   5 +-
 .../gemfire/internal/tcp/ConnectionTable.java   |  15 +-
 6 files changed, 258 insertions(+), 165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16a6bc1a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
new file mode 100644
index 0000000..04ca0d8
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCloser.java
@@ -0,0 +1,231 @@
+package com.gemstone.gemfire.internal;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
+
+/**
+ * This class allows sockets to be closed without blocking.
+ * In some cases we have seen a call of socket.close block for minutes.
+ * This class maintains a thread pool for every other member we have
+ * connected sockets to. Any request to close by default returns immediately
+ * to the caller while the close is called by a background thread.
+ * The requester can wait for a configured amount of time by setting
+ * the "p2p.ASYNC_CLOSE_WAIT_MILLISECONDS" system property.
+ * Idle threads that are not doing a close will timeout after 2 minutes.
+ * This can be configured by setting the
+ * "p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS" system property.
+ * A pool exists for each remote address that we have a socket connected to.
+ * That way if close is taking a long time to one address we can still get closes
+ * done to another address.
+ * Each address pool by default has at most 8 threads. This max threads can be
+ * configured using the "p2p.ASYNC_CLOSE_POOL_MAX_THREADS" system property.
+ */
+public class SocketCloser {
+  private static final Logger logger = LogService.getLogger();
+  /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
+  private static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
+  /** Maximum number of threads that can be doing a socket close. Any close requests over this max will queue up waiting for a thread. */
+  private static final int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+  /** How many milliseconds the synchronous requester waits for the async close to happen. Default is 0. Prior releases waited 50ms. */
+  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
+  
+
+  /** map of thread pools of async close threads */
+  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
+  private final long asyncClosePoolKeepAliveSeconds;
+  private final int asyncClosePoolMaxThreads;
+  private final long asyncCloseWaitMilliseconds;
+  private boolean closed;
+  
+  public SocketCloser() {
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_WAIT_MILLISECONDS);
+  }
+  public SocketCloser(int asyncClosePoolMaxThreads) {
+    this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, asyncClosePoolMaxThreads, ASYNC_CLOSE_WAIT_MILLISECONDS);
+  }
+  public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads, long asyncCloseWaitMilliseconds) {
+    this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
+    this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
+    this.asyncCloseWaitMilliseconds = asyncCloseWaitMilliseconds;
+  }
+
+  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
+    synchronized (asyncCloseExecutors) {
+      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
+      if (pool == null) {
+        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+        ThreadFactory tf = new ThreadFactory() { 
+          public Thread newThread(final Runnable command) { 
+            Thread thread = new Thread(tg, command); 
+            thread.setDaemon(true);
+            return thread;
+          } 
+        }; 
+        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+        pool = new ThreadPoolExecutor(1, this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
+        asyncCloseExecutors.put(address, pool);
+      }
+      return pool;
+    }
+  }
+  /**
+   * Call this method if you know all the resources in the closer
+   * for the given address are no longer needed.
+   * Currently a thread pool is kept for each address and if you
+   * know that an address no longer needs its pool then you should
+   * call this method.
+   */
+  public void releaseResourcesForAddress(String address) {
+    synchronized (asyncCloseExecutors) {
+      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
+      if (pool != null) {
+        pool.shutdown();
+        asyncCloseExecutors.remove(address);
+      }
+    }
+  }
+  private boolean isClosed() {
+    synchronized (asyncCloseExecutors) {
+      return this.closed;
+    }
+  }
+  /**
+   * Call close when you are all done with your socket closer.
+   * If you call asyncClose after close is called then the
+   * asyncClose will be done synchronously.
+   */
+  public void close() {
+    synchronized (asyncCloseExecutors) {
+      if (!this.closed) {
+        this.closed = true;
+        for (ThreadPoolExecutor pool: asyncCloseExecutors.values()) {
+          pool.shutdownNow();
+        }
+        asyncCloseExecutors.clear();
+      }
+    }
+  }
+  private void asyncExecute(String address, Runnable r) {
+    // Waiting 50ms for the async close request to complete is what the old (close per thread)
+    // code did. But now that we will not create a thread for every close request
+    // it seems better to let the thread that requested the close to move on quickly.
+    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
+    // can be set to how many milliseconds to wait.
+    if (this.asyncCloseWaitMilliseconds == 0) {
+      getAsyncThreadExecutor(address).execute(r);
+    } else {
+      Future<?> future = getAsyncThreadExecutor(address).submit(r);
+      try {
+        future.get(this.asyncCloseWaitMilliseconds, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        // We want this code to wait at most 50ms for the close to happen.
+        // It is ok to ignore these exception and let the close continue
+        // in the background.
+      }
+    }
+  }
+  /**
+   * Closes the specified socket in a background thread.
+   * In some cases we see close hang (see bug 33665).
+   * Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
+   * this method may block for a certain amount of time.
+   * If it is called after the SocketCloser is closed then a normal
+   * synchronous close is done.
+   * @param sock the socket to close
+   * @param address identifies who the socket is connected to
+   * @param extra an optional Runnable with stuff to execute in the async thread
+   */
+  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
+    if (sock == null || sock.isClosed()) {
+      return;
+    }
+    boolean doItInline = false;
+    try {
+      synchronized (asyncCloseExecutors) {
+        if (isClosed()) {
+          // this SocketCloser has been closed so do a synchronous, inline, close
+          doItInline = true;
+        } else {
+          asyncExecute(address, new Runnable() {
+            public void run() {
+              Thread.currentThread().setName("AsyncSocketCloser for " + address);
+              try {
+                if (extra != null) {
+                  extra.run();
+                }
+                inlineClose(sock);
+              } finally {
+                Thread.currentThread().setName("unused AsyncSocketCloser");
+              }
+            }
+          });
+        }
+      }
+    } catch (OutOfMemoryError ignore) {
+      // If we can't start a thread to close the socket just do it inline.
+      // See bug 50573.
+      doItInline = true;
+    }
+    if (doItInline) {
+      inlineClose(sock);
+    }
+  }
+  
+
+  /**
+   * Closes the specified socket
+   * @param sock the socket to close
+   */
+  private static void inlineClose(final Socket sock) {
+    // the next two statements are a mad attempt to fix bug
+    // 36041 - segv in jrockit in pthread signaling code.  This
+    // seems to alleviate the problem.
+    try {
+      sock.shutdownInput();
+      sock.shutdownOutput();
+    } catch (Exception e) {
+    }
+    try {
+      sock.close();
+    } catch (IOException ignore) {
+    } catch (VirtualMachineError err) {
+      SystemFailure.initiateFailure(err);
+      // If this ever returns, rethrow the error.  We're poisoned
+      // now, so don't let this thread continue.
+      throw err;
+    } catch (java.security.ProviderException pe) {
+      // some ssl implementations have trouble with termination and throw
+      // this exception.  See bug #40783
+    } catch (Error e) {
+      // Whenever you catch Error or Throwable, you must also
+      // catch VirtualMachineError (see above).  However, there is
+      // _still_ a possibility that you are dealing with a cascading
+      // error condition, so you also need to check to see if the JVM
+      // is still usable:
+      SystemFailure.checkFailure();
+      // Sun's NIO implementation has been known to throw Errors
+      // that are caused by IOExceptions.  If this is the case, it's
+      // okay.
+      if (e.getCause() instanceof IOException) {
+        // okay...
+      } else {
+        throw e;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16a6bc1a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index e263ae4..940936f 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -75,20 +75,11 @@ import com.gemstone.gemfire.internal.cache.wan.TransportFilterServerSocket;
 import com.gemstone.gemfire.internal.cache.wan.TransportFilterSocketFactory;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.util.PasswordUtil;
 import com.gemstone.org.jgroups.util.ConnectionWatcher;
 
 import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import javax.net.ssl.*;
 
@@ -1205,153 +1196,6 @@ public class SocketCreator  implements com.gemstone.org.jgroups.util.SockCreator
     return (String[]) v.toArray( new String[ v.size() ] );
   }
   
-  /** map of thread pools of async close threads */
-  private static final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
-  /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
-  private final static long ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME", 120).longValue();
-  /** Maximum number of threads that can be doing a socket close. Any close requests over this max will queue up waiting for a thread. */
-  private final static int ASYNC_CLOSE_POOL_MAX_THREADS = Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
-  /** How many milliseconds the synchronous requester waits for the async close to happen. Default is 0. Prior releases waited 50ms. */
-  private final static long ASYNC_CLOSE_WAIT_MILLISECONDS = Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
-  
-  private static ThreadPoolExecutor getAsyncThreadExecutor(String who) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(who);
-      if (pool == null) {
-        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-        ThreadFactory tf = new ThreadFactory() { 
-          public Thread newThread(final Runnable command) { 
-            Thread thread = new Thread(tg, command); 
-            thread.setDaemon(true);
-            return thread;
-          } 
-        }; 
-        BlockingQueue workQueue = new LinkedBlockingQueue(); 
-        pool = new ThreadPoolExecutor(1, ASYNC_CLOSE_POOL_MAX_THREADS, ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, workQueue, tf);
-        asyncCloseExecutors.put(who, pool);
-      }
-      return pool;
-    }
-  }
-  public static void closeAsyncThreadExecutor(String who) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(who);
-      if (pool != null) {
-        pool.shutdown();
-        asyncCloseExecutors.remove(who);
-      }
-    }
-  }
-  public static void closeAsyncThreadExecutors() {
-    synchronized (asyncCloseExecutors) {
-      for (ThreadPoolExecutor pool: asyncCloseExecutors.values()) {
-        pool.shutdownNow();
-      }
-      asyncCloseExecutors.clear();
-    }
-  }
-  private static void asyncExecute(String who, Runnable r) {
-    // Waiting 50ms for the async close request to complete is what the old (close per thread)
-    // code did. But now that we will not create a thread for every close request
-    // it seems better to let the thread that requested the close to move on quickly.
-    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
-    // can be set to how many milliseconds to wait.
-    if (ASYNC_CLOSE_WAIT_MILLISECONDS == 0) {
-      getAsyncThreadExecutor(who).execute(r);
-    } else {
-      Future future = getAsyncThreadExecutor(who).submit(r);
-      try {
-        future.get(ASYNC_CLOSE_WAIT_MILLISECONDS, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        // We want this code to wait at most 50ms for the close to happen.
-        // It is ok to ignore these exception and let the close continue
-        // in the background.
-      }
-    }
-  }
-  /**
-   * Closes the specified socket in a background thread and waits a limited
-   * amount of time for the close to complete. In some cases we see close
-   * hang (see bug 33665).
-   * Made public so it can be used from CacheClientProxy.
-   * @param sock the socket to close
-   * @param who who the socket is connected to
-   * @param extra an optional Runnable with stuff to execute in the async thread
-   */
-  public static void asyncClose(final Socket sock, final String who, final Runnable extra) {
-    if (sock == null || sock.isClosed()) {
-      return;
-    }
-    try {
-      asyncExecute(who, new Runnable() {
-        public void run() {
-          Thread.currentThread().setName("AsyncSocketCloser for " + who);
-          try {
-          if (extra != null) {
-            extra.run();
-          }
-          inlineClose(sock);
-          } finally {
-            Thread.currentThread().setName("unused AsyncSocketCloser");
-          }
-        }
-      });
-    } catch (OutOfMemoryError ignore) {
-      // If we can't start a thread to close the socket just do it inline.
-      // See bug 50573.
-      inlineClose(sock);
-      return;
-    }
-  }
-  
-
-  /**
-   * Closes the specified socket
-   * @param sock the socket to close
-   */
-  public static void inlineClose(final Socket sock) {
-    
-    // the next two statements are a mad attempt to fix bug
-    // 36041 - segv in jrockit in pthread signaling code.  This
-    // seems to alleviate the problem.
-    try {
-      sock.shutdownInput();
-      sock.shutdownOutput();
-    }
-    catch (Exception e) {
-    }
-    try {
-      sock.close();
-    } catch (IOException ignore) {
-    } 
-    catch (VirtualMachineError err) {
-      SystemFailure.initiateFailure(err);
-      // If this ever returns, rethrow the error.  We're poisoned
-      // now, so don't let this thread continue.
-      throw err;
-    }
-    catch (java.security.ProviderException pe) {
-      // some ssl implementations have trouble with termination and throw
-      // this exception.  See bug #40783
-    }
-    catch (Error e) {
-      // Whenever you catch Error or Throwable, you must also
-      // catch VirtualMachineError (see above).  However, there is
-      // _still_ a possibility that you are dealing with a cascading
-      // error condition, so you also need to check to see if the JVM
-      // is still usable:
-      SystemFailure.checkFailure();
-      // Sun's NIO implementation has been known to throw Errors
-      // that are caused by IOExceptions.  If this is the case, it's
-      // okay.
-      if (e.getCause() instanceof IOException) {
-        // okay...
-
-      } else {
-        throw e;
-      }
-    }
-  }
   
   protected void initializeClientSocketFactory() {
     this.clientSocketFactory = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16a6bc1a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 2cede25..d23e1d7 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -73,6 +73,7 @@ import com.gemstone.gemfire.internal.ClassLoadUtil;
 import com.gemstone.gemfire.internal.DummyStatisticsFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
 import com.gemstone.gemfire.internal.InternalInstantiator;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.Version;
@@ -1668,6 +1669,8 @@ public class CacheClientNotifier {
 
       // Close the statistics
       this._statistics.close();
+      
+      this.socketCloser.close();
     } 
   }
 
@@ -2120,6 +2123,7 @@ public class CacheClientNotifier {
     // Set the Cache
     this.setCache((GemFireCacheImpl)cache);
     this.acceptorStats = acceptorStats;
+    this.socketCloser = new SocketCloser(1); // we only need one thread per client
 
     // Set the LogWriter
     this.logWriter = (InternalLogWriter)cache.getLogger();
@@ -2385,6 +2389,10 @@ public class CacheClientNotifier {
     return this.acceptorStats;
   }
   
+  public SocketCloser getSocketCloser() {
+    return this.socketCloser;
+  }
+  
   public void addCompiledQuery(DefaultQuery query){
     if (this.compiledQueries.putIfAbsent(query.getQueryString(), query) == null){
       // Added successfully.
@@ -2651,6 +2659,8 @@ public class CacheClientNotifier {
 
   private SystemTimer.SystemTimerTask clientPingTask;
   
+  private final SocketCloser socketCloser;
+  
   private static final long CLIENT_PING_TASK_PERIOD =
     Long.getLong("gemfire.serverToClientPingPeriod", 60000);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16a6bc1a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
index be0220a..15ad73b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientProxy.java
@@ -63,7 +63,7 @@ import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.SocketCreator;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 import com.gemstone.gemfire.internal.Version;
@@ -961,7 +961,7 @@ public class CacheClientProxy implements ClientSession {
       // 1. check to see if dispatcher is still alive
       if (this._messageDispatcher.isAlive()) {
         if (this._socket != null && !this._socket.isClosed()) {
-          SocketCreator.asyncClose(this._socket, this._remoteHostAddress, null);
+          this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress, null);
           getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
         }
         destroyRQ();
@@ -1009,7 +1009,7 @@ public class CacheClientProxy implements ClientSession {
     // replaced when the client reconnects.
     releaseCommBuffer();
     if (this._remoteHostAddress != null) {
-      SocketCreator.closeAsyncThreadExecutor(this._remoteHostAddress);
+      this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(this._remoteHostAddress);
       this._remoteHostAddress = null;
     }
     try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16a6bc1a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index 040c92e..b341acc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -62,6 +62,7 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.DSFIDFactory;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SocketUtils;
 import com.gemstone.gemfire.internal.SystemTimer;
@@ -579,7 +580,7 @@ public class Connection implements Runnable {
       }
       catch (IOException io) {
         logger.fatal(LocalizedMessage.create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        SocketCreator.asyncClose(s, this.remoteAddr.toString(), null);
+        t.getSocketCloser().asyncClose(s, this.remoteAddr.toString(), null);
         throw io;
       }
     }
@@ -844,7 +845,7 @@ public class Connection implements Runnable {
       r.run();
     }
     else {
-      SocketCreator.asyncClose(this.socket, String.valueOf(this.remoteAddr), r);
+      this.owner.getSocketCloser().asyncClose(this.socket, String.valueOf(this.remoteAddr), r);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/16a6bc1a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index bb2de65..508eba2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -40,6 +40,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 import com.gemstone.gemfire.distributed.internal.membership.jgroup.JGroupMembershipManager;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.SocketCloser;
 import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -140,10 +141,12 @@ public class ConnectionTable  {
   /**
    * Executor used by p2p reader and p2p handshaker threads.
    */
-  private Executor p2pReaderThreadPool;
+  private final Executor p2pReaderThreadPool;
   /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */
   private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
   
+  private final SocketCloser socketCloser;
+  
   /**
    * The most recent instance to be created
    * 
@@ -217,6 +220,7 @@ public class ConnectionTable  {
     this.threadConnMaps = new ArrayList();
     this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
+    this.socketCloser = new SocketCloser();
   /*  NOMUX: if (TCPConduit.useNIO) {
       inputMuxManager = new InputMuxManager(this);
       inputMuxManager.start(c.logger);
@@ -764,7 +768,6 @@ public class ConnectionTable  {
         if (localExec instanceof ExecutorService) {
           ((ExecutorService)localExec).shutdown();
         }
-        this.p2pReaderThreadPool = null;
       }
     }
     closeReceivers(false);
@@ -776,7 +779,7 @@ public class ConnectionTable  {
         m.clear();
       }        
     }
-    SocketCreator.closeAsyncThreadExecutors();
+    this.socketCloser.close();
   }
 
   public void executeCommand(Runnable runnable) { 
@@ -943,11 +946,15 @@ public class ConnectionTable  {
       }
       
       if (remoteAddress != null) {
-        SocketCreator.closeAsyncThreadExecutor(remoteAddress.toString());
+        this.socketCloser.releaseResourcesForAddress(remoteAddress.toString());
       }
     }
   }
   
+  SocketCloser getSocketCloser() {
+    return this.socketCloser;
+  }
+  
   /** check to see if there are still any receiver threads for the given end-point */
   protected boolean hasReceiversFor(Stub endPoint) {
     synchronized (this.receivers) {


Mime
View raw message