geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] geode git commit: GEODE-3416: Cleanup SocketCloser code to reduce the synchronization Old code would force single threaded behavior, new code will synchronize on the checking or changing or the closed flag
Date Fri, 11 Aug 2017 16:51:55 GMT
GEODE-3416: Cleanup SocketCloser code to reduce the synchronization
Old code would force single threaded behavior, new code will synchronize
on the checking or changing or the closed flag


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/821e03dc
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/821e03dc
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/821e03dc

Branch: refs/heads/feature/GEODE-3416
Commit: 821e03dc6f45a6a9a4c43045dfbb235a626c2ec7
Parents: ea2fc82
Author: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Authored: Fri Aug 11 09:45:45 2017 -0700
Committer: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Committed: Fri Aug 11 09:51:34 2017 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/CacheClientProxy.java    |   4 +-
 .../apache/geode/internal/net/SocketCloser.java | 192 +++++++++++++------
 .../apache/geode/internal/tcp/Connection.java   |   4 +-
 .../internal/net/SocketCloserJUnitTest.java     |  10 +-
 4 files changed, 140 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 98bfed9..34f232d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -949,7 +949,8 @@ public class CacheClientProxy implements ClientSession {
   private void closeSocket() {
     if (this._socketClosed.compareAndSet(false, true)) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
+          null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
     }
   }
@@ -963,6 +964,7 @@ public class CacheClientProxy implements ClientSession {
     {
       String remoteHostAddress = this._remoteHostAddress;
       if (remoteHostAddress != null) {
+        this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
         this._remoteHostAddress = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index fbbe797..46d69a8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -21,8 +21,10 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -49,27 +51,32 @@ public class SocketCloser {
    * minutes).
    */
   static final long ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120);
+      Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS", 120).longValue();
   /**
    * Maximum number of threads that can be doing a socket close. Any close requests over
this max
    * will queue up waiting for a thread.
    */
-  private static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8);
+  static final int ASYNC_CLOSE_POOL_MAX_THREADS =
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue();
   /**
    * How many milliseconds the synchronous requester waits for the async close to happen.
Default is
    * 0. Prior releases waited 50ms.
    */
-  private static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
-      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0);
+  static final long ASYNC_CLOSE_WAIT_MILLISECONDS =
+      Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
 
 
+  /**
+   * map of thread pools of async close threads
+   */
+  private final ConcurrentHashMap<String, ExecutorService>
+      asyncCloseExecutors =
+      new ConcurrentHashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
-  private boolean closed;
-  private final ExecutorService socketCloserThreadPool;
+  private Boolean closed = Boolean.FALSE;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -82,30 +89,71 @@ public class SocketCloser {
   }
 
   public SocketCloser(long asyncClosePoolKeepAliveSeconds, int asyncClosePoolMaxThreads,
-      long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
+                      long asyncCloseWaitTime, TimeUnit asyncCloseWaitUnits) {
     this.asyncClosePoolKeepAliveSeconds = asyncClosePoolKeepAliveSeconds;
     this.asyncClosePoolMaxThreads = asyncClosePoolMaxThreads;
     this.asyncCloseWaitTime = asyncCloseWaitTime;
     this.asyncCloseWaitUnits = asyncCloseWaitUnits;
-
-    final ThreadGroup threadGroup =
-        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-    ThreadFactory threadFactory = command -> {
-      Thread thread = new Thread(threadGroup, command);
-      thread.setDaemon(true);
-      return thread;
-    };
-    socketCloserThreadPool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads,
-        this.asyncClosePoolMaxThreads, this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>(), threadFactory);
   }
 
   public int getMaxThreads() {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private boolean isClosed() {
-    return this.closed;
+  private ExecutorService getAsyncThreadExecutor(String address) {
+    ExecutorService executorService = asyncCloseExecutors.get(address);
+    if (executorService == null) {
+      //To be used for pre-1.8 jdk releases.
+//      createThreadPool();
+
+      executorService = Executors.newWorkStealingPool(asyncClosePoolMaxThreads);
+
+      ExecutorService
+          previousThreadPoolExecutor =
+          asyncCloseExecutors.putIfAbsent(address, executorService);
+
+      if (previousThreadPoolExecutor != null) {
+        executorService.shutdownNow();
+        return previousThreadPoolExecutor;
+      }
+    }
+    return executorService;
+  }
+
+  /**
+   * @deprecated this method is to be used for pre 1.8 jdk.
+   */
+  @Deprecated
+  private void createThreadPool() {
+    ExecutorService executorService;
+    final ThreadGroup
+        threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable command) {
+        Thread thread = new Thread(threadGroup, command);
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
+
+    executorService =
+        new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
+            asyncCloseWaitTime,
+            asyncCloseWaitUnits, new LinkedBlockingQueue<>(), threadFactory);
+  }
+
+  /**
+   * Call this method if you know all the resources in the closer for the given address are
no
+   * longer needed. Currently a thread pool is kept for each address and if you know that
an address
+   * no longer needs its pool then you should call this method.
+   */
+
+  public void releaseResourcesForAddress(String address) {
+    ExecutorService executorService = asyncCloseExecutors.remove(address);
+    if (executorService != null) {
+      executorService.shutdown();
+    }
   }
 
   /**
@@ -113,84 +161,104 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    if (!this.closed) {
-      this.closed = true;
-      socketCloserThreadPool.shutdown();
+    synchronized (closed) {
+      if (!this.closed) {
+        this.closed = true;
+      } else {
+        return;
+      }
+    }
+    for (ExecutorService executorService : asyncCloseExecutors.values()) {
+      executorService.shutdown();
+      asyncCloseExecutors.clear();
     }
   }
 
+  private Future asyncExecute(String address, Runnable runnableToExecute) {
+    ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address);
+    return asyncThreadExecutor.submit(runnableToExecute);
+  }
+
   /**
    * Closes the specified socket in a background thread. In some cases we see close hang
(see bug
    * 33665). Depending on how the SocketCloser is configured (see ASYNC_CLOSE_WAIT_MILLISECONDS)
    * this method may block for a certain amount of time. If it is called after the SocketCloser
is
    * closed then a normal synchronous close is done.
-   * 
-   * @param socket the socket to close
-   * @param runnableCode an optional Runnable with stuff to execute in the async thread
+   * @param sock the socket to close
+   * @param address identifies who the socket is connected to
+   * @param extra an optional Runnable with stuff to execute in the async thread
    */
-  public void asyncClose(final Socket socket, final Runnable runnableCode) {
-    if (socket == null || socket.isClosed()) {
+  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
+    if (sock == null || sock.isClosed()) {
       return;
     }
-
     boolean doItInline = false;
     try {
-      if (isClosed()) {
-        // this SocketCloser has been closed so do a synchronous, inline, close
-        doItInline = true;
-      } else {
-        socketCloserThreadPool.execute(() -> {
-          if (runnableCode != null) {
-            runnableCode.run();
-          }
-          inlineClose(socket);
-        });
-        if (this.asyncCloseWaitTime != 0) {
-          try {
-            Future future = socketCloserThreadPool.submit(() -> {
-              if (runnableCode != null) {
-                runnableCode.run();
+      Future submittedTask = null;
+      synchronized (closed) {
+        if (closed) {
+          // this SocketCloser has been closed so do a synchronous, inline, close
+          doItInline = true;
+        } else {
+          submittedTask = asyncExecute(address, new Runnable() {
+            public void run() {
+              Thread.currentThread().setName("AsyncSocketCloser for " + address);
+              try {
+                if (extra != null) {
+                  extra.run();
+                }
+                inlineClose(sock);
+              } finally {
+                Thread.currentThread().setName("unused AsyncSocketCloser");
               }
-              inlineClose(socket);
-            });
-            future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-          } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            // We want this code to wait at most 50ms for the close to happen.
-            // It is ok to ignore these exception and let the close continue
-            // in the background.
-          }
+            }
+          });
         }
       }
+      if (submittedTask != null) {
+        waitForFutureTaskWithTimeout(submittedTask);
+      }
     } catch (OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
       doItInline = true;
     }
     if (doItInline) {
-      if (runnableCode != null) {
-        runnableCode.run();
+      if (extra != null) {
+        extra.run();
       }
-      inlineClose(socket);
+      inlineClose(sock);
     }
   }
 
+  private void waitForFutureTaskWithTimeout(Future submittedTask) {
+    if (this.asyncCloseWaitTime != 0) {
+      try {
+        submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        // We want this code to wait at most 50ms for the close to happen.
+        // It is ok to ignore these exception and let the close continue
+        // in the background.
+      }
+    }
+  }
 
   /**
    * Closes the specified socket
-   * 
-   * @param socket the socket to close
+   * @param sock the socket to close
    */
-  private void inlineClose(final Socket socket) {
+
+  private static void inlineClose(final Socket sock) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This
     // seems to alleviate the problem.
     try {
-      socket.shutdownInput();
-      socket.shutdownOutput();
+      sock.shutdownInput();
+      sock.shutdownOutput();
     } catch (Exception e) {
     }
     try {
-      socket.close();
+      sock.close();
     } catch (IOException ignore) {
     } catch (VirtualMachineError err) {
       SystemFailure.initiateFailure(err);

http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 954a33c..0ecb3bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -562,7 +562,7 @@ public class Connection implements Runnable {
       } catch (IOException io) {
         logger.fatal(LocalizedMessage
             .create(LocalizedStrings.Connection_UNABLE_TO_GET_P2P_CONNECTION_STREAMS), io);
-        t.getSocketCloser().asyncClose(socket, null);
+        t.getSocketCloser().asyncClose(socket, this.remoteAddr.toString(), null);
         throw io;
       }
     }
@@ -847,7 +847,7 @@ public class Connection implements Runnable {
         Socket s = this.socket;
         if (s != null && !s.isClosed()) {
           prepareForAsyncClose();
-          this.owner.getSocketCloser().asyncClose(s, null);
+          this.owner.getSocketCloser().asyncClose(s, String.valueOf(this.remoteAddr), null);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/geode/blob/821e03dc/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index b6dbfe2..90315ce 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -72,11 +72,11 @@ public class SocketCloserJUnitTest {
     // They should all be stuck on countDownLatch.
     for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
       Socket[] aSockets = new Socket[SOCKET_COUNT];
-
+      String address = i + "";
       for (int j = 0; j < SOCKET_COUNT; j++) {
         aSockets[j] = createClosableSocket();
         trackedSockets.add(aSockets[j]);
-        this.socketCloser.asyncClose(aSockets[j], () -> {
+        this.socketCloser.asyncClose(aSockets[j], address, () -> {
           try {
             waitingToClose.incrementAndGet();
             countDownLatch.await();
@@ -94,7 +94,7 @@ public class SocketCloserJUnitTest {
     // since a thread pool is doing to closes
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
       boolean areAllClosed = true;
-      for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();)
{
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();
) {
         Socket socket = iterator.next();
         if (socket.isClosed()) {
           iterator.remove();
@@ -115,7 +115,7 @@ public class SocketCloserJUnitTest {
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, () -> runnableCalled.set(true));
+    this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
     Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
   }
 
@@ -128,7 +128,7 @@ public class SocketCloserJUnitTest {
 
     final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(closableSocket, () -> runnableCalled.set(true));
+    this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true));
     Awaitility.await().atMost(5, TimeUnit.SECONDS)
         .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }


Mime
View raw message