geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [1/2] incubator-geode git commit: p2p readers and handshakers threads are now pooled
Date Tue, 15 Sep 2015 23:47:54 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-332 [created] 6d6c760cc


p2p readers and handshakers threads are now pooled


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fa017689
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fa017689
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fa017689

Branch: refs/heads/feature/GEODE-332
Commit: fa017689ef73d9003eade82c6cb27634c03be05f
Parents: 4e65f0c
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Tue Sep 15 16:03:40 2015 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Tue Sep 15 16:03:40 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/tcp/Connection.java        | 58 +++++++++++--------
 .../gemfire/internal/tcp/ConnectionTable.java   | 61 +++++++++++++++++++-
 2 files changed, 93 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index cd1b7dc..630ecfe 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -115,6 +115,11 @@ public class Connection implements Runnable {
 
   /** the table holding this connection */
   final ConnectionTable owner;
+  
+  /** Set to false once run() is terminating. Using this instead of Thread.isAlive  
+    * as the reader thread may be a pooled thread.
+    */ 
+  private volatile boolean isRunning = false; 
 
   /** true if connection is a shared resource that can be used by more than one thread */
   private boolean sharedResource;
@@ -136,11 +141,14 @@ public class Connection implements Runnable {
   }
 
   private final static ThreadLocal isReaderThread = new ThreadLocal();
-  // return true if this thread is a reader thread
   public final static void makeReaderThread() {
     // mark this thread as a reader thread
-    isReaderThread.set(Boolean.TRUE);
+    makeReaderThread(true);
   }
+  private final static void makeReaderThread(boolean v) {
+    isReaderThread.set(v);
+  }
+  // return true if this thread is a reader thread
   public final static boolean isReaderThread() {
     Object o = isReaderThread.get();
     if (o == null) {
@@ -523,7 +531,7 @@ public class Connection implements Runnable {
     Connection c = new Connection(t, s);
     boolean readerStarted = false;
     try {
-      c.startReader();
+      c.startReader(t);
       readerStarted = true;
     } finally {
       if (!readerStarted) {
@@ -822,11 +830,12 @@ public class Connection implements Runnable {
     Runnable r = new Runnable() {
       public void run() {
         boolean rShuttingDown = readerShuttingDown;
+        final Thread localRef = readerThread;
         synchronized(stateLock) {
-          if (readerThread != null && readerThread.isAlive() &&
+          if (localRef != null && isRunning &&
               !rShuttingDown && connectionState == STATE_READING
               || connectionState == STATE_READING_ACK) {
-            readerThread.interrupt();
+            localRef.interrupt();
           }
         }
       }
@@ -951,7 +960,7 @@ public class Connection implements Runnable {
    *
    * @throws IOException if handshake fails
    */
-  private void attemptHandshake() throws IOException {
+  private void attemptHandshake(ConnectionTable connTable) throws IOException {
     // send HANDSHAKE
     // send this server's port.  It's expected on the other side
     if (useNIO()) {
@@ -961,7 +970,7 @@ public class Connection implements Runnable {
       handshakeStream();
     }
 
-    startReader(); // this reader only reads the handshake and then exits
+    startReader(connTable); // this reader only reads the handshake and then exits
     waitForHandshake(); // waiting for reply
   }
 
@@ -1099,7 +1108,7 @@ public class Connection implements Runnable {
         if (conn != null) {
           // handshake
           try {
-            conn.attemptHandshake();
+            conn.attemptHandshake(t);
             if (conn.isSocketClosed()) {
               // something went wrong while reading the handshake
               // and the socket was closed or this guy sent us a
@@ -1601,14 +1610,14 @@ public class Connection implements Runnable {
         this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
       isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
     }
-    if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive()
+    if (!beingSick && this.readerThread != null && !isIBM && this.isRunning
         && this.readerThread != Thread.currentThread()) {
       try {
         this.readerThread.join(500);
-        if (this.readerThread.isAlive() && !this.readerShuttingDown
+        if (this.isRunning && !this.readerShuttingDown
             && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
           this.readerThread.join(1500);
-          if (this.readerThread.isAlive()) {
+          if (this.isRunning) {
             logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
           }
         }
@@ -1677,26 +1686,22 @@ public class Connection implements Runnable {
   }
 
   /** starts a reader thread */
-  private void startReader() {
-    ThreadGroup group =
-      LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
-    Assert.assertTrue(this.readerThread == null);
-    this.readerThread =
-      new Thread(group, this, p2pReaderName());
-    this.readerThread.setDaemon(true);
-    stopped = false;
-    this.readerThread.start();
-  }
+  private void startReader(ConnectionTable connTable) { 
+    Assert.assertTrue(!this.isRunning); 
+    stopped = false; 
+    this.isRunning = true; 
+    connTable.executeCommand(this);  
+  } 
 
 
   /** in order to read non-NIO socket-based messages we need to have a thread
       actively trying to grab bytes out of the sockets input queue.
       This is that thread. */
   public void run() {
+    this.readerThread = Thread.currentThread();
+    this.readerThread.setName(p2pReaderName());
     ConnectionTable.threadWantsSharedResources();
-    if (this.isReceiver) {
-      makeReaderThread();
-    }
+    makeReaderThread(this.isReceiver);
     try {
       if (useNIO()) {
         runNioReader();
@@ -1725,6 +1730,9 @@ public class Connection implements Runnable {
       // for the handshake.
       // see bug 37524 for an example of listeners hung in waitForHandshake
       notifyHandshakeWaiter(false);
+      this.isRunning = false;
+      this.readerThread.setName("idle p2p reader");
+      this.readerThread = null;
     } // finally
   }
 
@@ -3307,7 +3315,7 @@ public class Connection implements Runnable {
   protected Object stateLock = new Object();
   
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState;
+  protected byte connectionState = STATE_IDLE;
   
   /*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/
   /** the connection is idle, but may be in use */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 9beb947..525c687 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -19,8 +19,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.logging.log4j.Logger;
@@ -37,6 +44,7 @@ import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 
@@ -129,7 +137,13 @@ public class ConnectionTable  {
    */
   private volatile boolean closed = false;
 
-
+  /**
+   * Executor used by p2p reader and p2p handshaker threads.
+   */
+  private Executor p2pReaderThreadPool;
+  /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */
+  private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+  
   /**
    * The most recent instance to be created
    * 
@@ -202,11 +216,40 @@ public class ConnectionTable  {
     this.threadOrderedConnMap = new ThreadLocal();
     this.threadConnMaps = new ArrayList();
     this.threadConnectionMap = new ConcurrentHashMap();
+    this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
   /*  NOMUX: if (TCPConduit.useNIO) {
       inputMuxManager = new InputMuxManager(this);
       inputMuxManager.start(c.logger);
     }*/
   }
+  
+  private Executor createThreadPoolForIO(boolean conserveSockets) { 
+    Executor executor = null; 
+    final ThreadGroup connectionRWGroup = LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
+    if (conserveSockets) { 
+      executor = new Executor() { 
+        @Override 
+        public void execute(Runnable command) { 
+          Thread th = new Thread(connectionRWGroup, command); 
+          th.setDaemon(true); 
+          th.start(); 
+        } 
+      }; 
+    } 
+    else { 
+      BlockingQueue synchronousQueue = new SynchronousQueue(); 
+      ThreadFactory tf = new ThreadFactory() { 
+        public Thread newThread(final Runnable command) { 
+          Thread thread = new Thread(connectionRWGroup, command); 
+          thread.setDaemon(true); 
+          return thread; 
+        } 
+      }; 
+      executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME,
+          TimeUnit.SECONDS, synchronousQueue, tf); 
+    } 
+    return executor; 
+  } 
 
   /** conduit sends connected() after establishing the server socket */
 //   protected void connected() {
@@ -715,6 +758,15 @@ public class ConnectionTable  {
         this.threadConnMaps.clear();
       }
     }
+    {
+      Executor localExec = this.p2pReaderThreadPool;
+      if (localExec != null) {
+        if (localExec instanceof ExecutorService) {
+          ((ExecutorService)localExec).shutdown();
+        }
+        this.p2pReaderThreadPool = null;
+      }
+    }
     closeReceivers(false);
     
     Map m = (Map)this.threadOrderedConnMap.get();
@@ -726,6 +778,13 @@ public class ConnectionTable  {
     }
   }
 
+  public void executeCommand(Runnable runnable) { 
+    Executor local = this.p2pReaderThreadPool;
+    if (local != null) {
+      local.execute(runnable);
+    }
+  }
+  
   /**
    * Close all receiving threads.  This is used during shutdown and is also
    * used by a test hook that makes us deaf to incoming messages.


Mime
View raw message