geode-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GEODE-3637) configureClientSSLSocket call can block Acceptor thread
Date Mon, 04 Dec 2017 23:17:00 GMT

    [ https://issues.apache.org/jira/browse/GEODE-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16277734#comment-16277734
] 

ASF GitHub Bot commented on GEODE-3637:
---------------------------------------

WireBaron commented on a change in pull request #1117: GEODE-3637: Reimplement client queue
initialization. Adding shutdown …
URL: https://github.com/apache/geode/pull/1117#discussion_r154804125
 
 

 ##########
 File path: geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
 ##########
 @@ -534,103 +540,126 @@ public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
     this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
-    {
-      ThreadPoolExecutor tmp_pool = null;
-      String gName = "ServerConnection "
-          // + serverSock.getInetAddress()
-          + "on port " + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
-
-        public Thread newThread(final Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    pool = initializeServerConnectionThreadPool();
+    hsPool = initializeHandshakerThreadPool();
+    clientQueueInitPool = initializeClientQueueInitializerThreadPool();
+
+    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+    String postAuthzFactoryName =
+        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+    isPostAuthzCallbackPresent =
+        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ?
true : false;
+  }
+
+  private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
+    String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(Runnable command) {
+        String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incAcceptThreadsCreated();
+        return new Thread(socketThreadGroup, command, threadName);
+      }
+    };
+    try {
+      final BlockingQueue blockingQueue = new SynchronousQueue();
+      final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler()
{
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+          try {
+            blockingQueue.put(r);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt(); // preserve the state
+            throw new RejectedExecutionException(
+                LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incConnectionThreadsCreated();
-          Runnable r = new Runnable() {
-            public void run() {
-              try {
-                command.run();
-              } catch (CancelException e) { // bug 39463
-                // ignore
-              } finally {
-                ConnectionTable.releaseThreadsSockets();
-              }
-            }
-          };
-          return new Thread(socketThreadGroup, r, tName);
         }
       };
-      try {
-        if (isSelector()) {
-          tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
-              getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
-        } else {
-          tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections,
0L,
-              TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
-        }
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        throw poolInitException;
-      }
-      this.pool = tmp_pool;
+      logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
+      return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
+          socketThreadFactory, rejectedExecutionHandler);
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      this.pool.shutdownNow();
+      throw poolInitException;
     }
-    {
-      ThreadPoolExecutor tmp_hsPool = null;
-      String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+  }
 
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
+  private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException
{
+    final ThreadGroup clientQueueThreadGroup =
+        LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
 
-        public Thread newThread(Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String threadName =
+            clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        Runnable runnable = new Runnable() {
+          public void run() {
+            try {
+              command.run();
+            } catch (CancelException e) {
+              logger.debug("Client Queue Initialization was canceled.", e);
+            }
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incAcceptThreadsCreated();
-          return new Thread(socketThreadGroup, command, tName);
-        }
-      };
-      try {
-        final BlockingQueue bq = new SynchronousQueue();
-        final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
-          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+        };
+        return new Thread(clientQueueThreadGroup, runnable, threadName);
+      }
+    };
+    return new PooledExecutorWithDMStats(new SynchronousQueue(), 16, getStats().getCnxPoolHelper(),
 
 Review comment:
   Can we move some of these magic numbers to class constants?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> configureClientSSLSocket call can block Acceptor thread
> -------------------------------------------------------
>
>                 Key: GEODE-3637
>                 URL: https://issues.apache.org/jira/browse/GEODE-3637
>             Project: Geode
>          Issue Type: Bug
>          Components: client/server
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Vahram Aharonyan
>            Assignee: Udo Kohlmeyer
>            Priority: Critical
>
> In org.apache.geode.internal.net.SocketCreator#configureClientSSLSocket, the socket timeout
is configured before starting the SSL handshake only if the passed "timeout" argument is larger
than 0.
> Issuing sslSocket.startHandshake without setting a timeout can result in the caller thread
blocking, as in GEODE-2898 and GEODE-3023.
> Below is an example stack trace of a Handshaker thread that got stuck:
> "Handshaker /10.124.195.100:10000 Thread 183" Id=526300 in RUNNABLE (running in native)
> Total blocked: 4   Total waited: 884
>   java.net.SocketInputStream.socketRead0(Native Method)
>   java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>   java.net.SocketInputStream.read(SocketInputStream.java:171)
>   java.net.SocketInputStream.read(SocketInputStream.java:141)
>   sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>   sun.security.ssl.InputRecord.read(InputRecord.java:503)
>   sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
>   sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
>   sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
>   sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
>   org.apache.geode.internal.net.SocketCreator.configureClientSSLSocket(SocketCreator.java:1088)
>   org.apache.geode.internal.net.SocketCreator.connect(SocketCreator.java:967)
>   org.apache.geode.internal.net.SocketCreator.connect(SocketCreator.java:929)
>   org.apache.geode.internal.net.SocketCreator.connectForServer(SocketCreator.java:908)
>   org.apache.geode.internal.tcp.Connection.<init>(Connection.java:1306)
>   org.apache.geode.internal.tcp.Connection.createSender(Connection.java:1094)
>   org.apache.geode.internal.tcp.ConnectionTable.getOrderedAndOwned(ConnectionTable.java:553)
>   org.apache.geode.internal.tcp.ConnectionTable.get(ConnectionTable.java:664)
>   org.apache.geode.internal.tcp.TCPConduit.getConnection(TCPConduit.java:1037)
>   org.apache.geode.distributed.internal.direct.DirectChannel.getConnections(DirectChannel.java:543)
>   org.apache.geode.distributed.internal.direct.DirectChannel.sendToMany(DirectChannel.java:319)
>   org.apache.geode.distributed.internal.direct.DirectChannel.send(DirectChannel.java:605)
>   org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager.directChannelSend(GMSMembershipManager.java:1684)
>   org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager.send(GMSMembershipManager.java:1875)
>   org.apache.geode.distributed.internal.DistributionChannel.send(DistributionChannel.java:82)
>   org.apache.geode.distributed.internal.DistributionManager.sendOutgoing(DistributionManager.java:3416)
>   org.apache.geode.distributed.internal.DistributionManager.sendMessage(DistributionManager.java:3453)
>   org.apache.geode.distributed.internal.DistributionManager.putOutgoing(DistributionManager.java:1832)
>   org.apache.geode.internal.cache.UpdateAttributesProcessor.sendProfileUpdate(UpdateAttributesProcessor.java:162)
>   org.apache.geode.internal.cache.UpdateAttributesProcessor.distribute(UpdateAttributesProcessor.java:97)
>   org.apache.geode.internal.cache.DistributedRegion.initialized(DistributedRegion.java:1128)
>   org.apache.geode.internal.cache.LocalRegion.initialize(LocalRegion.java:2413)
>   org.apache.geode.internal.cache.DistributedRegion.initialize(DistributedRegion.java:1117)
>   org.apache.geode.internal.cache.HARegion.initialize(HARegion.java:345)
>   org.apache.geode.internal.cache.GemFireCacheImpl.createVMRegion(GemFireCacheImpl.java:3308)
>   org.apache.geode.internal.cache.HARegion.getInstance(HARegion.java:265)
>   org.apache.geode.internal.cache.ha.HARegionQueue.createHARegion(HARegionQueue.java:348)
>   org.apache.geode.internal.cache.ha.HARegionQueue.<init>(HARegionQueue.java:328)
>   org.apache.geode.internal.cache.ha.HARegionQueue$BlockingHARegionQueue.<init>(HARegionQueue.java:2199)
>   org.apache.geode.internal.cache.ha.HARegionQueue$DurableHARegionQueue.<init>(HARegionQueue.java:2450)
>   org.apache.geode.internal.cache.ha.HARegionQueue.getHARegionQueueInstance(HARegionQueue.java:2030)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.<init>(CacheClientProxy.java:2315)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.initializeMessageDispatcher(CacheClientProxy.java:1728)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.initializeProxy(CacheClientNotifier.java:660)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.registerClient(CacheClientNotifier.java:587)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.registerGFEClient(CacheClientNotifier.java:379)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.registerClient(CacheClientNotifier.java:324)
>   org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.handleNewClientConnection(AcceptorImpl.java:1510)
>   org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$5.run(AcceptorImpl.java:1298)
>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message