geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [04/14] geode git commit: GEODE-2632: refactoring preparations for SecurityService and BaseCommand changes
Date Wed, 31 May 2017 23:12:52 GMT
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 4028ab3..929093d 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -15,6 +15,8 @@
 package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.distributed.ConfigurationProperties.*;
+import static org.hamcrest.CoreMatchers.*;
+import static org.hamcrest.number.OrderingComparison.*;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -23,101 +25,75 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.awaitility.Awaitility;
-
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TestName;
 
-import org.apache.geode.LogWriter;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 /**
  * This is a test for the APIs of a HARegionQueue and verifies that the head, tail and size counters
  * are updated properly.
+ *
+ * TODO: need to rewrite a bunch of tests in HARegionQueueJUnitTest
  */
 @Category({IntegrationTest.class, ClientSubscriptionTest.class})
 public class HARegionQueueJUnitTest {
 
-  /** The cache instance */
-  protected InternalCache cache = null;
+  /** total number of threads doing put operations */
+  private static final int TOTAL_PUT_THREADS = 10;
 
-  /** Logger for this test */
-  protected LogWriter logger;
+  private static HARegionQueue hrqForTestSafeConflationRemoval;
+  private static List list1;
 
-  /** The <code>RegionQueue</code> instance */
-  protected HARegionQueue rq;
+  protected InternalCache cache;
+  private HARegionQueue haRegionQueue;
 
-  /** total number of threads doing put operations */
-  private static final int TOTAL_PUT_THREADS = 10;
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
 
-  boolean expiryCalled = false;
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
 
-  volatile boolean encounteredException = false;
-  boolean allowExpiryToProceed = false;
-  boolean complete = false;
+  @Rule
+  public TestName testName = new TestName();
 
   @Before
   public void setUp() throws Exception {
-    cache = createCache();
-    logger = cache.getLogger();
-    encounteredException = false;
+    this.cache = createCache();
   }
 
   @After
   public void tearDown() throws Exception {
-    cache.close();
-  }
-
-  /**
-   * Creates the cache instance for the test
-   */
-  private InternalCache createCache() throws CacheException {
-    return (InternalCache) new CacheFactory().set(MCAST_PORT, "0").create();
-  }
-
-  /**
-   * Creates HA region-queue object
-   */
-  private HARegionQueue createHARegionQueue(String name)
-      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-    HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache,
-        HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
-    return regionqueue;
-  }
-
-  /**
-   * Creates region-queue object
-   */
-  private HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
-      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-    HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs,
-        HARegionQueue.NON_BLOCKING_HA_QUEUE, false);
-    return regionqueue;
+    this.cache.close();
+    hrqForTestSafeConflationRemoval = null;
   }
 
   /**
@@ -129,14 +105,10 @@ public class HARegionQueueJUnitTest {
    */
   @Test
   public void testQueuePutWithoutConflation() throws Exception {
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation BEGIN");
-
-    rq = createHARegionQueue("testOfferNoConflation");
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
     int putPerProducer = 20;
     createAndRunProducers(false, false, false, putPerProducer);
-    assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size());
-
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation END");
+    assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS));
   }
 
   /**
@@ -149,14 +121,10 @@ public class HARegionQueueJUnitTest {
    */
   @Test
   public void testQueuePutWithConflation() throws Exception {
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation BEGIN");
-
-    rq = createHARegionQueue("testOfferConflation");
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
     int putPerProducer = 20;
     createAndRunProducers(true, false, true, putPerProducer);
-    assertEquals(putPerProducer, rq.size());
-
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation END");
+    assertThat(this.haRegionQueue.size(), is(putPerProducer));
   }
 
   /**
@@ -166,319 +134,150 @@ public class HARegionQueueJUnitTest {
    * 3)Wait till all put-threads complete their job <br>
    * 4)verify that the size of the queue is equal to the total number of puts done by one thread (as
    * rest of them will be duplicates and hence will be replaced)
-   *
-   * TODO:Dinesh : Work on optimizing the handling of receiving duplicate events
    */
   @Test
   public void testQueuePutWithDuplicates() throws Exception {
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates BEGIN");
-
-    rq = createHARegionQueue("testQueuePutWithDuplicates");
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
     int putPerProducer = 20;
-    // createAndRunProducers(false, true, true, putPerProducer);
-    /* Suyog: Only one thread can enter DACE at a time */
     createAndRunProducers(false, false, true, putPerProducer);
-    assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size());
-
-    logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates END");
-  }
-
-  /**
-   * Creates and runs the put threads which will create the conflatable objects and add them to the
-   * queue
-   * 
-   * @param generateSameKeys - if all the producers need to put objects with same set of keys
-   *        (needed for conflation testing)
-   * @param generateSameIds - if all the producers need to put objects with same set of ids (needed
-   *        for duplicates testing)
-   * @param conflationEnabled - true if all producers need to put objects with conflation enabled,
-   *        false otherwise.
-   * @param putPerProducer - number of objects offered to the queue by each producer
-   * @throws Exception - thrown if any problem occurs in test execution
-   */
-  private void createAndRunProducers(boolean generateSameKeys, boolean generateSameIds,
-      boolean conflationEnabled, int putPerProducer) throws Exception {
-    Producer[] putThreads = new Producer[TOTAL_PUT_THREADS];
-
-    int i = 0;
-
-    // Create the put-threads, each generating same/different set of ids/keys as
-    // per the parameters
-    for (i = 0; i < TOTAL_PUT_THREADS; i++) {
-      String keyPrefix = null;
-      long startId;
-      if (generateSameKeys) {
-        keyPrefix = "key";
-      } else {
-        keyPrefix = i + "key";
-      }
-      if (generateSameIds) {
-        startId = 1;
-      } else {
-        startId = i * 100000;
-      }
-      putThreads[i] =
-          new Producer("Producer-" + i, keyPrefix, startId, putPerProducer, conflationEnabled);
-    }
-
-    // start the put-threads
-    for (i = 0; i < TOTAL_PUT_THREADS; i++) {
-      putThreads[i].start();
-    }
-
-    // call join on the put-threads so that this thread waits till they complete
-    // before doing verfication
-    for (i = 0; i < TOTAL_PUT_THREADS; i++) {
-      ThreadUtils.join(putThreads[i], 30 * 1000);
-    }
-    assertFalse(encounteredException);
+    assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS));
   }
 
   /*
    * Test method for 'org.apache.geode.internal.cache.ha.HARegionQueue.addDispatchedMessage(Object)'
    */
   @Test
-  public void testAddDispatchedMessageObject() {
-    try {
-      // HARegionQueue haRegionQueue = new HARegionQueue("testing", cache);
-      HARegionQueue haRegionQueue = createHARegionQueue("testing");
-      assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
-      // TODO:
-
-      haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
-      haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2);
+  public void testAddDispatchedMessageObject() throws Exception {
+    this.haRegionQueue = createHARegionQueue(this.testName.getMethodName());
+    assertThat(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true));
 
-      assertTrue(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty());
-      // HARegionQueue.getDispatchedMessagesMapForTesting().clear();
+    this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1);
+    this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2);
 
-    } catch (Exception e) {
-      throw new AssertionError("Test encountered an exception due to ", e);
-    }
+    assertThat(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true));
   }
 
   /**
    * tests the blocking peek functionality of BlockingHARegionQueue
    */
   @Test
-  public void testBlockQueue() {
-    exceptionInThread = false;
-    testFailed = false;
-    try {
-      final HARegionQueue bQ = HARegionQueue.getHARegionQueueInstance("testing", cache,
-          HARegionQueue.BLOCKING_HA_QUEUE, false);
-      Thread[] threads = new Thread[10];
-      final CyclicBarrier barrier = new CyclicBarrier(threads.length + 1);
-      for (int i = 0; i < threads.length; i++) {
-        threads[i] = new Thread() {
-          public void run() {
-            try {
-              barrier.await();
-              long startTime = System.currentTimeMillis();
-              Object obj = bQ.peek();
-              if (obj == null) {
-                testFailed = true;
-                message.append(
-                    " Failed :  failed since object was null and was not expected to be null \n");
-              }
-              long totalTime = System.currentTimeMillis() - startTime;
+  public void testBlockQueue() throws Exception {
+    HARegionQueue regionQueue = HARegionQueue.getHARegionQueueInstance(
+        this.testName.getMethodName(), this.cache, HARegionQueue.BLOCKING_HA_QUEUE, false);
+    Thread[] threads = new Thread[10];
+    int threadsLength = threads.length;
+    CyclicBarrier barrier = new CyclicBarrier(threadsLength + 1);
+
+    for (int i = 0; i < threadsLength; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            barrier.await();
+            long startTime = System.currentTimeMillis();
+            Object obj = regionQueue.peek();
+            if (obj == null) {
+              errorCollector.addError(new AssertionError(
+                  "Failed :  failed since object was null and was not expected to be null"));
+            }
+            long totalTime = System.currentTimeMillis() - startTime;
 
-              if (totalTime < 2000) {
-                testFailed = true;
-                message
-                    .append(" Failed :  Expected time to be greater than 2000 but it is not so ");
-              }
-            } catch (Exception e) {
-              exceptionInThread = true;
-              exception = e;
+            if (totalTime < 2000) {
+              errorCollector.addError(new AssertionError(
+                  " Failed :  Expected time to be greater than 2000 but it is not so "));
             }
+          } catch (Exception e) {
+            errorCollector.addError(e);
           }
-        };
-
-      }
-
-      for (int k = 0; k < threads.length; k++) {
-        threads[k].start();
-      }
-      barrier.await();
-      Thread.sleep(5000);
-
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      bQ.put(new ConflatableObject("key", "value", id, false, "testing"));
-
-      long startTime = System.currentTimeMillis();
-      for (int k = 0; k < threads.length; k++) {
-        ThreadUtils.join(threads[k], 60 * 1000);
-      }
-
-      long totalTime = System.currentTimeMillis() - startTime;
-
-      if (totalTime >= 60000) {
-        fail(" Test taken too long ");
-      }
-
-      if (testFailed) {
-        fail(" test failed due to " + message);
-      }
-
-    } catch (Exception e) {
-      throw new AssertionError(" Test failed due to ", e);
+        }
+      };
     }
-  }
-
-  private static volatile int counter = 0;
-
-  protected boolean exceptionInThread = false;
-
-  protected boolean testFailed = false;
-
-  protected StringBuffer message = new StringBuffer();
-
-  protected Exception exception = null;
 
-  private synchronized int getCounter() {
-    return ++counter;
-  }
-
-  /**
-   * Thread to perform PUTs into the queue
-   */
-  class Producer extends Thread {
-    /** total number of puts by this thread */
-    long totalPuts = 0;
-
-    /** sleep between successive puts */
-    long sleeptime = 10;
-
-    /** prefix to keys of all objects put by this thread */
-    String keyPrefix;
+    for (Thread thread1 : threads) {
+      thread1.start();
+    }
 
-    /** startingId for sequence-ids of all objects put by this thread */
-    long startingId;
+    barrier.await();
 
-    /** name of this producer thread */
-    String producerName;
+    Thread.sleep(5000);
 
-    /**
-     * boolean to indicate whether this thread should create conflation enabled entries
-     */
-    boolean createConflatables;
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+    regionQueue
+        .put(new ConflatableObject("key", "value", id, false, this.testName.getMethodName()));
 
-    /**
-     * Constructor
-     * 
-     * @param name - name for this thread
-     * @param keyPrefix - prefix to keys of all objects put by this thread
-     * @param startingId - startingId for sequence-ids of all objects put by this thread
-     * @param totalPuts total number of puts by this thread
-     * @param createConflatableEvents - boolean to indicate whether this thread should create
-     *        conflation enabled entries
-     */
-    Producer(String name, String keyPrefix, long startingId, long totalPuts,
-        boolean createConflatableEvents) {
-      super(name);
-      this.producerName = name;
-      this.keyPrefix = keyPrefix;
-      this.startingId = startingId;
-      this.totalPuts = totalPuts;
-      this.createConflatables = createConflatableEvents;
-      setDaemon(true);
+    long startTime = System.currentTimeMillis();
+    for (Thread thread : threads) {
+      ThreadUtils.join(thread, 60 * 1000);
     }
 
-    /** Create Conflatable objects and put them into the Queue. */
-    @Override
-    public void run() {
-      if (producerName == null) {
-        producerName = Thread.currentThread().getName();
-      }
-      for (long i = 0; i < totalPuts; i++) {
-        String REGION_NAME = "test";
-        try {
-          ConflatableObject event = new ConflatableObject(keyPrefix + i, "val" + i,
-              new EventID(new byte[] {1}, startingId, startingId + i), createConflatables,
-              REGION_NAME);
-
-          logger.fine("putting for key =  " + keyPrefix + i);
-          rq.put(event);
-          Thread.sleep(sleeptime);
-        } catch (VirtualMachineError e) {
-          SystemFailure.initiateFailure(e);
-          throw e;
-        } catch (Throwable e) {
-          logger.severe("Exception while running Producer;continue running.", e);
-          encounteredException = true;
-          break;
-        }
-      }
-      logger.info(producerName + " :  Puts completed");
+    long totalTime = System.currentTimeMillis() - startTime;
+
+    if (totalTime >= 60000) {
+      fail(" Test taken too long ");
     }
   }
 
   /**
-   * tests whether expiry of entry in the regin queue occurs as expected
+   * tests whether expiry of entry in the region queue occurs as expected
    */
   @Test
-  public void testExpiryPositive()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testExpiryPositive() throws Exception {
     HARegionQueueAttributes haa = new HARegionQueueAttributes();
     haa.setExpiryTime(1);
-    HARegionQueue regionqueue = createHARegionQueue("testing", haa);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
     long start = System.currentTimeMillis();
-    regionqueue.put(
-        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"));
-    Map map = (Map) regionqueue.getConflationMapForTesting().get("testing");
+
+    regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+        this.testName.getMethodName()));
+
+    Map map = (Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName());
     waitAtLeast(1000, start, () -> {
-      assertEquals(Collections.EMPTY_MAP, map);
-      assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys());
+      assertThat(map, is(Collections.emptyMap()));
+      assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
     });
   }
 
   /**
-   * Wait until a given runnable stops throwing exceptions. It should take at least
-   * minimumElapsedTime after the supplied start time to happen.
-   *
-   * This is useful for validating that an entry doesn't expire until a certain amount of time has
-   * passed
-   */
-  protected void waitAtLeast(final int minimumElapsedTIme, final long start,
-      final Runnable runnable) {
-    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(runnable);
-    long elapsed = System.currentTimeMillis() - start;
-    assertTrue(elapsed >= minimumElapsedTIme);
-  }
-
-  /**
    * tests whether expiry of a conflated entry in the region queue occurs as expected
    */
   @Test
-  public void testExpiryPositiveWithConflation()
-      throws InterruptedException, IOException, ClassNotFoundException {
+  public void testExpiryPositiveWithConflation() throws Exception {
     HARegionQueueAttributes haa = new HARegionQueueAttributes();
     haa.setExpiryTime(1);
-    HARegionQueue regionqueue = createHARegionQueue("testing", haa);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
     long start = System.currentTimeMillis();
-    regionqueue.put(
-        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"));
-    regionqueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2),
-        true, "testing"));
-    assertTrue(
+
+    regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+        this.testName.getMethodName()));
+
+    regionQueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2),
+        true, this.testName.getMethodName()));
+
+    assertThat(
         " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
-        !(regionqueue.size() == 0));
-    assertTrue(
+        !regionQueue.isEmpty(), is(true));
+    assertThat(
         " Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ",
-        !(regionqueue.getAvalaibleIds().size() == 0));
-    assertTrue(
+        !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+    assertThat(
         " Expected conflation map size not  to be zero since expiry time has not been exceeded but it is not so "
-            + ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))),
-        !((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key")) == null));
-    assertTrue(
+            + ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+                .get("key"),
+        ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+            .get("key"),
+        not(sameInstance(null)));
+    assertThat(
         " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
-        !(regionqueue.getEventsMapForTesting().size() == 0));
+        !regionQueue.getEventsMapForTesting().isEmpty(), is(true));
 
     waitAtLeast(1000, start, () -> {
-      assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys());
-      assertEquals(Collections.EMPTY_SET, regionqueue.getAvalaibleIds());
-      assertEquals(Collections.EMPTY_MAP, regionqueue.getConflationMapForTesting().get("testing"));
-      assertEquals(Collections.EMPTY_MAP, regionqueue.getEventsMapForTesting());
+      assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet()));
+      assertThat(regionQueue.getAvalaibleIds(), is(Collections.emptySet()));
+      assertThat(regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()),
+          is(Collections.emptyMap()));
+      assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap()));
     });
   }
 
@@ -486,38 +285,37 @@ public class HARegionQueueJUnitTest {
    * tests a ThreadId not being expired if it was updated
    */
   @Test
-  public void testNoExpiryOfThreadId() {
-    try {
-      HARegionQueueAttributes haa = new HARegionQueueAttributes();
-      haa.setExpiryTime(45);
-      // RegionQueue regionqueue = new HARegionQueue("testing", cache, haa);
-      HARegionQueue regionqueue = createHARegionQueue("testing", haa);
-      EventID ev1 = new EventID(new byte[] {1}, 1, 1);
-      EventID ev2 = new EventID(new byte[] {1}, 1, 2);
-      Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
-      Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing");
-      regionqueue.put(cf1);
-      final long tailKey = regionqueue.tailKey.get();
-      regionqueue.put(cf2);
-      // Invalidate will trigger the expiration of the entry
-      // See HARegionQueue.createCacheListenerForHARegion
-      regionqueue.getRegion().invalidate(tailKey);
-      assertTrue(
-          " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
-          !(regionqueue.size() == 0));
-      assertTrue(" Expected the available id's size not  to have counter 1 but it has ",
-          !(regionqueue.getAvalaibleIds().contains(new Long(1))));
-      assertTrue(" Expected the available id's size to have counter 2 but it does not have ",
-          (regionqueue.getAvalaibleIds().contains(new Long(2))));
-      assertTrue(" Expected eventID map not to have the first event, but it has",
-          !(regionqueue.getCurrentCounterSet(ev1).contains(new Long(1))));
-      assertTrue(" Expected eventID map to have the second event, but it does not",
-          (regionqueue.getCurrentCounterSet(ev2).contains(new Long(2))));
-    }
+  public void testNoExpiryOfThreadId() throws Exception {
+    HARegionQueueAttributes haa = new HARegionQueueAttributes();
+    haa.setExpiryTime(45);
 
-    catch (Exception e) {
-      throw new AssertionError("test failed due to ", e);
-    }
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa);
+    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+    EventID ev2 = new EventID(new byte[] {1}, 1, 2);
+    Conflatable cf1 =
+        new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+    Conflatable cf2 =
+        new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName());
+
+    regionQueue.put(cf1);
+    long tailKey = regionQueue.tailKey.get();
+    regionQueue.put(cf2);
+
+    // Invalidate will trigger the expiration of the entry
+    // See HARegionQueue.createCacheListenerForHARegion
+    regionQueue.getRegion().invalidate(tailKey);
+
+    assertThat(
+        " Expected region size not to be zero since expiry time has not been exceeded but it is not so ",
+        !regionQueue.isEmpty(), is(true));
+    assertThat(" Expected the available id's size not  to have counter 1 but it has ",
+        !regionQueue.getAvalaibleIds().contains(1L), is(true));
+    assertThat(" Expected the available id's size to have counter 2 but it does not have ",
+        regionQueue.getAvalaibleIds().contains(2L), is(true));
+    assertThat(" Expected eventID map not to have the first event, but it has",
+        !regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true));
+    assertThat(" Expected eventID map to have the second event, but it does not",
+        regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true));
   }
 
   /**
@@ -525,66 +323,64 @@ public class HARegionQueueJUnitTest {
    * being put in the queue
    */
   @Test
-  public void testQRMComingBeforeLocalPut() {
-    try {
-      // RegionQueue regionqueue = new HARegionQueue("testing", cache);
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      regionqueue.removeDispatchedEvents(id);
-      regionqueue.put(new ConflatableObject("key", "value", id, true, "testing"));
-      assertTrue(" Expected key to be null since QRM for the message id had already arrived ",
-          !regionqueue.getRegion().containsKey(new Long(1)));
-    } catch (Exception e) {
-      throw new AssertionError("test failed due to ", e);
-    }
+  public void testQRMComingBeforeLocalPut() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+
+    regionQueue.removeDispatchedEvents(id);
+    regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
+
+    assertThat(" Expected key to be null since QRM for the message id had already arrived ",
+        !regionQueue.getRegion().containsKey(1L), is(true));
   }
 
   /**
    * test verifies correct expiry of ThreadIdentifier in the HARQ if no corresponding put comes
    */
   @Test
-  public void testOnlyQRMComing() throws InterruptedException, IOException, ClassNotFoundException {
+  public void testOnlyQRMComing() throws Exception {
     HARegionQueueAttributes harqAttr = new HARegionQueueAttributes();
     harqAttr.setExpiryTime(1);
-    // RegionQueue regionqueue = new HARegionQueue("testing", cache, harqAttr);
-    HARegionQueue regionqueue = createHARegionQueue("testing", harqAttr);
+
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), harqAttr);
     EventID id = new EventID(new byte[] {1}, 1, 1);
     long start = System.currentTimeMillis();
-    regionqueue.removeDispatchedEvents(id);
-    assertTrue(" Expected testingID to be present since only QRM achieved ",
-        regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)));
+
+    regionQueue.removeDispatchedEvents(id);
+
+    assertThat(" Expected testingID to be present since only QRM achieved ",
+        regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)), is(true));
+
     waitAtLeast(1000, start,
-        () -> assertTrue(
+        () -> assertThat(
             " Expected testingID not to be present since it should have expired after 2.5 seconds",
-            !regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1))));
+            !regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)),
+            is(true)));
   }
 
   /**
    * test all relevant data structures are updated on a local put
    */
   @Test
-  public void testPutPath() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      Conflatable cf =
-          new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing");
-      regionqueue.put(cf);
-      assertTrue(" Expected region peek to return cf but it is not so ",
-          (regionqueue.peek().equals(cf)));
-      assertTrue(
-          " Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ",
-          !(regionqueue.getAvalaibleIds().size() == 0));
-      assertTrue(
-          " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ",
-          ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))
-              .equals(new Long(1))));
-      assertTrue(
-          " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
-          !(regionqueue.getEventsMapForTesting().size() == 0));
+  public void testPutPath() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    Conflatable cf = new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+        this.testName.getMethodName());
 
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
-    }
+    regionQueue.put(cf);
+
+    assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(), is(cf));
+    assertThat(
+        " Expected the available id's size not  to be zero since expiry time has not  been exceeded but it is not so ",
+        !regionQueue.getAvalaibleIds().isEmpty(), is(true));
+    assertThat(
+        " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ",
+        ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+            .get("key"),
+        is(1L));
+    assertThat(
+        " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ",
+        !regionQueue.getEventsMapForTesting().isEmpty(), is(true));
   }
 
   /**
@@ -592,58 +388,64 @@ public class HARegionQueueJUnitTest {
    * there - verify the next five entries and their relevant data is present
    */
   @Test
-  public void testQRMDispatch() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      Conflatable[] cf = new Conflatable[10];
-      // put 10 conflatable objects
-      for (int i = 0; i < 10; i++) {
-        cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true,
-            "testing");
-        regionqueue.put(cf[i]);
-      }
-      // remove the first 5 by giving the right sequence id
-      regionqueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));
-      // verify 1-5 not in region
-      for (long i = 1; i < 6; i++) {
-        assertTrue(!regionqueue.getRegion().containsKey(new Long(i)));
-      }
-      // verify 6-10 still in region queue
-      for (long i = 6; i < 11; i++) {
-        assertTrue(regionqueue.getRegion().containsKey(new Long(i)));
-      }
-      // verify 1-5 not in conflation map
-      for (long i = 0; i < 5; i++) {
-        assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing"))
-            .containsKey("key" + i));
-      }
-      // verify 6-10 in conflation map
-      for (long i = 5; i < 10; i++) {
-        assertTrue(
-            ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i));
-      }
+  public void testQRMDispatch() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    Conflatable[] cf = new Conflatable[10];
+
+    // put 10 conflatable objects
+    for (int i = 0; i < 10; i++) {
+      cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true,
+          this.testName.getMethodName());
+      regionQueue.put(cf[i]);
+    }
 
-      EventID eid = new EventID(new byte[] {1}, 1, 6);
-      // verify 1-5 not in eventMap
-      for (long i = 1; i < 6; i++) {
-        assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
-      // verify 6-10 in event Map
-      for (long i = 6; i < 11; i++) {
-        assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
+    // remove the first 5 by giving the right sequence id
+    regionQueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4));
 
-      // verify 1-5 not in available Id's map
-      for (long i = 1; i < 6; i++) {
-        assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
+    // verify 1-5 not in region
+    for (int i = 1; i < 6; i++) {
+      assertThat(!regionQueue.getRegion().containsKey((long) i), is(true));
+    }
 
-      // verify 6-10 in available id's map
-      for (long i = 6; i < 11; i++) {
-        assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
+    // verify 6-10 still in region queue
+    for (int i = 6; i < 11; i++) {
+      assertThat(regionQueue.getRegion().containsKey((long) i), is(true));
+    }
+
+    // verify 1-5 not in conflation map
+    for (int i = 0; i < 5; i++) {
+      assertThat(
+          !((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+              .containsKey("key" + i),
+          is(true));
+    }
+
+    // verify 6-10 in conflation map
+    for (int i = 5; i < 10; i++) {
+      assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+          .containsKey("key" + i), is(true));
+    }
+
+    EventID eid = new EventID(new byte[] {1}, 1, 6);
+
+    // verify 1-5 not in eventMap
+    for (int i = 1; i < 6; i++) {
+      assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+    }
+
+    // verify 6-10 in event Map
+    for (int i = 6; i < 11; i++) {
+      assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+    }
+
+    // verify 1-5 not in available Id's map
+    for (int i = 1; i < 6; i++) {
+      assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+    }
+
+    // verify 6-10 in available id's map
+    for (int i = 6; i < 11; i++) {
+      assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
     }
   }
 
@@ -652,68 +454,74 @@ public class HARegionQueueJUnitTest {
    * 1-7 not there - verify data for 8-10 is there
    */
   @Test
-  public void testQRMBeforePut() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
+  public void testQRMBeforePut() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
 
-      EventID[] ids = new EventID[10];
+    EventID[] ids = new EventID[10];
 
-      for (int i = 0; i < 10; i++) {
-        ids[i] = new EventID(new byte[] {1}, 1, i);
-      }
+    for (int i = 0; i < 10; i++) {
+      ids[i] = new EventID(new byte[] {1}, 1, i);
+    }
 
-      // first get the qrm message for the seventh id
-      regionqueue.removeDispatchedEvents(ids[6]);
-      Conflatable[] cf = new Conflatable[10];
-      // put 10 conflatable objects
-      for (int i = 0; i < 10; i++) {
-        cf[i] = new ConflatableObject("key" + i, "value", ids[i], true, "testing");
-        regionqueue.put(cf[i]);
-      }
+    // first get the qrm message for the seventh id
+    regionQueue.removeDispatchedEvents(ids[6]);
+    Conflatable[] cf = new Conflatable[10];
 
-      // verify 1-7 not in region
-      Set values = (Set) regionqueue.getRegion().values();
-      for (int i = 0; i < 7; i++) {
-        System.out.println(i);
-        assertTrue(!values.contains(cf[i]));
-      }
-      // verify 8-10 still in region queue
-      for (int i = 7; i < 10; i++) {
-        System.out.println(i);
-        assertTrue(values.contains(cf[i]));
-      }
-      // verify 1-8 not in conflation map
-      for (long i = 0; i < 7; i++) {
-        assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing"))
-            .containsKey("key" + i));
-      }
-      // verify 8-10 in conflation map
-      for (long i = 7; i < 10; i++) {
-        assertTrue(
-            ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i));
-      }
+    // put 10 conflatable objects
+    for (int i = 0; i < 10; i++) {
+      cf[i] =
+          new ConflatableObject("key" + i, "value", ids[i], true, this.testName.getMethodName());
+      regionQueue.put(cf[i]);
+    }
 
-      EventID eid = new EventID(new byte[] {1}, 1, 6);
-      // verify 1-7 not in eventMap
-      for (long i = 4; i < 11; i++) {
-        assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
-      // verify 8-10 in event Map
-      for (long i = 1; i < 4; i++) {
-        assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i)));
-      }
+    // verify 1-7 not in region
+    Set values = (Set) regionQueue.getRegion().values();
 
-      // verify 1-7 not in available Id's map
-      for (long i = 4; i < 11; i++) {
-        assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
+    for (int i = 0; i < 7; i++) {
+      System.out.println(i);
+      assertThat(!values.contains(cf[i]), is(true));
+    }
 
-      // verify 8-10 in available id's map
-      for (long i = 1; i < 4; i++) {
-        assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i)));
-      }
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
+    // verify 8-10 still in region queue
+    for (int i = 7; i < 10; i++) {
+      System.out.println(i);
+      assertThat(values.contains(cf[i]), is(true));
+    }
+
+    // verify 1-7 not in conflation map
+    for (int i = 0; i < 7; i++) {
+      assertThat(
+          !((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+              .containsKey("key" + i),
+          is(true));
+    }
+
+    // verify 8-10 in conflation map
+    for (int i = 7; i < 10; i++) {
+      assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()))
+          .containsKey("key" + i), is(true));
+    }
+
+    EventID eid = new EventID(new byte[] {1}, 1, 6);
+
+    // verify counters 4-10 are not in eventMap (events 1-7 were dispatched before the puts)
+    for (int i = 4; i < 11; i++) {
+      assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+    }
+
+    // verify counters 1-3 (events 8-10) are in eventMap
+    for (int i = 1; i < 4; i++) {
+      assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true));
+    }
+
+    // verify counters 4-10 are not in the available ids set
+    for (int i = 4; i < 11; i++) {
+      assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true));
+    }
+
+    // verify counters 1-3 (events 8-10) are in the available ids set
+    for (int i = 1; i < 4; i++) {
+      assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true));
     }
   }
 
@@ -721,33 +529,33 @@ public class HARegionQueueJUnitTest {
    * test to verify conflation happens as expected
    */
   @Test
-  public void testConflation() {
-    try {
-      HARegionQueue regionqueue = createHARegionQueue("testing");
-      EventID ev1 = new EventID(new byte[] {1}, 1, 1);
-      EventID ev2 = new EventID(new byte[] {1}, 2, 2);
-      Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
-      Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing");
-      regionqueue.put(cf1);
-      Map conflationMap = regionqueue.getConflationMapForTesting();
-      assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(1)));
-      regionqueue.put(cf2);
-      // verify the conflation map has recorded the new key
-      assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(2)));
-      // the old key should not be present
-      assertTrue(!regionqueue.getRegion().containsKey(new Long(1)));
-      // available ids should not contain the old id (the old position)
-      assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(1)));
-      // available id should have the new id (the new position)
-      assertTrue(regionqueue.getAvalaibleIds().contains(new Long(2)));
-      // events map should not contain the old position
-      assertTrue(regionqueue.getCurrentCounterSet(ev1).isEmpty());
-      // events map should contain the new position
-      assertTrue(regionqueue.getCurrentCounterSet(ev2).contains(new Long(2)));
-
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
-    }
+  public void testConflation() throws Exception {
+    HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName());
+    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+    EventID ev2 = new EventID(new byte[] {1}, 2, 2);
+    Conflatable cf1 =
+        new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+    Conflatable cf2 =
+        new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName());
+    regionQueue.put(cf1);
+
+    Map conflationMap = regionQueue.getConflationMapForTesting();
+    assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(1L));
+
+    regionQueue.put(cf2);
+
+    // verify the conflation map has recorded the new key
+    assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(2L));
+    // the old key should not be present
+    assertThat(!regionQueue.getRegion().containsKey(1L), is(true));
+    // available ids should not contain the old id (the old position)
+    assertThat(!regionQueue.getAvalaibleIds().contains(1L), is(true));
+    // available id should have the new id (the new position)
+    assertThat(regionQueue.getAvalaibleIds().contains(2L), is(true));
+    // events map should not contain the old position
+    assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true));
+    // events map should contain the new position
+    assertThat(regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true));
   }
 
   /**
@@ -755,97 +563,58 @@ public class HARegionQueueJUnitTest {
    * events which are of ID greater than that contained in QRM should stay
    */
   @Test
-  public void testQRM() {
-    try {
-      RegionQueue regionqueue = createHARegionQueue("testing");
-      for (int i = 0; i < 10; ++i) {
-        regionqueue.put(new ConflatableObject("key" + (i + 1), "value",
-            new EventID(new byte[] {1}, 1, i + 1), true, "testing"));
-      }
-      EventID qrmID = new EventID(new byte[] {1}, 1, 5);
-      ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID);
-      Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
-      assertTrue(((Map) (conflationMap.get("testing"))).size() == 5);
-
-      Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
-      Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
-      assertTrue(availableIDs.size() == 5);
-      assertTrue(counters.size() == 5);
-      for (int i = 5; i < 10; ++i) {
-        assertTrue(((Map) (conflationMap.get("testing"))).containsKey("key" + (i + 1)));
-        assertTrue(availableIDs.contains(new Long((i + 1))));
-        assertTrue(counters.contains(new Long((i + 1))));
-      }
-      Region rgn = ((HARegionQueue) regionqueue).getRegion();
-      assertTrue(rgn.keySet().size() == 6);
+  public void testQRM() throws Exception {
+    RegionQueue regionqueue = createHARegionQueue(this.testName.getMethodName());
 
-    } catch (Exception e) {
-      throw new AssertionError("Exception occurred in test due to ", e);
+    for (int i = 0; i < 10; ++i) {
+      regionqueue.put(new ConflatableObject("key" + (i + 1), "value",
+          new EventID(new byte[] {1}, 1, i + 1), true, this.testName.getMethodName()));
     }
-  }
 
-  protected static HARegionQueue hrqFortestSafeConflationRemoval;
+    EventID qrmID = new EventID(new byte[] {1}, 1, 5);
+    ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID);
+    Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting();
+    assertThat(((Map) conflationMap.get(this.testName.getMethodName())).size(), is(5));
 
-  /**
-   * This test tests safe removal from the conflation map. i.e operations should only remove old
-   * values and not the latest value
-   */
-  @Test
-  public void testSafeConflationRemoval() {
-    try {
-      hrqFortestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval",
+    Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds();
+    Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID);
 
-          cache, this);
-      Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1),
-          true, "testSafeConflationRemoval");
-      hrqFortestSafeConflationRemoval.put(cf1);
-      hrqFortestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1));
-      Map map =
-
-          (Map) hrqFortestSafeConflationRemoval.getConflationMapForTesting()
-              .get("testSafeConflationRemoval");
-      assertTrue(
-          "Expected the counter to be 2 since it should not have been deleted but it is not so ",
-          map.get("key1").equals(new Long(2)));
-      hrqFortestSafeConflationRemoval = null;
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to ", e);
+    assertThat(availableIDs.size(), is(5));
+    assertThat(counters.size(), is(5));
+
+    for (int i = 5; i < 10; ++i) {
+      assertThat(
+          ((Map) (conflationMap.get(this.testName.getMethodName()))).containsKey("key" + (i + 1)),
+          is(true));
+      assertThat(availableIDs.contains((long) (i + 1)), is(true));
+      assertThat(counters.contains((long) (i + 1)), is(true));
     }
+
+    Region rgn = ((HARegionQueue) regionqueue).getRegion();
+    assertThat(rgn.keySet().size(), is(6));
   }
 
   /**
-   * Extends HARegionQueue for testing purposes. used by testSafeConflationRemoval
+   * This test tests safe removal from the conflation map, i.e. operations should only remove old
+   * values and not the latest value
    */
-  static class HARQTestClass extends HARegionQueue.TestOnlyHARegionQueue {
+  @Test
+  public void testSafeConflationRemoval() throws Exception {
+    hrqForTestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval", this.cache);
+    Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1),
+        true, "testSafeConflationRemoval");
 
-    public HARQTestClass(String REGION_NAME, InternalCache cache, HARegionQueueJUnitTest test)
-        throws IOException, ClassNotFoundException, CacheException, InterruptedException {
-      super(REGION_NAME, cache);
-    }
+    hrqForTestSafeConflationRemoval.put(cf1);
+    hrqForTestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1));
 
-    ConcurrentMap createConcurrentMap() {
-      return new ConcHashMap();
-    }
-  }
+    Map map = (Map) hrqForTestSafeConflationRemoval.getConflationMapForTesting()
+        .get("testSafeConflationRemoval");
 
-  /**
-   * Used to override the remove method for testSafeConflationRemoval
-   */
-  static class ConcHashMap extends ConcurrentHashMap implements ConcurrentMap {
-    public boolean remove(Object arg0, Object arg1) {
-      Conflatable cf2 = new ConflatableObject("key1", "value2", new EventID(new byte[] {1}, 1, 2),
-          true, "testSafeConflationRemoval");
-      try {
-        hrqFortestSafeConflationRemoval.put(cf2);
-      } catch (Exception e) {
-        throw new AssertionError("Exception occurred in trying to put ", e);
-      }
-      return super.remove(arg0, arg1);
-    }
+    assertThat(
+        "Expected the counter to be 2 since it should not have been deleted but it is not so ",
+        map.get("key1"), is(2L));
   }
 
-  static List list1;
-
   /**
    * This test tests remove operation is causing the insertion of sequence ID for existing
    * ThreadIdentifier object and concurrently the QRM thread is iterating over the Map to form the
@@ -864,80 +633,86 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that all the sequence should be greater than x
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() {
-    try {
-      final long numberOfIterations = 1000;
-      final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval");
-      HARegionQueue.stopQRMThread();
-      final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids[i] = new ThreadIdentifier(new byte[] {1}, i);
-        hrq.addDispatchedMessage(ids[i], i);
-      }
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = false;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+  public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() throws Exception {
+    long numberOfIterations = 1000;
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName());
+    HARegionQueue.stopQRMThread();
+    ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
+
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+      hrq.addDispatchedMessage(ids[i], i);
+    }
+
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
         }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
-      iterator = list2.iterator();
-      doOnce = false;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+    };
+
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
         }
       }
-      iterator = map.values().iterator();
-      Long max = new Long(numberOfIterations);
-      Long next;
-      while (iterator.hasNext()) {
-        next = ((Long) iterator.next());
-        assertTrue(" Expected all the sequence ID's to be greater than " + max
-            + " but it is not so. Got sequence id " + next, next.compareTo(max) >= 0);
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = false;
+    EventID id;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(new Long(id.getThreadID()), id.getSequenceID());
+      }
+    }
+
+    iterator = list2.iterator();
+    doOnce = false;
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(id.getThreadID(), id.getSequenceID());
       }
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to : ", e);
+    }
+
+    iterator = map.values().iterator();
+    Long max = numberOfIterations;
+    while (iterator.hasNext()) {
+      Long next = (Long) iterator.next();
+      assertThat(
+          " Expected all the sequence ID's to be greater than " + max
+              + " but it is not so. Got sequence id " + next,
+          next.compareTo(max), greaterThanOrEqualTo(0));
     }
   }
 
@@ -958,77 +733,81 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that the map size should be 2x
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() {
-    try {
-      final long numberOfIterations = 1000;
-      final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval");
-      HARegionQueue.stopQRMThread();
-      final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids[i] = new ThreadIdentifier(new byte[] {1}, i);
-        hrq.addDispatchedMessage(ids[i], i);
-      }
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
-            hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = false;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+  public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() throws Exception {
+    int numberOfIterations = 1000;
+    HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName());
+    HARegionQueue.stopQRMThread();
+    ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
+
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+      hrq.addDispatchedMessage(ids[i], i);
+    }
+
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
         }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
-      iterator = list2.iterator();
-      doOnce = false;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next();
-          iterator.next();
-          doOnce = true;
-        } else {
-          id = (EventID) iterator.next();
-          map.put(new Long(id.getThreadID()), new Long(id.getSequenceID()));
+    };
+
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
+          hrq.addDispatchedMessage(ids[i], i + numberOfIterations);
         }
       }
-      assertTrue(
-          " Expected the map size to be " + (2 * numberOfIterations) + " but it is " + map.size(),
-          map.size() == (2 * numberOfIterations));
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to an unexpected exception : ", e);
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = false;
+    EventID id;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(id.getThreadID(), id.getSequenceID());
+      }
+    }
+
+    iterator = list2.iterator();
+    doOnce = false;
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next();
+        iterator.next();
+        doOnce = true;
+      } else {
+        id = (EventID) iterator.next();
+        map.put(id.getThreadID(), id.getSequenceID());
+      }
     }
+    assertThat(
+        " Expected the map size to be " + 2 * numberOfIterations + " but it is " + map.size(),
+        map.size(), is(2 * numberOfIterations));
   }
 
   /**
@@ -1050,101 +829,96 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that a total of x entries are present in the map
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() {
-    try {
-      final long numberOfIterations = 10000;
-      final HARegionQueue hrq1 = createHARegionQueue("testConcurrentDispatcherAndRemoval1");
-
-      final HARegionQueue hrq2 = createHARegionQueue("testConcurrentDispatcherAndRemoval2");
+  public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() throws Exception {
+    int numberOfIterations = 10000;
+    HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1");
+    HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2");
+    HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3");
+    HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4");
+    HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5");
 
-      final HARegionQueue hrq3 = createHARegionQueue("testConcurrentDispatcherAndRemoval3");
+    HARegionQueue.stopQRMThread();
 
-      final HARegionQueue hrq4 = createHARegionQueue("testConcurrentDispatcherAndRemoval4");
+    ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
 
-      final HARegionQueue hrq5 = createHARegionQueue("testConcurrentDispatcherAndRemoval5");
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids[i] = new ThreadIdentifier(new byte[] {1}, i);
+      hrq1.addDispatchedMessage(ids[i], i);
+      hrq2.addDispatchedMessage(ids[i], i);
 
-      HARegionQueue.stopQRMThread();
-      final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations];
-
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids[i] = new ThreadIdentifier(new byte[] {1}, i);
-        hrq1.addDispatchedMessage(ids[i], i);
-        hrq2.addDispatchedMessage(ids[i], i);
+    }
 
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
+    };
 
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            hrq3.addDispatchedMessage(ids[i], i);
-            hrq4.addDispatchedMessage(ids[i], i);
-            hrq5.addDispatchedMessage(ids[i], i);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = true;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          for (int i = 0; i < size; i++) {
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          hrq3.addDispatchedMessage(ids[i], i);
+          hrq4.addDispatchedMessage(ids[i], i);
+          hrq5.addDispatchedMessage(ids[i], i);
         }
       }
-
-      iterator = list2.iterator();
-      doOnce = true;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          for (int i = 0; i < size; i++) {
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = true;
+    EventID id;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next();// region name;
+        int size = (Integer) iterator.next();
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
         }
       }
-      assertTrue(
-          " Expected the map size to be " + (numberOfIterations) + " but it is " + map.size(),
-          map.size() == (numberOfIterations));
+    }
 
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to : ", e);
+    iterator = list2.iterator();
+    doOnce = true;
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next();// region name;
+        int size = (Integer) iterator.next();
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
+        }
+      }
     }
+
+    assertThat(" Expected the map size to be " + numberOfIterations + " but it is " + map.size(),
+        map.size(), is(numberOfIterations));
   }
 
   /**
@@ -1168,203 +942,179 @@ public class HARegionQueueJUnitTest {
    * It is then verified to see that the map size should be 2x * number of regions
    */
   @Test
-  public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() {
-    try {
-      final long numberOfIterations = 1000;
-      final HARegionQueue hrq1 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval1");
-
-      final HARegionQueue hrq2 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval2");
-      final HARegionQueue hrq3 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval3");
-      final HARegionQueue hrq4 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval4");
-      final HARegionQueue hrq5 =
-
-          createHARegionQueue("testConcurrentDispatcherAndRemoval5");
-
-      HARegionQueue.stopQRMThread();
-
-      final ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations];
-      final ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations];
+  public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId()
+      throws Exception {
+    int numberOfIterations = 1000;
+    HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1");
+    HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2");
+    HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3");
+    HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4");
+    HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5");
+
+    HARegionQueue.stopQRMThread();
+
+    ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations];
+    ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations];
+
+    for (int i = 0; i < numberOfIterations; i++) {
+      ids1[i] = new ThreadIdentifier(new byte[] {1}, i);
+      ids2[i] = new ThreadIdentifier(new byte[] {2}, i);
+      ids3[i] = new ThreadIdentifier(new byte[] {3}, i);
+      ids4[i] = new ThreadIdentifier(new byte[] {4}, i);
+      ids5[i] = new ThreadIdentifier(new byte[] {5}, i);
+      hrq1.addDispatchedMessage(ids1[i], i);
+      hrq2.addDispatchedMessage(ids2[i], i);
+      hrq3.addDispatchedMessage(ids3[i], i);
+      hrq4.addDispatchedMessage(ids4[i], i);
+      hrq5.addDispatchedMessage(ids5[i], i);
+    }
 
-      for (int i = 0; i < numberOfIterations; i++) {
-        ids1[i] = new ThreadIdentifier(new byte[] {1}, i);
-        ids2[i] = new ThreadIdentifier(new byte[] {2}, i);
-        ids3[i] = new ThreadIdentifier(new byte[] {3}, i);
-        ids4[i] = new ThreadIdentifier(new byte[] {4}, i);
-        ids5[i] = new ThreadIdentifier(new byte[] {5}, i);
-        hrq1.addDispatchedMessage(ids1[i], i);
-        hrq2.addDispatchedMessage(ids2[i], i);
-        hrq3.addDispatchedMessage(ids3[i], i);
-        hrq4.addDispatchedMessage(ids4[i], i);
-        hrq5.addDispatchedMessage(ids5[i], i);
+    Thread thread1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(600);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        list1 = HARegionQueue.createMessageListForTesting();
       }
+    };
 
-      Thread thread1 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(600);
-          } catch (InterruptedException e) {
-            fail("interrupted");
-          }
-          list1 = HARegionQueue.createMessageListForTesting();
-        };
-      };
-      Thread thread2 = new Thread() {
-        public void run() {
-          try {
-            Thread.sleep(480);
-          } catch (InterruptedException e) {
-            fail("Interrupted");
-          }
-          for (int i = 0; i < numberOfIterations; i++) {
-            ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
-            ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations);
-            ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations);
-            ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations);
-            ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations);
-
-            hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations);
-            hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations);
-            hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations);
-            hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations);
-            hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations);
-          }
-        };
-      };
-      thread1.start();
-      thread2.start();
-      ThreadUtils.join(thread1, 30 * 1000);
-      ThreadUtils.join(thread2, 30 * 1000);
-      List list2 = HARegionQueue.createMessageListForTesting();
-      Iterator iterator = list1.iterator();
-      boolean doOnce = true;
-      EventID id = null;
-      Map map = new HashMap();
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          System.out.println(" size of list 1 iteration x " + size);
-          for (int i = 0; i < size; i++) {
-
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    Thread thread2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(480);
+        } catch (InterruptedException e) {
+          errorCollector.addError(e);
+        }
+        for (int i = 0; i < numberOfIterations; i++) {
+          ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations);
+          ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations);
+          ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations);
+          ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations);
+          ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations);
+
+          hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations);
+          hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations);
+          hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations);
+          hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations);
+          hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations);
         }
       }
-
-      iterator = list2.iterator();
-      doOnce = true;
-      id = null;
-      while (iterator.hasNext()) {
-        if (!doOnce) {
-          iterator.next(); // read the total message size
-          doOnce = true;
-        } else {
-          iterator.next();// region name;
-          int size = ((Integer) iterator.next()).intValue();
-          System.out.println(" size of list 2 iteration x " + size);
-          for (int i = 0; i < size; i++) {
-            id = (EventID) iterator.next();
-            map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()),
-                new Long(id.getSequenceID()));
-          }
+    };
+
+    thread1.start();
+    thread2.start();
+    ThreadUtils.join(thread1, 30 * 1000);
+    ThreadUtils.join(thread2, 30 * 1000);
+    List list2 = HARegionQueue.createMessageListForTesting();
+    Iterator iterator = list1.iterator();
+    boolean doOnce = true;
+    EventID id = null;
+    Map map = new HashMap();
+
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next(); // region name;
+        int size = (Integer) iterator.next();
+        System.out.println(" size of list 1 iteration x " + size);
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
         }
       }
+    }
 
-      assertTrue(" Expected the map size to be " + (numberOfIterations * 2 * 5) + " but it is "
-          + map.size(), map.size() == (numberOfIterations * 2 * 5));
+    iterator = list2.iterator();
+    doOnce = true;
 
-    } catch (Exception e) {
-      throw new AssertionError("Test failed due to : ", e);
+    while (iterator.hasNext()) {
+      if (!doOnce) {
+        iterator.next(); // read the total message size
+        doOnce = true;
+      } else {
+        iterator.next(); // region name;
+        int size = (Integer) iterator.next();
+        System.out.println(" size of list 2 iteration x " + size);
+        for (int i = 0; i < size; i++) {
+          id = (EventID) iterator.next();
+          map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID());
+        }
+      }
     }
+
+    assertThat(
+        " Expected the map size to be " + numberOfIterations * 2 * 5 + " but it is " + map.size(),
+        map.size(), is(numberOfIterations * 2 * 5));
   }
 
   /**
-   * Concurrent Peek on Blokcing Queue waiting with for a Put . If concurrent take is also happening
+   * Concurrent Peek on Blocking Queue waiting for a Put. If concurrent take is also happening
    * such that the object is removed first then the peek should block & not return with null.
    */
   @Test
-  public void testBlockingQueueForConcurrentPeekAndTake() {
-    exceptionInThread = false;
-    testFailed = false;
-    try {
-      final TestBlockingHARegionQueue bQ =
-          new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", cache);
-      Thread[] threads = new Thread[3];
-      for (int i = 0; i < 3; i++) {
-        threads[i] = new Thread() {
-          public void run() {
-            try {
-              long startTime = System.currentTimeMillis();
-              Object obj = bQ.peek();
-              if (obj == null) {
-                testFailed = true;
-                message.append(
-                    " Failed :  failed since object was null and was not expected to be null \n");
-              }
-              long totalTime = System.currentTimeMillis() - startTime;
+  public void testBlockingQueueForConcurrentPeekAndTake() throws Exception {
+    TestBlockingHARegionQueue regionQueue =
+        new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", this.cache);
+    Thread[] threads = new Thread[3];
+
+    for (int i = 0; i < 3; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            long startTime = System.currentTimeMillis();
+            Object obj = regionQueue.peek();
+            if (obj == null) {
+              errorCollector.addError(new AssertionError(
+                  "Failed :  failed since object was null and was not expected to be null"));
+            }
+            long totalTime = System.currentTimeMillis() - startTime;
 
-              if (totalTime < 4000) {
-                testFailed = true;
-                message
-                    .append(" Failed :  Expected time to be greater than 4000 but it is not so ");
-              }
-            } catch (Exception e) {
-              exceptionInThread = true;
-              exception = e;
+            if (totalTime < 4000) {
+              errorCollector.addError(new AssertionError(
+                  "Failed :  Expected time to be greater than 4000 but it is not so"));
             }
+          } catch (Exception e) {
+            errorCollector.addError(e);
           }
-        };
-
-      }
-
-      for (int k = 0; k < 3; k++) {
-        threads[k].start();
-      }
-      Thread.sleep(4000);
-
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      EventID id1 = new EventID(new byte[] {1}, 1, 2);
+        }
+      };
+    }
 
-      bQ.takeFirst = true;
-      bQ.put(new ConflatableObject("key", "value", id, true, "testing"));
+    for (int k = 0; k < 3; k++) {
+      threads[k].start();
+    }
 
-      Thread.sleep(2000);
+    Thread.sleep(4000);
 
-      bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing"));
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+    EventID id1 = new EventID(new byte[] {1}, 1, 2);
 
-      long startTime = System.currentTimeMillis();
-      for (int k = 0; k < 3; k++) {
-        ThreadUtils.join(threads[k], 180 * 1000);
-      }
+    regionQueue.takeFirst = true;
+    regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
 
-      long totalTime = System.currentTimeMillis() - startTime;
+    Thread.sleep(2000);
 
-      if (totalTime >= 180000) {
-        fail(" Test taken too long ");
-      }
+    regionQueue
+        .put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName()));
 
-      if (testFailed) {
-        fail(" test failed due to " + message);
-      }
+    long startTime = System.currentTimeMillis();
+    for (int k = 0; k < 3; k++) {
+      ThreadUtils.join(threads[k], 180 * 1000);
+    }
 
-    } catch (Exception e) {
-      throw new AssertionError(" Test failed due to ", e);
+    long totalTime = System.currentTimeMillis() - startTime;
+    if (totalTime >= 180000) {
+      fail(" Test taken too long ");
     }
   }
 
@@ -1373,71 +1123,60 @@ public class HARegionQueueJUnitTest {
    * QRM thread , the peek should block correctly.
    */
   @Test
-  public void testBlockingQueueForTakeWhenPeekInProgress() {
-    exceptionInThread = false;
-    testFailed = false;
-    try {
-      final TestBlockingHARegionQueue bQ =
-          new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", cache);
-      Thread[] threads = new Thread[3];
-      for (int i = 0; i < 3; i++) {
-        threads[i] = new Thread() {
-          public void run() {
-            try {
-              long startTime = System.currentTimeMillis();
-              Object obj = bQ.peek();
-              if (obj == null) {
-                testFailed = true;
-                message.append(
-                    " Failed :  failed since object was null and was not expected to be null \n");
-              }
-              long totalTime = System.currentTimeMillis() - startTime;
+  public void testBlockingQueueForTakeWhenPeekInProgress() throws Exception {
+    TestBlockingHARegionQueue regionQueue =
+        new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", this.cache);
+    Thread[] threads = new Thread[3];
+
+    for (int i = 0; i < 3; i++) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          try {
+            long startTime = System.currentTimeMillis();
+            Object obj = regionQueue.peek();
+            if (obj == null) {
+              errorCollector.addError(new AssertionError(
+                  "Failed :  failed since object was null and was not expected to be null"));
+            }
+            long totalTime = System.currentTimeMillis() - startTime;
 
-              if (totalTime < 4000) {
-                testFailed = true;
-                message
-                    .append(" Failed :  Expected time to be greater than 4000 but it is not so ");
-              }
-            } catch (Exception e) {
-              exceptionInThread = true;
-              exception = e;
+            if (totalTime < 4000) {
+              errorCollector.addError(new AssertionError(
+                  "Failed :  Expected time to be greater than 4000 but it is not so"));
             }
+          } catch (Exception e) {
+            errorCollector.addError(e);
           }
-        };
-      }
-
-      for (int k = 0; k < 3; k++) {
-        threads[k].start();
-      }
-      Thread.sleep(4000);
-
-      EventID id = new EventID(new byte[] {1}, 1, 1);
-      EventID id1 = new EventID(new byte[] {1}, 1, 2);
+        }
+      };
+    }
 
-      bQ.takeWhenPeekInProgress = true;
-      bQ.put(new ConflatableObject("key", "value", id, true, "testing"));
+    for (int k = 0; k < 3; k++) {
+      threads[k].start();
+    }
 
-      Thread.sleep(2000);
+    Thread.sleep(4000);
 
-      bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing"));
+    EventID id = new EventID(new byte[] {1}, 1, 1);
+    EventID id1 = new EventID(new byte[] {1}, 1, 2);
 
-      long startTime = System.currentTimeMillis();
-      for (int k = 0; k < 3; k++) {
-        ThreadUtils.join(threads[k], 60 * 1000);
-      }
+    regionQueue.takeWhenPeekInProgress = true;
+    regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName()));
 
-      long totalTime = System.currentTimeMillis() - startTime;
+    Thread.sleep(2000);
 
-      if (totalTime >= 60000) {
-        fail(" Test taken too long ");
-      }
+    regionQueue
+        .put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName()));
 
-      if (testFailed) {
-        fail(" test failed due to " + message);
-      }
+    long startTime = System.currentTimeMillis();
+    for (int k = 0; k < 3; k++) {
+      ThreadUtils.join(threads[k], 60 * 1000);
+    }
 
-    } catch (Exception e) {
-      throw new AssertionError(" Test failed due to ", e);
+    long totalTime = System.currentTimeMillis() - startTime;
+    if (totalTime >= 60000) {
+      fail(" Test taken too long ");
     }
   }
 
@@ -1451,138 +1190,88 @@ public class HARegionQueueJUnitTest {
    * violation. This test will validate that behaviour
    */
   @Test
-  public void testConcurrentEventExpiryAndTake() {
-    try {
-      HARegionQueueAttributes haa = new HARegionQueueAttributes();
-      haa.setExpiryTime(3);
-      final RegionQueue regionqueue =
-          new HARegionQueue.TestOnlyHARegionQueue("testing", cache, haa) {
-            CacheListener createCacheListenerForHARegion() {
+  public void testConcurrentEventExpiryAndTake() throws Exception {
+    AtomicBoolean complete = new AtomicBoolean(false);
+    AtomicBoolean expiryCalled = new AtomicBoolean(false);
+    AtomicBoolean allowExpiryToProceed = new AtomicBoolean(false);
 
-              return new CacheListenerAdapter() {
+    HARegionQueueAttributes haa = new HARegionQueueAttributes();
+    haa.setExpiryTime(3);
 
-                public void afterInvalidate(EntryEvent event) {
+    RegionQueue regionqueue =
+        new HARegionQueue.TestOnlyHARegionQueue(this.testName.getMethodName(), this.cache, haa) {
+          @Override
+          CacheListener createCacheListenerForHARegion() {
 
-                  if (event.getKey() instanceof Long) {
-                    synchronized (HARegionQueueJUnitTest.this) {
-                      expiryCalled = true;
-                      HARegionQueueJUnitTest.this.notify();
+            return new CacheListenerAdapter() {
 
-                    } ;
-                    Thread.yield();
+              @Override
+              public void afterInvalidate(EntryEvent event) {
 
-                    synchronized (HARegionQueueJUnitTest.this) {
-                      if (!allowExpiryToProceed) {
-                        try {
-                          HARegionQueueJUnitTest.this.wait();
-                        } catch (InterruptedException e1) {
-                          encounteredException = true;
-                        }
+                if (event.getKey() instanceof Long) {
+                  synchronized (HARegionQueueJUnitTest.this) {
+                    expiryCalled.set(true);
+                    HARegionQueueJUnitTest.this.notifyAll();
+                  }
+
+                  Thread.yield();
+
+                  synchronized (HARegionQueueJUnitTest.this) {
+                    while (!allowExpiryToProceed.get()) {
+                      try {
+                        HARegionQueueJUnitTest.this.wait();
+                      } catch (InterruptedException e) {
+                        errorCollector.addError(e);
+                        break;
                       }
                     }
-                    try {
-                      expireTheEventOrThreadIdentifier(event);
-                    } catch (CacheException e) {
-                      e.printStackTrace();
-                      encounteredException = true;
-                    } finally {
-                      synchronized (HARegionQueueJUnitTest.this) {
-                        complete = true;
-                        HARegionQueueJUnitTest.this.notify();
-                      }
+                  }
+
+                  try {
+                    expireTheEventOrThreadIdentifier(event);
+                  } catch (CacheException e) {
+                    errorCollector.addError(e);
+                  } finally {
+                    synchronized (HARegionQueueJUnitTest.this) {
+                      complete.set(true);
+                      HARegionQueueJUnitTest.this.notifyAll();
                     }
                   }
                 }
-              };
-            }
-          };
-      EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+              }
+            };
+          }
+        };
 
-      Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing");
+    EventID ev1 = new EventID(new byte[] {1}, 1, 1);
+    Conflatable cf1 =
+        new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName());
+    regionqueue.put(cf1);
 
-      regionqueue.put(cf1);
-      synchronized (this) {
-        if (!expiryCalled) {
-          this.wait();
-        }
-      }
-      try {
-        Object o = regionqueue.take();
-        assertNull(o);
-      } catch (Exception e) {
-        throw new AssertionError("Test failed due to exception ", e);
-      } finally {
-        synchronized (this) {
-          this.allowExpiryToProceed = true;
-          this.notify();
-        }
+    synchronized (this) {
+      

<TRUNCATED>

Mime
View raw message