geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From upthewatersp...@apache.org
Subject [54/70] incubator-geode git commit: GEODE-804: Remaining WAN and CQ changes from pivotal
Date Thu, 28 Jan 2016 18:13:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index 74501fb..01c618c 100644
--- a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -65,7 +65,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
   
   @Override
   public void start() {
-    this.lifeCycleLock.writeLock().lock(); 
+    this.getLifeCycleLock().writeLock().lock(); 
     try {
       if (isRunning()) {
         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
@@ -112,7 +112,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
       }
     }
     finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -127,7 +127,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
 
   @Override
   public void stop() {
-    this.lifeCycleLock.writeLock().lock(); 
+    this.getLifeCycleLock().writeLock().lock(); 
     try {
       if (!this.isRunning()) {
         return;
@@ -164,7 +164,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
 //      }
     }
     finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index d69d747..ad89c22 100644
--- a/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -69,7 +69,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
       logger.debug("Starting gatewaySender : {}", this);
     }
     
-    this.lifeCycleLock.writeLock().lock();
+    this.getLifeCycleLock().writeLock().lock();
     try {
       if (isRunning()) {
         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING, this.getId()));
@@ -118,7 +118,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
   
       enqueueTempEvents();
     } finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -127,7 +127,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
     if (logger.isDebugEnabled()) {
       logger.debug("Stopping Gateway Sender : {}", this);
     }
-    this.lifeCycleLock.writeLock().lock();
+    this.getLifeCycleLock().writeLock().lock();
     try {
       // Stop the dispatcher
       AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
@@ -148,7 +148,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
       
       clearTempEventsAfterSenderStopped();
     } finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
     if (this.isPrimary()) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 751a697..84b4395 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -2221,8 +2221,20 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
   
+  public static void stopReceivers() {
+    Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
+    for (GatewayReceiver receiver : receivers) {
+      receiver.stop();
+    }
+  }
+  
+  public static void startReceivers() throws IOException {
+    Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
+    for (GatewayReceiver receiver : receivers) {
+      receiver.start();
+    }
+  }
   
-   
   public static void createSender(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -3948,11 +3960,23 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
   
-  public static void validateQueueSizeStat(String id, int queueSize) {
-    GatewaySender sender = cache.getGatewaySender(id);
-    GatewaySenderStats stats = ((AbstractGatewaySender) sender).getStatistics();
-    assertEquals(queueSize, stats.getEventQueueSize());
-    assertEquals(0, stats.getTempEventQueueSize());
+  public static void validateQueueSizeStat(String id, final int queueSize) {
+    final AbstractGatewaySender sender = (AbstractGatewaySender)  cache.getGatewaySender(id);
+    
+    waitForCriterion(new WaitCriterion() {
+      
+      @Override
+      public boolean done() {
+        return sender.getEventQueueSize() == queueSize;
+      }
+      
+      @Override
+      public String description() {
+        // TODO Auto-generated method stub
+        return null;
+      }
+    }, 30000, 50, false);
+    assertEquals(queueSize, sender.getEventQueueSize());
   }
   /**
    * This method is specifically written for pause and stop operations.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
index 0ee0b36..9b78a8f 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
@@ -483,11 +483,8 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
     vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
     vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
     
-    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    vm6.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    vm7.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    
+    pause(2000);
+
     //SECOND RUN: do some of the puts after the senders are stopped
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
     

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java
index afb3555..2105cef 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java
@@ -201,21 +201,28 @@ public class ConcurrentSerialGatewaySenderOperationsDUnitTest  extends WANTestBa
         testName + "_RR", "ln", isOffHeap() });
 
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        1000 });
+        20 });
 
     vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
     vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
 
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        1000 });
+        20 });
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 1000 });
+        testName + "_RR", 20 });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 1000 });
+        testName + "_RR", 20 });
+    
+    vm2.invoke(WANTestBase.class, "stopReceivers");
+    vm3.invoke(WANTestBase.class, "stopReceivers");
     
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        200 });
+        20 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
+    
     vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
     vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
 
@@ -223,9 +230,9 @@ public class ConcurrentSerialGatewaySenderOperationsDUnitTest  extends WANTestBa
         "verifySenderStoppedState", new Object[] { "ln" });
     vm5.invoke(ConcurrentSerialGatewaySenderOperationsDUnitTest.class,
         "verifySenderStoppedState", new Object[] { "ln" });
+
     vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
     vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-
     /**
      * Should have no effect on GatewaySenderState
      */
@@ -242,9 +249,18 @@ public class ConcurrentSerialGatewaySenderOperationsDUnitTest  extends WANTestBa
     int START_WAIT_TIME = 30000;
     vm4async.getResult(START_WAIT_TIME);
     vm5async.getResult(START_WAIT_TIME);
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
 
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-      10000 });
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+      110 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 130 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 130 });
+    
+    vm2.invoke(WANTestBase.class, "startReceivers");
+    vm3.invoke(WANTestBase.class, "startReceivers");
 
     vm4.invoke(ConcurrentSerialGatewaySenderOperationsDUnitTest.class,
         "verifySenderResumedState", new Object[] { "ln" });
@@ -252,12 +268,16 @@ public class ConcurrentSerialGatewaySenderOperationsDUnitTest  extends WANTestBa
         "verifySenderResumedState", new Object[] { "ln" });
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_RR", 10000 });
+      testName + "_RR", 110 });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_RR", 10000 });
+      testName + "_RR", 110 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
   }
 
-  public void testStopOneSerialGatewaySenderBothPrimary() {
+
+  public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
     addExpectedException("Broken pipe");
     Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
         "createFirstLocatorWithDSId", new Object[] { 1 });
@@ -295,7 +315,7 @@ public class ConcurrentSerialGatewaySenderOperationsDUnitTest  extends WANTestBa
     vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
 
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        1000 });
+        100 });
 
     vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
 
@@ -303,14 +323,31 @@ public class ConcurrentSerialGatewaySenderOperationsDUnitTest  extends WANTestBa
         "verifySenderStoppedState", new Object[] { "ln" });
     
     vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        10000 });
+        200 });
     
     getLogWriter().info("Completed puts in the region");
 
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 10000 });
+        testName + "_RR", 200 });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 10000 });
+        testName + "_RR", 200 });
+    
+  //Do some puts while restarting a sender
+    AsyncInvocation asyncPuts = vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        2000 });
+    
+    Thread.sleep(10);
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    asyncPuts.getResult();
+    getLogWriter().info("Completed puts in the region");
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 2000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 2000 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
   }
 
   public void Bug46921_testStopOneSerialGatewaySender_PrimarySecondary() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 5aac8f1..cd25f16 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -490,11 +490,6 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
     vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
     
-    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    vm6.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    vm7.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
-    
     pause(2000);
     
     //SECOND RUN: do some of the puts after the senders are stopped

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8fdbab40/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 70b6ecb..bdaec73 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -210,21 +210,28 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
         testName + "_RR", "ln", isOffHeap() });
 
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        1000 });
+        20 });
 
     vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
     vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
 
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        1000 });
+        20 });
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 1000 });
+        testName + "_RR", 20 });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 1000 });
+        testName + "_RR", 20 });
+    
+    vm2.invoke(WANTestBase.class, "stopReceivers");
+    vm3.invoke(WANTestBase.class, "stopReceivers");
     
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        200 });
+        20 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
+    
     vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
     vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
 
@@ -252,9 +259,17 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm4async.getResult(START_WAIT_TIME);
     vm5async.getResult(START_WAIT_TIME);
 
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 20 });
 
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-      10000 });
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+      110 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 130 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 130 });
+    
+    vm2.invoke(WANTestBase.class, "startReceivers");
+    vm3.invoke(WANTestBase.class, "startReceivers");
 
     vm4.invoke(SerialGatewaySenderOperationsDUnitTest.class,
         "verifySenderResumedState", new Object[] { "ln" });
@@ -262,12 +277,15 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
         "verifySenderResumedState", new Object[] { "ln" });
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_RR", 10000 });
+      testName + "_RR", 110 });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName + "_RR", 10000 });
+      testName + "_RR", 110 });
+    
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
   }
 
-  public void testStopOneSerialGatewaySenderBothPrimary() {
+  public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable {
     Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
         "createFirstLocatorWithDSId", new Object[] { 1 });
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
@@ -304,7 +322,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
 
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        1000 });
+        100 });
 
     vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
 
@@ -312,14 +330,32 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
         "verifySenderStoppedState", new Object[] { "ln" });
     
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
-        10000 });
+        200 });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 200 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_RR", 200 });
+    
+    //Do some puts while restarting a sender
+    AsyncInvocation asyncPuts = vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        2000 });
+    
+    Thread.sleep(10);
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
     
+    asyncPuts.getResult();
     getLogWriter().info("Completed puts in the region");
-
+    
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 10000 });
+        testName + "_RR", 2000 });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_RR", 10000 });
+        testName + "_RR", 2000 });
+    
+    Thread.sleep(5000);
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", new Object[] { "ln", 0 });
+    
+    
   }
 
   public void testStopOneSerialGatewaySender_PrimarySecondary() {


Mime
View raw message