geode-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GEODE-3637) configureClientSSLSocket call can block Acceptor thread
Date Mon, 04 Dec 2017 21:36:00 GMT

    [ https://issues.apache.org/jira/browse/GEODE-3637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16277552#comment-16277552
] 

ASF GitHub Bot commented on GEODE-3637:
---------------------------------------

jhuynh1 commented on a change in pull request #1117: GEODE-3637: Reimplement client queue
initialization. Adding shutdown …
URL: https://github.com/apache/geode/pull/1117#discussion_r154781830
 
 

 ##########
 File path: geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
 ##########
 @@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information
regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version
2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain
a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under
the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express
+ * or implied. See the License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedLockBlackboard;
+import org.apache.geode.distributed.DistributedLockBlackboardImpl;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class AcceptorImplClientQueueDUnitTest implements Serializable {
+  private final Host host = Host.getHost(0);
+  private static final int numberOfEntries = 200;
+  private static final AtomicInteger eventCount = new AtomicInteger(0);
+  private static final AtomicBoolean completedClient2 = new AtomicBoolean(false);
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule =
+      CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
+          .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
+
+  @Rule
+  public SerializableTestName name = new SerializableTestName();
+
+  @Rule
+  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  private DistributedLockBlackboard blackboard = null;
+
+  @Before
+  public void setup() throws Exception {
+    blackboard = DistributedLockBlackboardImpl.getInstance();
+  }
+
+  @After
+  public void tearDown() throws RemoteException {
+    blackboard.initCount();
+    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+      InitialImageOperation.slowImageProcessing = 0;
+      System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
+    }));
+  }
+
+  @Test
+  public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception
{
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
+      try {
+        return createSubscriptionServer(cacheRule.getCache());
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+      cache.close(true);
+    });
+    vm3.invoke("Start Client2 to add entries to region", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      for (int i = 0; i < numberOfEntries; i++) {
+        region.put(i, i);
+      }
+      cache.close();
+    });
+
+    int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
+      try {
+        int serverPort = createSubscriptionServer(cacheRule.getCache());
+        InitialImageOperation.slowImageProcessing = 30;
+        return serverPort;
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm0.invoke("Turn on slow image processsing", () -> {
+      InitialImageOperation.slowImageProcessing = 30;
+    });
+
+    AsyncInvocation<Boolean> completedClient1 =
+        vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () ->
{
+
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolSubscriptionEnabled(true);
+          clientCacheFactory.setPoolSubscriptionRedundancy(1);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
+              .set("durable-client-timeout", "300").set("mcast-port", "0");
+          blackboard.incCount();
+          ClientCache cache = cacheFactory.create();
+
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter()
{
+            @Override
+            public void afterCreate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+
+            @Override
+            public void afterUpdate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+          }).create("subscriptionRegion");
+
+          region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+          cache.readyForEvents();
+          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+              .until(() -> eventCount.get() == numberOfEntries);
+          cache.close();
+          return eventCount.get() == numberOfEntries;
+        });
+
+    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+      while (true) {
+        Thread.sleep(100);
 
 Review comment:
   Possible timing issue?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> configureClientSSLSocket call can block Acceptor thread
> -------------------------------------------------------
>
>                 Key: GEODE-3637
>                 URL: https://issues.apache.org/jira/browse/GEODE-3637
>             Project: Geode
>          Issue Type: Bug
>          Components: client/server
>    Affects Versions: 1.1.0, 1.2.0
>            Reporter: Vahram Aharonyan
>            Assignee: Udo Kohlmeyer
>            Priority: Critical
>
> org.apache.geode.internal.net.SocketCreator#configureClientSSLSocket timeout for Socket
is being configured before starting SSL handshake only if passed "timeout" argument is larger
than 0.
> Having sslSocket.startHandshake issued without setting timeout can result to the blocking
of caller thread as in GEODE-2898, GEODE-3023.
> Below is the example of Handshaker thread stack-trace that got stacked:
> "Handshaker /10.124.195.100:10000 Thread 183" Id=526300 in RUNNABLE (running in native)
> Total blocked: 4   Total waited: 884
>   java.net.SocketInputStream.socketRead0(Native Method)
>   java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>   java.net.SocketInputStream.read(SocketInputStream.java:171)
>   java.net.SocketInputStream.read(SocketInputStream.java:141)
>   sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
>   sun.security.ssl.InputRecord.read(InputRecord.java:503)
>   sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
>   sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
>   sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
>   sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
>   org.apache.geode.internal.net.SocketCreator.configureClientSSLSocket(SocketCreator.java:1088)
>   org.apache.geode.internal.net.SocketCreator.connect(SocketCreator.java:967)
>   org.apache.geode.internal.net.SocketCreator.connect(SocketCreator.java:929)
>   org.apache.geode.internal.net.SocketCreator.connectForServer(SocketCreator.java:908)
>   org.apache.geode.internal.tcp.Connection.<init>(Connection.java:1306)
>   org.apache.geode.internal.tcp.Connection.createSender(Connection.java:1094)
>   org.apache.geode.internal.tcp.ConnectionTable.getOrderedAndOwned(ConnectionTable.java:553)
>   org.apache.geode.internal.tcp.ConnectionTable.get(ConnectionTable.java:664)
>   org.apache.geode.internal.tcp.TCPConduit.getConnection(TCPConduit.java:1037)
>   org.apache.geode.distributed.internal.direct.DirectChannel.getConnections(DirectChannel.java:543)
>   org.apache.geode.distributed.internal.direct.DirectChannel.sendToMany(DirectChannel.java:319)
>   org.apache.geode.distributed.internal.direct.DirectChannel.send(DirectChannel.java:605)
>   org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager.directChannelSend(GMSMembershipManager.java:1684)
>   org.apache.geode.distributed.internal.membership.gms.mgr.GMSMembershipManager.send(GMSMembershipManager.java:1875)
>   org.apache.geode.distributed.internal.DistributionChannel.send(DistributionChannel.java:82)
>   org.apache.geode.distributed.internal.DistributionManager.sendOutgoing(DistributionManager.java:3416)
>   org.apache.geode.distributed.internal.DistributionManager.sendMessage(DistributionManager.java:3453)
>   org.apache.geode.distributed.internal.DistributionManager.putOutgoing(DistributionManager.java:1832)
>   org.apache.geode.internal.cache.UpdateAttributesProcessor.sendProfileUpdate(UpdateAttributesProcessor.java:162)
>   org.apache.geode.internal.cache.UpdateAttributesProcessor.distribute(UpdateAttributesProcessor.java:97)
>   org.apache.geode.internal.cache.DistributedRegion.initialized(DistributedRegion.java:1128)
>   org.apache.geode.internal.cache.LocalRegion.initialize(LocalRegion.java:2413)
>   org.apache.geode.internal.cache.DistributedRegion.initialize(DistributedRegion.java:1117)
>   org.apache.geode.internal.cache.HARegion.initialize(HARegion.java:345)
>   org.apache.geode.internal.cache.GemFireCacheImpl.createVMRegion(GemFireCacheImpl.java:3308)
>   org.apache.geode.internal.cache.HARegion.getInstance(HARegion.java:265)
>   org.apache.geode.internal.cache.ha.HARegionQueue.createHARegion(HARegionQueue.java:348)
>   org.apache.geode.internal.cache.ha.HARegionQueue.<init>(HARegionQueue.java:328)
>   org.apache.geode.internal.cache.ha.HARegionQueue$BlockingHARegionQueue.<init>(HARegionQueue.java:2199)
>   org.apache.geode.internal.cache.ha.HARegionQueue$DurableHARegionQueue.<init>(HARegionQueue.java:2450)
>   org.apache.geode.internal.cache.ha.HARegionQueue.getHARegionQueueInstance(HARegionQueue.java:2030)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientProxy$MessageDispatcher.<init>(CacheClientProxy.java:2315)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientProxy.initializeMessageDispatcher(CacheClientProxy.java:1728)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.initializeProxy(CacheClientNotifier.java:660)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.registerClient(CacheClientNotifier.java:587)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.registerGFEClient(CacheClientNotifier.java:379)
>   org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.registerClient(CacheClientNotifier.java:324)
>   org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.handleNewClientConnection(AcceptorImpl.java:1510)
>   org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$5.run(AcceptorImpl.java:1298)
>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message