geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [51/51] [abbrv] geode git commit: GEODE-3416: Reduce synchronization blockages in SocketCloser. Remove synchronization blocks around HashMap. Replace that implementation with simpler ThreadPool that is not unbounded and does not grow as the number of rem
Date Mon, 21 Aug 2017 21:09:08 GMT
GEODE-3416: Reduce synchronization blockages in SocketCloser.
Remove synchronization blocks around HashMap. Replace that implementation
with simpler ThreadPool that is not unbounded and does not grow as the
number of remoteAddress (clients/peers) are added


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/c6b20a91
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/c6b20a91
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/c6b20a91

Branch: refs/heads/feature/GEODE-3416
Commit: c6b20a91f701315639b12458f404075395016c87
Parents: b43f502
Author: Dave Barnes <dbarnes@pivotal.io>
Authored: Thu Aug 10 17:11:50 2017 -0700
Committer: Udo Kohlmeyer <ukohlmeyer@pivotal.io>
Committed: Mon Aug 21 14:07:30 2017 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/CacheClientProxy.java    |  47 +-----
 .../apache/geode/internal/net/SocketCloser.java | 165 +++++++++++--------
 .../internal/net/SocketCloserJUnitTest.java     | 155 ++++++-----------
 3 files changed, 141 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/c6b20a91/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index d7e3548..34f232d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -181,11 +181,7 @@ public class CacheClientProxy implements ClientSession {
    * True if we are connected to a client.
    */
   private volatile boolean connected = false;
-  // /**
-  // * A string representing interest in all keys
-  // */
-  // protected static final String ALL_KEYS = "ALL_KEYS";
-  //
+
   /**
    * True if a marker message is still in the ha queue.
    */
@@ -459,47 +455,6 @@ public class CacheClientProxy implements ClientSession {
     return this.proxyID;
   }
 
-  // the following code was commented out simply because it was not used
-  // /**
-  // * Determines if the proxy represents the client host (and only the host, not
-  // * necessarily the exact VM running on the host)
-  // *
-  // * @return Whether the proxy represents the client host
-  // */
-  // protected boolean representsClientHost(String clientHost)
-  // {
-  // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
-  // return this._remoteHostAddress.equals(clientHost);
-  // }
-
-  // protected boolean representsClientVM(DistributedMember remoteMember)
-  // {
-  // // logger.warn("Is input port " + clientPort + " contained in " +
-  // // logger.warn("Does input host " + clientHost + " equal " +
-  // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
-  // // logger.warn("representsClientVM: " +
-  // // (representsClientHost(clientHost) && containsPort(clientPort)));
-  // return (proxyID.getDistributedMember().equals(remoteMember));
-  // }
-
-  // /**
-  // * Determines if the CacheClientUpdater proxied by this instance is listening
-  // * on the input clientHost and clientPort
-  // *
-  // * @param clientHost
-  // * The host name of the client to compare
-  // * @param clientPort
-  // * The port number of the client to compare
-  // *
-  // * @return Whether the CacheClientUpdater proxied by this instance is
-  // * listening on the input clientHost and clientPort
-  // */
-  // protected boolean representsCacheClientUpdater(String clientHost,
-  // int clientPort)
-  // {
-  // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
-  // }
-
   protected boolean isMember(ClientProxyMembershipID memberId) {
     return this.proxyID.equals(memberId);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/c6b20a91/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index 6d86fd8..0a9a903 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -14,11 +14,17 @@
  */
 package org.apache.geode.internal.net;
 
+import org.apache.geode.SystemFailure;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.logging.log4j.Logger;
+
 import java.io.IOException;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -26,12 +32,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-
 /**
  * This class allows sockets to be closed without blocking. In some cases we have seen a
call of
  * socket.close block for minutes. This class maintains a thread pool for every other member
we have
@@ -57,7 +57,7 @@ public class SocketCloser {
    * will queue up waiting for a thread.
    */
   static final int ASYNC_CLOSE_POOL_MAX_THREADS =
-      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 8).intValue();
+      Integer.getInteger("p2p.ASYNC_CLOSE_POOL_MAX_THREADS", 4).intValue();
   /**
    * How many milliseconds the synchronous requester waits for the async close to happen.
Default is
    * 0. Prior releases waited 50ms.
@@ -66,13 +66,16 @@ public class SocketCloser {
       Long.getLong("p2p.ASYNC_CLOSE_WAIT_MILLISECONDS", 0).longValue();
 
 
-  /** map of thread pools of async close threads */
-  private final HashMap<String, ThreadPoolExecutor> asyncCloseExecutors = new HashMap<>();
+  /**
+   * map of thread pools of async close threads
+   */
+  private final ConcurrentHashMap<String, ExecutorService> asyncCloseExecutors =
+      new ConcurrentHashMap<>();
   private final long asyncClosePoolKeepAliveSeconds;
   private final int asyncClosePoolMaxThreads;
   private final long asyncCloseWaitTime;
   private final TimeUnit asyncCloseWaitUnits;
-  private boolean closed;
+  private Boolean closed = Boolean.FALSE;
 
   public SocketCloser() {
     this(ASYNC_CLOSE_POOL_KEEP_ALIVE_SECONDS, ASYNC_CLOSE_POOL_MAX_THREADS,
@@ -96,26 +99,47 @@ public class SocketCloser {
     return this.asyncClosePoolMaxThreads;
   }
 
-  private ThreadPoolExecutor getAsyncThreadExecutor(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool == null) {
-        final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose",
logger);
-        ThreadFactory tf = new ThreadFactory() {
-          public Thread newThread(final Runnable command) {
-            Thread thread = new Thread(tg, command);
-            thread.setDaemon(true);
-            return thread;
-          }
-        };
-        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
-        pool = new ThreadPoolExecutor(this.asyncClosePoolMaxThreads, this.asyncClosePoolMaxThreads,
-            this.asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, workQueue, tf);
-        pool.allowCoreThreadTimeOut(true);
-        asyncCloseExecutors.put(address, pool);
+  private ExecutorService getAsyncThreadExecutor(String address) {
+    ExecutorService executorService = asyncCloseExecutors.get(address);
+    if (executorService == null) {
+      // To be used for pre-1.8 jdk releases.
+      // executorService = createThreadPoolExecutor();
+
+      executorService = getWorkStealingPool(asyncClosePoolMaxThreads);
+
+      ExecutorService previousThreadPoolExecutor =
+          asyncCloseExecutors.putIfAbsent(address, executorService);
+
+      if (previousThreadPoolExecutor != null) {
+        executorService.shutdownNow();
+        return previousThreadPoolExecutor;
       }
-      return pool;
     }
+    return executorService;
+  }
+
+  private ExecutorService getWorkStealingPool(int maxParallelThreads) {
+    return Executors.newWorkStealingPool(maxParallelThreads);
+  }
+
+  /**
+   * @deprecated since GEODE 1.3.0. Use @link{getWorkStealingPool}
+   */
+  @Deprecated
+  private ExecutorService createThreadPoolExecutor() {
+    final ThreadGroup threadGroup =
+        LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+    ThreadFactory threadFactory = new ThreadFactory() {
+      public Thread newThread(final Runnable command) {
+        Thread thread = new Thread(threadGroup, command);
+        thread.setDaemon(true);
+        return thread;
+      }
+    };
+
+    return new ThreadPoolExecutor(asyncClosePoolMaxThreads, asyncClosePoolMaxThreads,
+        asyncClosePoolKeepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+        threadFactory);
   }
 
   /**
@@ -123,19 +147,11 @@ public class SocketCloser {
    * longer needed. Currently a thread pool is kept for each address and if you know that
an address
    * no longer needs its pool then you should call this method.
    */
-  public void releaseResourcesForAddress(String address) {
-    synchronized (asyncCloseExecutors) {
-      ThreadPoolExecutor pool = asyncCloseExecutors.get(address);
-      if (pool != null) {
-        pool.shutdown();
-        asyncCloseExecutors.remove(address);
-      }
-    }
-  }
 
-  private boolean isClosed() {
-    synchronized (asyncCloseExecutors) {
-      return this.closed;
+  public void releaseResourcesForAddress(String address) {
+    ExecutorService executorService = asyncCloseExecutors.remove(address);
+    if (executorService != null) {
+      executorService.shutdown();
     }
   }
 
@@ -144,35 +160,22 @@ public class SocketCloser {
    * called then the asyncClose will be done synchronously.
    */
   public void close() {
-    synchronized (asyncCloseExecutors) {
+    synchronized (closed) {
       if (!this.closed) {
         this.closed = true;
-        for (ThreadPoolExecutor pool : asyncCloseExecutors.values()) {
-          pool.shutdown();
-        }
-        asyncCloseExecutors.clear();
+      } else {
+        return;
       }
     }
+    for (ExecutorService executorService : asyncCloseExecutors.values()) {
+      executorService.shutdown();
+    }
+    asyncCloseExecutors.clear();
   }
 
-  private void asyncExecute(String address, Runnable r) {
-    // Waiting 50ms for the async close request to complete is what the old (close per thread)
-    // code did. But now that we will not create a thread for every close request
-    // it seems better to let the thread that requested the close to move on quickly.
-    // So the default has changed to not wait. The system property p2p.ASYNC_CLOSE_WAIT_MILLISECONDS
-    // can be set to how many milliseconds to wait.
-    if (this.asyncCloseWaitTime == 0) {
-      getAsyncThreadExecutor(address).execute(r);
-    } else {
-      Future<?> future = getAsyncThreadExecutor(address).submit(r);
-      try {
-        future.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        // We want this code to wait at most 50ms for the close to happen.
-        // It is ok to ignore these exception and let the close continue
-        // in the background.
-      }
-    }
+  private Future asyncExecute(String address, Runnable runnableToExecute) {
+    ExecutorService asyncThreadExecutor = getAsyncThreadExecutor(address);
+    return asyncThreadExecutor.submit(runnableToExecute);
   }
 
   /**
@@ -181,29 +184,30 @@ public class SocketCloser {
    * this method may block for a certain amount of time. If it is called after the SocketCloser
is
    * closed then a normal synchronous close is done.
    * 
-   * @param sock the socket to close
+   * @param socket the socket to close
    * @param address identifies who the socket is connected to
    * @param extra an optional Runnable with stuff to execute in the async thread
    */
-  public void asyncClose(final Socket sock, final String address, final Runnable extra) {
-    if (sock == null || sock.isClosed()) {
+  public void asyncClose(final Socket socket, final String address, final Runnable extra)
{
+    if (socket == null || socket.isClosed()) {
       return;
     }
     boolean doItInline = false;
     try {
-      synchronized (asyncCloseExecutors) {
-        if (isClosed()) {
+      Future submittedTask = null;
+      synchronized (closed) {
+        if (closed) {
           // this SocketCloser has been closed so do a synchronous, inline, close
           doItInline = true;
         } else {
-          asyncExecute(address, new Runnable() {
+          submittedTask = asyncExecute(address, new Runnable() {
             public void run() {
               Thread.currentThread().setName("AsyncSocketCloser for " + address);
               try {
                 if (extra != null) {
                   extra.run();
                 }
-                inlineClose(sock);
+                inlineClose(socket);
               } finally {
                 Thread.currentThread().setName("unused AsyncSocketCloser");
               }
@@ -211,6 +215,9 @@ public class SocketCloser {
           });
         }
       }
+      if (submittedTask != null) {
+        waitForFutureTaskWithTimeout(submittedTask);
+      }
     } catch (OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
@@ -220,16 +227,28 @@ public class SocketCloser {
       if (extra != null) {
         extra.run();
       }
-      inlineClose(sock);
+      inlineClose(socket);
     }
   }
 
+  private void waitForFutureTaskWithTimeout(Future submittedTask) {
+    if (this.asyncCloseWaitTime != 0) {
+      try {
+        submittedTask.get(this.asyncCloseWaitTime, this.asyncCloseWaitUnits);
+      } catch (InterruptedException | ExecutionException | TimeoutException e) {
+        // We want this code to wait at most the asyncCloseWaitTime for the close to happen.
+        // It is ok to ignore these exception and let the close continue
+        // in the background.
+      }
+    }
+  }
 
   /**
    * Closes the specified socket
    * 
    * @param sock the socket to close
    */
+
   private static void inlineClose(final Socket sock) {
     // the next two statements are a mad attempt to fix bug
     // 36041 - segv in jrockit in pthread signaling code. This

http://git-wip-us.apache.org/repos/asf/geode/blob/c6b20a91/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
index 942cad4..a8b1d48 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/net/SocketCloserJUnitTest.java
@@ -14,22 +14,21 @@
  */
 package org.apache.geode.internal.net;
 
-import static org.junit.Assert.*;
-
-import java.net.Socket;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import org.apache.geode.internal.net.SocketCloser;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.UnitTest;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Tests the default SocketCloser.
@@ -62,86 +61,49 @@ public class SocketCloserJUnitTest {
    */
   @Test
   public void testAsync() {
-    final CountDownLatch cdl = new CountDownLatch(1);
+    final CountDownLatch countDownLatch = new CountDownLatch(1);
     final AtomicInteger waitingToClose = new AtomicInteger(0);
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          waitingToClose.incrementAndGet();
-          cdl.await();
-        } catch (InterruptedException e) {
-        }
-      }
-    };
 
     final int SOCKET_COUNT = 100;
-    final Socket[] aSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      aSockets[i] = createClosableSocket();
-    }
-    // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(aSockets[i], "A", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, aSockets[i].isClosed());
-    }
-    final Socket[] bSockets = new Socket[SOCKET_COUNT];
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      bSockets[i] = createClosableSocket();
-    }
+    final int REMOTE_CLIENT_COUNT = 200;
+
+    List<Socket> trackedSockets = new ArrayList<>();
     // Schedule a 100 sockets for async close.
-    // They should all be stuck on cdl.
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      this.socketCloser.asyncClose(bSockets[i], "B", r);
-    }
-    // Make sure the sockets have not been closed
-    for (int i = 0; i < SOCKET_COUNT; i++) {
-      assertEquals(false, bSockets[i].isClosed());
+    // They should all be stuck on countDownLatch.
+    for (int i = 0; i < REMOTE_CLIENT_COUNT; i++) {
+      Socket[] aSockets = new Socket[SOCKET_COUNT];
+      String address = i + "";
+      for (int j = 0; j < SOCKET_COUNT; j++) {
+        aSockets[j] = createClosableSocket();
+        trackedSockets.add(aSockets[j]);
+        this.socketCloser.asyncClose(aSockets[j], address, () -> {
+          try {
+            waitingToClose.incrementAndGet();
+            countDownLatch.await();
+          } catch (InterruptedException e) {
+          }
+        });
+      }
     }
+
     // close the socketCloser first to verify that the sockets
     // that have already been scheduled will be still be closed.
-    this.socketCloser.releaseResourcesForAddress("A");
     this.socketCloser.close();
-    // Each thread pool (one for A and one for B) has a max of 8 threads.
-    // So verify that this many are currently waiting on cdl.
-    {
-      final int maxThreads = this.socketCloser.getMaxThreads();
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          return waitingToClose.get() == 2 * maxThreads;
-        }
-
-        public String description() {
-          return "expected " + 2 * maxThreads + " waiters but found only " + waitingToClose.get();
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
-    // now count down the latch that allows the sockets to close
-    cdl.countDown();
+    countDownLatch.countDown();
     // now all the sockets should get closed; use a wait criteria
     // since a thread pool is doing to closes
-    {
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          for (int i = 0; i < SOCKET_COUNT; i++) {
-            if (!aSockets[i].isClosed() || !bSockets[i].isClosed()) {
-              return false;
-            }
-          }
-          return true;
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
+      boolean areAllClosed = true;
+      for (Iterator<Socket> iterator = trackedSockets.iterator(); iterator.hasNext();)
{
+        Socket socket = iterator.next();
+        if (socket.isClosed()) {
+          iterator.remove();
+          continue;
         }
-
-        public String description() {
-          return "one or more sockets did not close";
-        }
-      };
-      Wait.waitForCriterion(wc, 5000, 10, true);
-    }
+        areAllClosed = false;
+      }
+      return areAllClosed;
+    });
   }
 
   /**
@@ -150,18 +112,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocket() throws Exception {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
     Socket s = createClosableSocket();
     s.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    Wait.pause(10);
-    assertEquals(false, runnableCalled.get());
+    this.socketCloser.asyncClose(s, "A", () -> runnableCalled.set(true));
+    Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> !runnableCalled.get());
   }
 
   /**
@@ -170,25 +125,11 @@ public class SocketCloserJUnitTest {
   @Test
   public void testClosedSocketCloser() {
     final AtomicBoolean runnableCalled = new AtomicBoolean();
-    Runnable r = new Runnable() {
-      @Override
-      public void run() {
-        runnableCalled.set(true);
-      }
-    };
 
-    final Socket s = createClosableSocket();
+    final Socket closableSocket = createClosableSocket();
     this.socketCloser.close();
-    this.socketCloser.asyncClose(s, "A", r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        return runnableCalled.get() && s.isClosed();
-      }
-
-      public String description() {
-        return "runnable was not called or socket was not closed";
-      }
-    };
-    Wait.waitForCriterion(wc, 5000, 10, true);
+    this.socketCloser.asyncClose(closableSocket, "A", () -> runnableCalled.set(true));
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> runnableCalled.get() && closableSocket.isClosed());
   }
 }


Mime
View raw message