geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [geode] 02/13: GEODE-5020: Move HARegionQueue system properties to SystemPropertyHelper
Date Mon, 23 Apr 2018 05:46:40 GMT
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7cc9e8fbc09a4f553b5d6b94a0615820fdd3c9af
Author: Kirk Lund <klund@apache.org>
AuthorDate: Thu Apr 5 15:11:38 2018 -0700

    GEODE-5020: Move HARegionQueue system properties to SystemPropertyHelper
    
    Rename ClientSubscriptionExpiryDataLossRegressionTest as
    HARegionQueueExpiryRegressionTest.
    
    Rename Bug48879DUnitTest as HARegionQueueThreadIdExpiryRegressionTest.
    
    Overhaul HARegionQueueThreadIdExpiryRegressionTest:
    * remove Thread sleeps
    * use Rules and Awaitility
    * use System properties instead of public static variables
---
 .../geode/internal/cache/ha/HARegionQueue.java     |  67 ++++--
 .../geode/internal/lang/SystemPropertyHelper.java  |   4 +
 .../geode/internal/cache/ha/Bug48879DUnitTest.java | 227 ---------------------
 ...java => HARegionQueueExpiryRegressionTest.java} |   6 +-
 .../internal/cache/ha/HARegionQueueJUnitTest.java  |   4 +-
 .../HARegionQueueThreadIdExpiryRegressionTest.java | 198 ++++++++++++++++++
 6 files changed, 255 insertions(+), 251 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 630bdea..8a5ccbb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -14,6 +14,10 @@
  */
 package org.apache.geode.internal.cache.ha;
 
+import static org.apache.geode.internal.lang.SystemPropertyHelper.HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.THREAD_ID_EXPIRY_TIME_PROPERTY;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.getProductIntegerProperty;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -30,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
@@ -67,7 +72,6 @@ import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -239,12 +243,6 @@ public class HARegionQueue implements RegionQueue {
   public static final long INIT_OF_SEQUENCEID = -1L;
 
   /**
-   * Constant used to set region entry expiry time using system property.
-   */
-  public static final String REGION_ENTRY_EXPIRY_TIME =
-      DistributionConfig.GEMFIRE_PREFIX + "MessageTimeToLive";
-
-  /**
    * The default frequency (in seconds) at which a message will be sent by the primary to all the
    * secondary nodes to remove the events which have already been dispatched from the queue.
    */
@@ -298,12 +296,6 @@ public class HARegionQueue implements RegionQueue {
   static boolean testMarkerMessageRecieved = false;
   static boolean isUsedByTest = false;
 
-  static final int DEFAULT_THREAD_ID_EXPIRY_TIME = 300;
-  /**
-   * For testing purposes, allows resetting expiry time for ThreadIdentifiers. In seconds.
-   */
-  public static int threadIdExpiryTime = DEFAULT_THREAD_ID_EXPIRY_TIME;
-
   /**
    * Used by durable queues to maintain acked events by client
    */
@@ -604,6 +596,7 @@ public class HARegionQueue implements RegionQueue {
    *
    * @param object object to put onto the queue
    */
+  @Override
   public boolean put(Object object) throws CacheException, InterruptedException {
     this.giiLock.readLock().lock(); // fix for bug #41681 - durable client misses event
     try {
@@ -1125,6 +1118,7 @@ public class HARegionQueue implements RegionQueue {
   /**
    * Returns the underlying region that backs this queue.
    */
+  @Override
   public HARegion getRegion() {
     return this.region;
   }
@@ -1190,6 +1184,7 @@ public class HARegionQueue implements RegionQueue {
    * @throws CacheException The exception can be thrown by BlockingQueue if it encounters
    *         InterruptedException while waiting for data
    */
+  @Override
   public Object take() throws CacheException, InterruptedException {
     Conflatable object = null;
     Long next = null;
@@ -1227,6 +1222,7 @@ public class HARegionQueue implements RegionQueue {
     return object;
   }
 
+  @Override
   public List take(int batchSize) throws CacheException, InterruptedException {
     List batch = new ArrayList(batchSize * 2);
     for (int i = 0; i < batchSize; i++) {
@@ -1260,6 +1256,7 @@ public class HARegionQueue implements RegionQueue {
    * Removes the events that were peeked by this thread. The events are destroyed from the queue and
    * conflation map and DispatchedAndCurrentEvents are updated accordingly.
    */
+  @Override
   public void remove() throws InterruptedException {
     List peekedIds = (List) HARegionQueue.peekedEventsContext.get();
 
@@ -1352,6 +1349,7 @@ public class HARegionQueue implements RegionQueue {
     // ARB: Implemented in DurableHARegionQueue.
   }
 
+  @Override
   public Object peek() throws InterruptedException {
     if (Thread.interrupted())
       throw new InterruptedException();
@@ -1393,6 +1391,7 @@ public class HARegionQueue implements RegionQueue {
     return object;
   }
 
+  @Override
   public List peek(int batchSize) throws InterruptedException {
     return peek(batchSize, -1);
   }
@@ -1436,6 +1435,7 @@ public class HARegionQueue implements RegionQueue {
    *
    * @return The list of events peeked
    */
+  @Override
   public List peek(int batchSize, int timeToWait) throws InterruptedException {
     long start = System.currentTimeMillis();
     long end = start + timeToWait;
@@ -1531,10 +1531,12 @@ public class HARegionQueue implements RegionQueue {
     return batch;
   }
 
+  @Override
   public void addCacheListener(CacheListener listener) {
     // nothing
   }
 
+  @Override
   public void removeCacheListener() {
     // nothing
   }
@@ -1738,6 +1740,7 @@ public class HARegionQueue implements RegionQueue {
    *
    * @return the size of the queue
    */
+  @Override
   public int size() {
     acquireReadLock();
     try {
@@ -1983,8 +1986,9 @@ public class HARegionQueue implements RegionQueue {
                 .toLocalizedString(new Object[] {BLOCKING_HA_QUEUE, NON_BLOCKING_HA_QUEUE}));
     }
     if (!isDurable) {
-      Integer expiryTime = Integer.getInteger(REGION_ENTRY_EXPIRY_TIME, hrqa.getExpiryTime());
-      hrqa.setExpiryTime(expiryTime);
+      Optional<Integer> expiryTime =
+          getProductIntegerProperty(HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY);
+      hrqa.setExpiryTime(expiryTime.orElseGet(hrqa::getExpiryTime));
       ExpirationAttributes ea =
           new ExpirationAttributes(hrqa.getExpiryTime(), ExpirationAction.LOCAL_INVALIDATE);
       hrq.region.getAttributesMutator().setEntryTimeToLive(ea);
@@ -3266,6 +3270,7 @@ public class HARegionQueue implements RegionQueue {
      *
      * @see org.apache.geode.internal.DataSerializableFixedID#fromData(java.io.DataInput)
      */
+    @Override
     public void fromData(DataInput in) throws IOException, ClassNotFoundException {
       synchronized (this) {
         this.lastDispatchedSequenceId = in.readLong();
@@ -3278,6 +3283,7 @@ public class HARegionQueue implements RegionQueue {
      *
      * @see org.apache.geode.internal.DataSerializableFixedID#getDSFID()
      */
+    @Override
     public int getDSFID() {
       return DISPATCHED_AND_CURRENT_EVENTS;
     }
@@ -3287,6 +3293,7 @@ public class HARegionQueue implements RegionQueue {
      *
      * @see org.apache.geode.internal.DataSerializableFixedID#toData(java.io.DataOutput)
      */
+    @Override
     public void toData(DataOutput out) throws IOException {
       synchronized (this) { // fix for bug #41621
         out.writeLong(this.lastDispatchedSequenceId);
@@ -3307,6 +3314,7 @@ public class HARegionQueue implements RegionQueue {
   }
 
   // TODO:Asif : Remove this method
+  @Override
   public void remove(int top) {
     throw new UnsupportedOperationException(
         LocalizedStrings.HARegionQueue_HAREGIONQUEUE_AND_ITS_DERIVED_CLASS_DO_NOT_SUPPORT_THIS_OPERATION
@@ -3543,6 +3551,7 @@ public class HARegionQueue implements RegionQueue {
         // Start a new thread which will update the clientMessagesRegion for
         // each of the HAEventWrapper instances present in the wrapperSet
         Thread regionCleanupTask = new Thread(new Runnable() {
+          @Override
           public void run() {
             try {
               Iterator iter = wrapperSet.iterator();
@@ -3779,16 +3788,24 @@ public class HARegionQueue implements RegionQueue {
 
   /** this is used to expire thread identifiers, even in primary queues */
   static class ThreadIdentifierCustomExpiry implements CustomExpiry {
-    private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS = new ExpirationAttributes(
-        HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
-    private static volatile ExpirationAttributes testExpAtts = null;
 
+    /**
+     * expiry time for ThreadIdentifiers. In seconds.
+     */
+    static final int DEFAULT_THREAD_ID_EXPIRY_TIME = 300;
+
+    private static final ExpirationAttributes DEFAULT_THREAD_ID_EXP_ATTS =
+        new ExpirationAttributes(DEFAULT_THREAD_ID_EXPIRY_TIME, ExpirationAction.LOCAL_INVALIDATE);
+
+    private static volatile ExpirationAttributes testExpAtts;
+
+    @Override
     public ExpirationAttributes getExpiry(Region.Entry entry) {
       // Use key to determine expiration.
       Object key = entry.getKey();
       if (key instanceof ThreadIdentifier) {
-        final int expTime = HARegionQueue.threadIdExpiryTime;
-        if (expTime != HARegionQueue.DEFAULT_THREAD_ID_EXPIRY_TIME) {
+        final int expTime = calculateThreadIdExpiryTime();
+        if (expTime != DEFAULT_THREAD_ID_EXPIRY_TIME) {
           // This should only happen in unit test code
           ExpirationAttributes result = testExpAtts;
           if (result == null || result.getTimeout() != expTime) {
@@ -3806,6 +3823,14 @@ public class HARegionQueue implements RegionQueue {
       }
     }
 
-    public void close() {}
+    @Override
+    public void close() {
+      // nothing
+    }
+
+    private static int calculateThreadIdExpiryTime() {
+      Optional<Integer> expiryTime = getProductIntegerProperty(THREAD_ID_EXPIRY_TIME_PROPERTY);
+      return expiryTime.orElse(DEFAULT_THREAD_ID_EXPIRY_TIME);
+    }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
index fc2551d..a039628 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/lang/SystemPropertyHelper.java
@@ -61,6 +61,10 @@ public class SystemPropertyHelper {
 
   public static final String DEFAULT_DISK_DIRS_PROPERTY = "defaultDiskDirs";
 
+  public static final String HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY = "MessageTimeToLive";
+
+  public static final String THREAD_ID_EXPIRY_TIME_PROPERTY = "threadIdExpiryTime";
+
   /**
    * This method will try to look up "geode." and "gemfire." versions of the system property. It
    * will check and prefer "geode." setting first, then try to check "gemfire." setting.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug48879DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug48879DUnitTest.java
deleted file mode 100644
index 8e7952e..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/Bug48879DUnitTest.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.ha;
-
-import static org.apache.geode.distributed.ConfigurationProperties.*;
-import static org.junit.Assert.*;
-
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.internal.AvailablePort;
-import org.apache.geode.internal.OSProcess;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
-import org.apache.geode.test.junit.categories.DistributedTest;
-
-@Category({DistributedTest.class, ClientSubscriptionTest.class})
-@SuppressWarnings("serial")
-public class Bug48879DUnitTest extends JUnit4DistributedTestCase {
-
-  private static VM vm0 = null;
-  private static VM vm1 = null;
-
-  private static GemFireCacheImpl cache;
-
-  public static final String REGION_NAME = "Bug48879DUnitTest_region";
-
-  public static final int SLEEP_TIME = 40000;
-
-  public Bug48879DUnitTest() {
-    super();
-  }
-
-  @Override
-  public final void postSetUp() throws Exception {
-    disconnectAllFromDS();
-    Host host = Host.getHost(0);
-    vm0 = host.getVM(0); // server1
-    vm1 = host.getVM(1); // server2
-
-    int port0 = (Integer) vm0.invoke(() -> Bug48879DUnitTest.createCacheServer());
-    int port1 = (Integer) vm1.invoke(() -> Bug48879DUnitTest.createCacheServer());
-
-    createClientCache(host, new Integer[] {port0, port1}, Boolean.TRUE);
-  }
-
-  @Override
-  public final void preTearDown() throws Exception {
-    closeCache();
-
-    vm0.invoke(() -> Bug48879DUnitTest.closeCache());
-    vm1.invoke(() -> Bug48879DUnitTest.closeCache());
-  }
-
-  public static void closeCache() throws Exception {
-    HARegionQueue.threadIdExpiryTime = 300;
-    System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "MessageTimeToLive", "180");
-    if (cache != null) {
-      cache.close();
-    }
-  }
-
-  @SuppressWarnings({"unused", "deprecation"})
-  public static Integer createCacheServer() throws Exception {
-    Bug48879DUnitTest test = new Bug48879DUnitTest();
-    System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "MessageTimeToLive", "30");
-    cache = (GemFireCacheImpl) CacheFactory.create(test.getSystem());
-    HARegionQueue.threadIdExpiryTime = (SLEEP_TIME / 1000) - 10;
-    cache.setMessageSyncInterval(SLEEP_TIME / 500);
-
-    RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
-
-    Region<String, String> region = rf.create(REGION_NAME);
-
-    CacheServer server = cache.addCacheServer();
-    server.setPort(AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET));
-    server.start();
-    return server.getPort();
-  }
-
-  @SuppressWarnings("deprecation")
-  public static void createClientCache(Host host, Integer[] ports, Boolean doRI) throws Exception {
-
-    Properties props = new Properties();
-    props.setProperty(STATISTIC_ARCHIVE_FILE, "client_" + OSProcess.getId() + ".gfs");
-    props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
-
-    DistributedSystem ds = new Bug48879DUnitTest().getSystem(props);
-    ds.disconnect();
-    ClientCacheFactory ccf = new ClientCacheFactory(props);
-    ccf.setPoolSubscriptionEnabled(doRI);
-    ccf.setPoolSubscriptionAckInterval(50);
-    ccf.setPoolSubscriptionRedundancy(1);
-    for (int port : ports) {
-      ccf.addPoolServer(host.getHostName(), port);
-    }
-    cache = (GemFireCacheImpl) ccf.create();
-
-    ClientRegionFactory<String, String> crf =
-        cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY);
-
-    Region<String, String> region = crf.create(REGION_NAME);
-
-    if (doRI) {
-      region.registerInterest("ALL_KEYS");
-    }
-
-  }
-
-  @SuppressWarnings({"unused", "unchecked"})
-  public static void doPuts(Integer numOfThreads, Integer puts) throws Exception {
-    Region<String, String> region = cache.getRegion(REGION_NAME);
-    final int putsPerThread = puts;
-
-    Thread[] threads = new Thread[numOfThreads];
-
-    for (int i = 0; i < numOfThreads; i++) {
-      final String threadId = "Thread_" + i + "X";
-      threads[i] = new Thread(new Runnable() {
-        @SuppressWarnings("rawtypes")
-        public void run() {
-          Region region = cache.getRegion(REGION_NAME);
-          for (int i = 0; i < putsPerThread; i++) {
-            region.put(threadId + i, "VALUE_" + i);
-          }
-        }
-      });
-      threads[i].start();
-    }
-
-    for (int i = 0; i < numOfThreads; i++) {
-      try {
-        threads[i].join();
-      } catch (InterruptedException ie) {
-      }
-    }
-    Thread.sleep(5000);
-    region.put("LAST", "LAST");
-  }
-
-  public static Boolean isPrimaryServer() {
-    return ((CacheClientProxy) CacheClientNotifier.getInstance().getClientProxies().toArray()[0])
-        .isPrimary();
-  }
-
-  public static void verifyStats(Integer numOfEvents, Integer expectedTids) throws Exception {
-    HARegionQueueStats stats =
-        ((CacheClientProxy) CacheClientNotifier.getInstance().getClientProxies().toArray()[0])
-            .getHARegionQueue().getStatistics();
-
-    long actualExpiry = stats.getEventsExpired();
-    long expectedExpiry = isPrimaryServer() ? 0 : numOfEvents + 1; // +1 for LAST key
-    assertEquals(
-        "Expected eventsExpired: " + expectedExpiry + " but actual eventsExpired: " + actualExpiry
-            + (isPrimaryServer() ? " at primary." : " at secondary."),
-        expectedExpiry, actualExpiry);
-
-    int actualTids = stats.getThreadIdentiferCount();
-    assertTrue("Expected ThreadIdentifier count <= 1 but actual: " + actualTids
-        + (isPrimaryServer() ? " at primary." : " at secondary."), actualTids <= 1); // Sometimes we
-                                                                                     // may see 1
-                                                                                     // threadIdentifier
-                                                                                     // due to slow
-                                                                                     // machines,
-                                                                                     // but never
-                                                                                     // equal to
-                                                                                     // expectedTids
-  }
-
-  public static void verifyThreadsBeforeExpiry(Integer expectedTids) throws Exception {
-    HARegionQueueStats stats =
-        ((CacheClientProxy) CacheClientNotifier.getInstance().getClientProxies().toArray()[0])
-            .getHARegionQueue().getStatistics();
-
-    int actualTids = stats.getThreadIdentiferCount();
-    assertTrue("Expected ThreadIdentifier count >= " + expectedTids + " but actual: " + actualTids
-        + (isPrimaryServer() ? " at primary." : " at secondary."), actualTids >= expectedTids);
-  }
-
-  @Test
-  public void testThreadIdentfiersExpiry() throws Exception {
-    // create server1 and server2
-    // create client with redundancy = 1
-    // put events in region
-    int threads = 10;
-    int putsPerThread = 1;
-    vm0.invoke(() -> Bug48879DUnitTest.doPuts(threads, putsPerThread));
-    vm0.invoke(() -> Bug48879DUnitTest.verifyThreadsBeforeExpiry(threads));
-    vm1.invoke(() -> Bug48879DUnitTest.verifyThreadsBeforeExpiry(threads));
-    // sleep till expiry time elapses
-    Thread.sleep(SLEEP_TIME * 2 + 30000);
-
-    // Assert that threadidentifiers are expired and region events are retained on primary server
-    vm0.invoke(() -> Bug48879DUnitTest.verifyStats(threads * putsPerThread, threads));
-    // Assert that region events and threadidentifiers are expired on secondary server.
-    vm1.invoke(() -> Bug48879DUnitTest.verifyStats(threads * putsPerThread, threads));
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueExpiryRegressionTest.java
similarity index 95%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueExpiryRegressionTest.java
index 668ea8b..ab0885b 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/ClientSubscriptionExpiryDataLossRegressionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueExpiryRegressionTest.java
@@ -19,6 +19,8 @@ import static org.apache.geode.cache30.ClientServerTestCase.configureConnectionP
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.internal.cache.tier.sockets.ConflationDUnitTest.setIsSlowStart;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY;
 import static org.apache.geode.test.dunit.Host.getHost;
 import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
 import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
@@ -66,7 +68,7 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
  * TRAC #36853: HA events can expire on primary server and this can cause data loss.
  */
 @Category({DistributedTest.class, ClientSubscriptionTest.class})
-public class ClientSubscriptionExpiryDataLossRegressionTest extends CacheTestCase {
+public class HARegionQueueExpiryRegressionTest extends CacheTestCase {
 
   /** The time in milliseconds by which the start of dispatcher will be delayed */
   private static final int DISPATCHER_SLOWSTART_TIME = 10_000;
@@ -129,7 +131,7 @@ public class ClientSubscriptionExpiryDataLossRegressionTest extends CacheTestCas
   }
 
   private int createServerCache() throws IOException {
-    System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, String.valueOf(1));
+    System.setProperty(GEODE_PREFIX + HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY, String.valueOf(1));
     System.setProperty("slowStartTimeForTesting", String.valueOf(DISPATCHER_SLOWSTART_TIME));
 
     AttributesFactory factory = new AttributesFactory();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index bce4688..9723d93 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -15,6 +15,8 @@
 package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.notNullValue;
@@ -1326,7 +1328,7 @@ public class HARegionQueueJUnitTest {
    */
   @Test
   public void testExpiryUsingSystemProperty() throws Exception {
-    System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, "1");
+    System.setProperty(GEODE_PREFIX + HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY, "1");
 
     HARegionQueueAttributes haa = new HARegionQueueAttributes();
     HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName(), haa);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueThreadIdExpiryRegressionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueThreadIdExpiryRegressionTest.java
new file mode 100644
index 0000000..7f293c6
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueThreadIdExpiryRegressionTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.client.ClientRegionShortcut.CACHING_PROXY;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.THREAD_ID_EXPIRY_TIME_PROPERTY;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+/**
+ * HARegionQueueThreadIdExpiryRegressionTest
+ * <p>
+ * TRAC #48879: Leaking ThreadIdentifiers and DispatchedAndCurrentEvents objects when client uses
+ * many short lived threads
+ */
+@Category({DistributedTest.class, ClientSubscriptionTest.class})
+@SuppressWarnings("serial")
+public class HARegionQueueThreadIdExpiryRegressionTest implements Serializable {
+
+  private static final int MESSAGE_SYNC_INTERVAL_SECONDS = 60;
+  private static final String EXPIRY_TIME_SECONDS = "30";
+
+  private VM server1;
+  private VM server2;
+
+  private String regionName;
+  private String hostName;
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setUp() throws Exception {
+    server1 = getVM(0);
+    server2 = getVM(1);
+
+    regionName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    hostName = getHostName();
+
+    int port1 = server1.invoke(() -> createCacheServer());
+    int port2 = server2.invoke(() -> createCacheServer());
+
+    createClientCache(port1, port2);
+  }
+
+  @Test
+  public void testThreadIdentifiersExpiry() throws Exception {
+    int puts = 10;
+
+    server1.invoke(() -> doPuts(puts));
+
+    server1.invoke(() -> verifyThreadsBeforeExpiration(1));
+    server2.invoke(() -> verifyThreadsBeforeExpiration(1));
+
+    server1.invoke(() -> awaitToVerifyStatsAfterExpiration(puts));
+    server2.invoke(() -> awaitToVerifyStatsAfterExpiration(puts));
+  }
+
+  private int createCacheServer() throws IOException {
+    System.setProperty(GEODE_PREFIX + HA_REGION_QUEUE_EXPIRY_TIME_PROPERTY, EXPIRY_TIME_SECONDS);
+    System.setProperty(GEODE_PREFIX + THREAD_ID_EXPIRY_TIME_PROPERTY, EXPIRY_TIME_SECONDS);
+
+    cacheRule.createCache();
+    cacheRule.getCache().setMessageSyncInterval(MESSAGE_SYNC_INTERVAL_SECONDS);
+
+    RegionFactory<String, String> rf = cacheRule.getCache().createRegionFactory(REPLICATE);
+    rf.create(regionName);
+
+    CacheServer server = cacheRule.getCache().addCacheServer();
+    server.setPort(0);
+    server.start();
+    return server.getPort();
+  }
+
+  private void createClientCache(int port1, int port2) {
+    ClientCacheFactory ccf = new ClientCacheFactory();
+    ccf.setPoolSubscriptionEnabled(true);
+    ccf.setPoolSubscriptionAckInterval(50);
+    ccf.setPoolSubscriptionRedundancy(1);
+    ccf.addPoolServer(hostName, port1);
+    ccf.addPoolServer(hostName, port2);
+
+    clientCacheRule.createClientCache(ccf);
+
+    ClientRegionFactory<String, String> crf =
+        clientCacheRule.getClientCache().createClientRegionFactory(CACHING_PROXY);
+
+    Region<String, String> region = crf.create(regionName);
+    region.registerInterest("ALL_KEYS");
+  }
+
+  private void doPuts(int puts) {
+    Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+    for (int i = 1; i <= puts; i++) {
+      region.put("KEY-" + i, "VALUE-" + i);
+    }
+  }
+
+  private CacheClientProxy getCacheClientProxy() {
+    return CacheClientNotifier.getInstance().getClientProxies().iterator().next();
+  }
+
+  private boolean isPrimaryServer() {
+    return getCacheClientProxy().isPrimary();
+  }
+
+  private void verifyThreadsBeforeExpiration(int expectedThreadIds) {
+    HARegionQueueStats stats = getCacheClientProxy().getHARegionQueue().getStatistics();
+
+    int actualThreadIds = stats.getThreadIdentiferCount();
+
+    assertThat(actualThreadIds)
+        .as("Expected ThreadIdentifier count >= " + expectedThreadIds + " but actual: "
+            + actualThreadIds + (isPrimaryServer() ? " at primary." : " at secondary."))
+        .isGreaterThanOrEqualTo(expectedThreadIds);
+  }
+
+  private void awaitToVerifyStatsAfterExpiration(int numOfEvents) {
+    await().atMost(2, MINUTES).until(() -> {
+      verifyStatsAfterExpiration(numOfEvents);
+    });
+  }
+
+  private void verifyStatsAfterExpiration(int numOfEvents) {
+    HARegionQueueStats stats = getCacheClientProxy().getHARegionQueue().getStatistics();
+
+    long actualEventsExpired = stats.getEventsExpired();
+    long expectedEventsExpired = isPrimaryServer() ? 0 : numOfEvents;
+
+    assertThat(actualEventsExpired)
+        .as("Expected eventsExpired: " + expectedEventsExpired + " but actual eventsExpired: "
+            + actualEventsExpired + (isPrimaryServer() ? " at primary." : " at secondary."))
+        .isEqualTo(expectedEventsExpired);
+
+    int actualThreadIds = stats.getThreadIdentiferCount();
+
+    // Sometimes we may see 1 threadIdentifier due to slow machines, but never equal to
+    // expectedThreadIds
+    assertThat(actualThreadIds).as("Expected ThreadIdentifier count <= 1 but actual: "
+        + actualThreadIds + (isPrimaryServer() ? " at primary." : " at secondary.")).isEqualTo(0);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
klund@apache.org.

Mime
View raw message