geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [14/19] incubator-geode git commit: GEODE-681: Measuring event queue size without using the stat for gfsh
Date Thu, 21 Jan 2016 19:09:00 GMT
GEODE-681: Measuring event queue size without using the stat for gfsh

The eventQueueSize stat has issues with concurrent events during start
and stop that are leading it to be inaccurate. To make gfsh and jmx
correct,  measuring the queue size directly from the underlying region
size.

Adding unit tests for the size methods.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6e52f7a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6e52f7a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6e52f7a3

Branch: refs/heads/feature/GEODE-715
Commit: 6e52f7a3855f045076ea32ae8d913d2ee30ad33e
Parents: e497a4d
Author: Dan Smith <upthewaterspout@apache.org>
Authored: Wed Dec 9 16:22:32 2015 -0800
Committer: Dan Smith <upthewaterspout@apache.org>
Committed: Thu Jan 21 09:09:15 2016 -0800

----------------------------------------------------------------------
 .../internal/ParallelAsyncEventQueueImpl.java   |   8 +-
 .../internal/SerialAsyncEventQueueImpl.java     |   8 +-
 .../cache/wan/AbstractGatewaySender.java        |  31 ++++--
 .../AbstractGatewaySenderEventProcessor.java    |   8 +-
 .../cache/wan/GatewaySenderAdvisor.java         |   4 +-
 .../ParallelGatewaySenderEventProcessor.java    |   8 +-
 .../parallel/ParallelGatewaySenderQueue.java    |  61 +++++++----
 .../ParallelGatewaySenderQueueJUnitTest.java    |  87 +++++++++++++++
 ...urrentSerialGatewaySenderEventProcessor.java |   9 ++
 .../beans/GatewaySenderMBeanBridge.java         |   8 +-
 .../internal/beans/stats/StatsKey.java          |   1 -
 .../SortedListForAsyncQueueJUnitTest.java       |   4 +-
 ...ialGatewaySenderEventProcessorJUnitTest.java |  42 ++++++++
 .../bean/stats/GatewayMBeanBridgeJUnitTest.java | 108 +++++++++++++++++++
 .../bean/stats/GatewaySenderStatsJUnitTest.java | 101 -----------------
 15 files changed, 334 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 377f8e2..aa3e71c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -70,7 +70,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
   
   @Override
   public void start() {
-    this.lifeCycleLock.writeLock().lock(); 
+    this.getLifeCycleLock().writeLock().lock(); 
     try {
       if (isRunning()) {
         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING,
this.getId()));
@@ -117,7 +117,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender
{
       }
     }
     finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -132,7 +132,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender
{
 
   @Override
   public void stop() {
-    this.lifeCycleLock.writeLock().lock(); 
+    this.getLifeCycleLock().writeLock().lock(); 
     try {
       if (!this.isRunning()) {
         return;
@@ -163,7 +163,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender
{
       clearTempEventsAfterSenderStopped();
     }
     finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index 8c8342c..5669aed 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -74,7 +74,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
       logger.debug("Starting gatewaySender : {}", this);
     }
     
-    this.lifeCycleLock.writeLock().lock();
+    this.getLifeCycleLock().writeLock().lock();
     try {
       if (isRunning()) {
         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_SENDER_0_IS_ALREADY_RUNNING,
this.getId()));
@@ -123,7 +123,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
   
       enqueueTempEvents();
     } finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -132,7 +132,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
     if (logger.isDebugEnabled()) {
       logger.debug("Stopping Gateway Sender : {}", this);
     }
-    this.lifeCycleLock.writeLock().lock();
+    this.getLifeCycleLock().writeLock().lock();
     try {
       // Stop the dispatcher
       AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
@@ -152,7 +152,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
       
       clearTempEventsAfterSenderStopped();
     } finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
     if (this.isPrimary()) {
       try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
index 422c150..d2822af 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySender.java
@@ -142,7 +142,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   
   protected LocatorDiscoveryCallback locatorDiscoveryCallback;
   
-  public final ReentrantReadWriteLock lifeCycleLock = new ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock lifeCycleLock = new ReentrantReadWriteLock();
   
   protected GatewaySenderAdvisor senderAdvisor;
   
@@ -569,7 +569,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
    */
   public void destroy() {
     try {
-      this.lifeCycleLock.writeLock().lock();
+      this.getLifeCycleLock().writeLock().lock();
       // first, check if this sender is attached to any region. If so, throw
       // GatewaySenderException
       Set<LocalRegion> regions = ((GemFireCacheImpl)this.cache)
@@ -627,7 +627,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
       }//END if (regionQueues != null)
     }
     finally {
-      this.lifeCycleLock.writeLock().unlock();
+      this.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -803,7 +803,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   
   final public void pause() {
     if (this.eventProcessor != null) {
-      this.lifeCycleLock.writeLock().lock();
+      this.getLifeCycleLock().writeLock().lock();
       try {
         if (this.eventProcessor.isStopped()) {
           return;
@@ -818,14 +818,14 @@ public abstract class AbstractGatewaySender implements GatewaySender,
 
         enqueueTempEvents();
       } finally {
-        this.lifeCycleLock.writeLock().unlock();
+        this.getLifeCycleLock().writeLock().unlock();
       }
     }
   }
 
   final public void resume() {
     if (this.eventProcessor != null) {
-      this.lifeCycleLock.writeLock().lock();
+      this.getLifeCycleLock().writeLock().lock();
       try {
         if (this.eventProcessor.isStopped()) {
           return;
@@ -841,7 +841,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
         
         enqueueTempEvents();
       } finally {
-        this.lifeCycleLock.writeLock().unlock();
+        this.getLifeCycleLock().writeLock().unlock();
       }
     }
   }
@@ -951,10 +951,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
       clonedEvent.setCallbackArgument(geCallbackArg);
     }
 
-    if (!this.lifeCycleLock.readLock().tryLock()) {
+    if (!this.getLifeCycleLock().readLock().tryLock()) {
       synchronized (this.queuedEventsSync) {
         if (!this.enqueuedAllTempQueueEvents) {
-          if (!this.lifeCycleLock.readLock().tryLock()) {
+          if (!this.getLifeCycleLock().readLock().tryLock()) {
             Object substituteValue = getSubstituteValue(clonedEvent, operation);
             this.tmpQueuedEvents.add(new TmpQueueEvent(operation, clonedEvent, substituteValue));
             freeClonedEvent = false;
@@ -967,7 +967,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
         }
       }
       if(this.enqueuedAllTempQueueEvents) {
-        this.lifeCycleLock.readLock().lock();
+        this.getLifeCycleLock().readLock().lock();
       }
     }
     try {
@@ -1010,7 +1010,7 @@ public abstract class AbstractGatewaySender implements GatewaySender,
                 new Object[] { this, getId(), operation, clonedEvent }), e);
       }
     } finally {
-      this.lifeCycleLock.readLock().unlock();
+      this.getLifeCycleLock().readLock().unlock();
     }
     } finally {
       if (freeClonedEvent) {
@@ -1249,6 +1249,11 @@ public abstract class AbstractGatewaySender implements GatewaySender,
     return 0;
   }
   
+  public int getEventQueueSize() { 
+    AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+    return localProcessor == null ? 0 : localProcessor.eventQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
     this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
@@ -1260,6 +1265,10 @@ public abstract class AbstractGatewaySender implements GatewaySender,
   public Object getLockForConcurrentDispatcher() {
     return this.lockForConcurrentDispatcher;
   }
+  public ReentrantReadWriteLock getLifeCycleLock() {
+    return lifeCycleLock;
+  }
+
   /**
    * Has a reference to a GatewayEventImpl and has a timeout value.
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index b9d877e..c19857f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -244,8 +244,12 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread
{
     //return this.queue.take();
   }
 
-  protected int eventQueueSize() {
-    // This should be local size instead of PR size. Fix for #48627
+  public int eventQueueSize() {
+    if(queue == null) {
+      return 0;
+    }
+    
+    // This should be local size instead of pr size
     if (this.queue instanceof ParallelGatewaySenderQueue) {
       return ((ParallelGatewaySenderQueue) queue).localSize();
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAdvisor.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAdvisor.java
index 60ed6c8..8efd5fc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderAdvisor.java
@@ -431,7 +431,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
     this.lockObtainingThread = new Thread(threadGroup, new Runnable() {
       @SuppressWarnings("synthetic-access")
       public void run() {
-        GatewaySenderAdvisor.this.sender.lifeCycleLock
+        GatewaySenderAdvisor.this.sender.getLifeCycleLock()
                     .readLock().lock(); 
           try {
             // Attempt to obtain the lock
@@ -466,7 +466,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
             }
           }
           finally{
-            GatewaySenderAdvisor.this.sender.lifeCycleLock
+            GatewaySenderAdvisor.this.sender.getLifeCycleLock()
                           .readLock().unlock();
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
index 8d11079..fb83373 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
@@ -104,10 +104,14 @@ public class ParallelGatewaySenderEventProcessor extends
       logger.debug("The target Regions are(PGSEP) {}", targetRs);
     }
     
+    ParallelGatewaySenderQueue queue;
     if (sender.getIsHDFSQueue())
-      this.queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index,
this.nDispatcher);
+      queue = new HDFSParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
     else
-      this.queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+      queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
+    
+    queue.start();
+    this.queue = queue;
     
     if(((ParallelGatewaySenderQueue)queue).localSize() > 0) {
       ((ParallelGatewaySenderQueue)queue).notifyEventProcessorIfRequired();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 94dc9e8..c00903f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -212,19 +212,29 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   
   final protected int index; 
   final protected int nDispatcher;
+
+  private MetaRegionFactory metaRegionFactory;
   
   /**
    * A transient queue to maintain the eventSeqNum of the events that are to be
    * sent to remote site. It is cleared when the queue is cleared.
    */
-  //private final BlockingQueue<Long> eventSeqNumQueue;  
+  //private final BlockingQueue<Long> eventSeqNumQueue;
   
   public ParallelGatewaySenderQueue(AbstractGatewaySender sender,
       Set<Region> userRegions, int idx, int nDispatcher) {
+    this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory());
+  }
+  
+  ParallelGatewaySenderQueue(AbstractGatewaySender sender,
+      Set<Region> userRegions, int idx, int nDispatcher, MetaRegionFactory metaRegionFactory)
{
+  
+    this.metaRegionFactory = metaRegionFactory;
+    
     this.index = idx;
     this.nDispatcher = nDispatcher;
     this.stats = sender.getStatistics();
-    this.sender = (AbstractGatewaySender)sender;
+    this.sender = sender;
     
     List<Region> listOfRegions = new ArrayList<Region>(userRegions);
     //eventSeqNumQueue = new LinkedBlockingQueue<Long>();
@@ -266,25 +276,29 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     
     queueEmptyLock = new StoppableReentrantLock(sender.getCancelCriterion());
     queueEmptyCondition = queueEmptyLock.newCondition();
+    
+    //initialize the conflation thread pool if conflation is enabled
+    if (sender.isBatchConflationEnabled()) {
+      initializeConflationThreadPool();
+    }
+  }
+
+  /**Start the background batch removal thread. */
+  public void start() {
     //at present, this won't be accessed by multiple threads, 
     //still, it is safer approach to synchronize it
     synchronized (ParallelGatewaySenderQueue.class) {
       if (removalThread == null) {
         removalThread = new BatchRemovalThread(
-          (GemFireCacheImpl)sender.getCache(), this);
+          (GemFireCacheImpl)this.sender.getCache(), this);
         removalThread.start();
       }
     }
-    
-    //initialize the conflation thread pool if conflation is enabled
-    if (sender.isBatchConflationEnabled()) {
-      initializeConflationThreadPool();
-    }
   }
 
   public void addShadowPartitionedRegionForUserRR(
       DistributedRegion userRegion) {
-    this.sender.lifeCycleLock.writeLock().lock();
+    this.sender.getLifeCycleLock().writeLock().lock();
     PartitionedRegion prQ = null;
 
     if (logger.isDebugEnabled()) {
@@ -434,7 +448,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
       if (prQ != null) {
 	      this.userRegionNameToshadowPRMap.put(userRegion.getFullPath(), prQ);
       }
-      this.sender.lifeCycleLock.writeLock().unlock();
+      this.sender.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -447,7 +461,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     if (logger.isDebugEnabled()) {
       logger.debug("{} addShadowPartitionedRegionForUserPR: Attempting to create queue region:
{}", this, userPR.getDisplayName());
     }
-    this.sender.lifeCycleLock.writeLock().lock();
+    this.sender.getLifeCycleLock().writeLock().lock();
     
     PartitionedRegion prQ = null;
     try {
@@ -527,8 +541,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
           logger.debug("{}: Attempting to create queue region: {}", this, prQName);
         }
 
-        ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(
-            prQName, ra, null, cache, sender, isUsedForHDFS());
+        ParallelGatewaySenderQueueMetaRegion meta = metaRegionFactory.newMetataRegion(cache,
+            prQName, ra, sender, isUsedForHDFS());
 
         try {
           prQ = (PartitionedRegion)cache
@@ -586,7 +600,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         ((AbstractGatewaySender)sender).enqueueTempEvents();
       }
       afterRegionAdd(userPR);
-      this.sender.lifeCycleLock.writeLock().unlock();
+      this.sender.getLifeCycleLock().writeLock().unlock();
     }
   }
 
@@ -908,12 +922,12 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   public PartitionedRegion removeShadowPR(String fullpath) {
     try {
-      this.sender.lifeCycleLock.writeLock().lock();
+      this.sender.getLifeCycleLock().writeLock().lock();
       this.sender.setEnqueuedAllTempQueueEvents(false);
       return this.userRegionNameToshadowPRMap.remove(fullpath);
     }
     finally {
-      sender.lifeCycleLock.writeLock().unlock();
+      sender.getLifeCycleLock().writeLock().unlock();
     }
   }
   
@@ -1414,8 +1428,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   public int localSize() {
     int size = 0;
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
-      if(((PartitionedRegion)prQ.getRegion()).getDataStore() != null) {
-        size += ((PartitionedRegion)prQ.getRegion()).getDataStore()
+      if(prQ != null && prQ.getDataStore() != null) {
+        size += prQ.getDataStore()
             .getSizeOfLocalPrimaryBuckets();  
       }
       if (logger.isDebugEnabled()) {
@@ -1788,7 +1802,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     }
   }
 
-  protected class ParallelGatewaySenderQueueMetaRegion extends
+  protected static class ParallelGatewaySenderQueueMetaRegion extends
       PartitionedRegion {
     
     AbstractGatewaySender sender = null;
@@ -1860,4 +1874,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
   public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException{
   	throw new RuntimeException("This method(size)is not supported by ParallelGatewaySenderQueue");
   }
+  
+  static class MetaRegionFactory {
+    ParallelGatewaySenderQueueMetaRegion newMetataRegion(
+        GemFireCacheImpl cache, final String prQName, final RegionAttributes ra, AbstractGatewaySender
sender, boolean isUsedForHDFS) {
+      ParallelGatewaySenderQueueMetaRegion meta = new ParallelGatewaySenderQueueMetaRegion(
+          prQName, ra, null, cache, sender, isUsedForHDFS);
+      return meta;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
new file mode 100644
index 0000000..a7daf98
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.MetaRegionFactory;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.ParallelGatewaySenderQueueMetaRegion;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+import static org.mockito.Mockito.*;
+
+
+@Category(UnitTest.class)
+public class ParallelGatewaySenderQueueJUnitTest {
+  
+  private ParallelGatewaySenderQueue queue;
+  private MetaRegionFactory metaRegionFactory;
+  private GemFireCacheImpl cache;
+
+  @Before
+  public void createParallelGatewaySenderQueue() {
+    cache = mock(GemFireCacheImpl.class);
+    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
+    when(sender.getCache()).thenReturn(cache);
+    when(sender.getMaximumQueueMemory()).thenReturn(100);
+    when(sender.getLifeCycleLock()).thenReturn(new ReentrantReadWriteLock());
+    metaRegionFactory = mock(MetaRegionFactory.class);
+    queue = new ParallelGatewaySenderQueue(sender, Collections.emptySet(), 0, 1, metaRegionFactory);
+  }
+
+  @Test
+  public void testLocalSize() throws TimeoutException, RegionExistsException, ClassNotFoundException,
IOException {
+    ParallelGatewaySenderQueueMetaRegion mockMetaRegion = mock(ParallelGatewaySenderQueueMetaRegion.class);
+    PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+    when(mockMetaRegion.getDataStore()).thenReturn(dataStore);
+    when(dataStore.getSizeOfLocalPrimaryBuckets()).thenReturn(3); 
+    when(metaRegionFactory.newMetataRegion(any(), any(), any(), any(), anyBoolean())).thenReturn(mockMetaRegion);
+    when(cache.createVMRegion(any(), any(), any())).thenReturn(mockMetaRegion);
+    
+    queue.addShadowPartitionedRegionForUserPR(mockPR("region1"));
+    
+    assertEquals(3, queue.localSize());
+  }
+
+  private PartitionedRegion mockPR(String name) {
+    PartitionedRegion region = mock(PartitionedRegion.class);
+    when(region.getFullPath()).thenReturn(name);
+    when(region.getPartitionAttributes()).thenReturn(new PartitionAttributesFactory<>().create());
+    when(region.getTotalNumberOfBuckets()).thenReturn(113);
+    when(region.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+    return region;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 16410ce..1f68c55 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -100,6 +100,15 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
       }
     }
   }
+  
+  @Override
+  public int eventQueueSize() {
+    int size = 0;
+    for (RegionQueue queue : queues) {
+      size += queue.size();
+    }
+    return size;
+  }
 
   //based on the fix for old wan Bug#46992 .revision is 39437  
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/GatewaySenderMBeanBridge.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/GatewaySenderMBeanBridge.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/GatewaySenderMBeanBridge.java
index 0b6f432..938d326 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/GatewaySenderMBeanBridge.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/GatewaySenderMBeanBridge.java
@@ -76,12 +76,6 @@ public class GatewaySenderMBeanBridge {
     }
 
   }
-  
-  public GatewaySenderMBeanBridge() {
-    this.monitor = new MBeanStatsMonitor(
-        ManagementStrings.GATEWAY_SENDER_MONITOR.toLocalizedString());
-    initializeStats();
-  }
 
   public void addGatewaySenderStats(GatewaySenderStats gatewaySenderStats) {
     monitor.addStatisticsToMonitor(gatewaySenderStats.getStats());
@@ -253,7 +247,7 @@ public class GatewaySenderMBeanBridge {
   }  
 
   public int getEventQueueSize() {
-    return getStatistic(StatsKey.GATEWAYSENDER_EVENTS_QUEUE_SIZE).intValue();
+    return abstractSender.getEventQueueSize();
   }
 
   public float getEventsQueuedRate() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/StatsKey.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/StatsKey.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/StatsKey.java
index 5097292..05dfd5e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/StatsKey.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/StatsKey.java
@@ -301,7 +301,6 @@ public class StatsKey {
   
   public static final String GATEWAYSENDER_EVENTS_RECEIVED = "eventsReceived";
   public static final String GATEWAYSENDER_EVENTS_QUEUED = "eventsQueued";
-  public static final String GATEWAYSENDER_EVENTS_QUEUE_SIZE = "eventQueueSize";
   public static final String GATEWAYSENDER_BATCHES_DISTRIBUTED = "batchesDistributed";
   public static final String GATEWAYSENDER_BATCHES_DISTRIBUTE_TIME = "batchDistributionTime";
   public static final String GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED = "batchesRedistributed";

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
index bfd4ac3..0477893 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
@@ -268,7 +268,9 @@ public class SortedListForAsyncQueueJUnitTest extends TestCase {
     ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(c, gattrs);
     HashSet<Region> set = new HashSet<Region>();
     set.add(region);
-    return new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1);
+    HDFSParallelGatewaySenderQueue queue = new HDFSParallelGatewaySenderQueue(gatewaySender,
set, 0, 1);
+    queue.start();
+    return queue;
   }
   
  // A test for testing whether the KeyToSeqNumObject compare function is in order.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessorJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessorJUnitTest.java
new file mode 100644
index 0000000..e26d287
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessorJUnitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ConcurrentSerialGatewaySenderEventProcessorJUnitTest {
+
+  @Test
+  public void eventQueueSizeReturnsSizeOfQueues() {
+    AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
+    ConcurrentSerialGatewaySenderEventProcessor processor = new ConcurrentSerialGatewaySenderEventProcessor(sender);
+    RegionQueue queue = mock(RegionQueue.class);
+    when(queue.size()).thenReturn(3);
+    processor.getQueues().add(queue);
+    assertEquals(3,processor.eventQueueSize());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
new file mode 100644
index 0000000..97892ba
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewayMBeanBridgeJUnitTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.management.bean.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.NanoTimer;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
+import com.gemstone.gemfire.management.internal.beans.GatewaySenderMBeanBridge;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import io.codearte.catchexception.shade.mockito.Mockito;
+
+/**
+ * @author rishim
+ */
+@Category(IntegrationTest.class)
+public class GatewayMBeanBridgeJUnitTest extends MBeanStatsTestCase {
+  
+  private GatewaySenderMBeanBridge bridge;
+
+  private GatewaySenderStats senderStats;
+
+  private static long testStartTime = NanoTimer.getTime();
+
+  private AbstractGatewaySender sender;
+
+  public void init() {
+    senderStats = new GatewaySenderStats(system, "test");
+
+    sender = Mockito.mock(AbstractGatewaySender.class);
+    Mockito.when(sender.getStatistics()).thenReturn(senderStats);
+    bridge = new GatewaySenderMBeanBridge(sender);
+    bridge.addGatewaySenderStats(senderStats);
+  }
+  
+  @Test
+  public void testSenderStats() throws InterruptedException{
+    senderStats.incBatchesRedistributed();
+    senderStats.incEventsReceived();
+    Mockito.when(sender.getEventQueueSize()).thenReturn(10);
+    senderStats.endPut(testStartTime);
+    senderStats.endBatch(testStartTime, 100);
+    senderStats.incEventsNotQueuedConflated();
+    senderStats.incEventsExceedingAlertThreshold();
+    
+    sample();
+    
+    assertEquals(1, getTotalBatchesRedistributed());
+    assertEquals(1, getTotalEventsConflated());
+    assertEquals(10, getEventQueueSize());
+    assertTrue(getEventsQueuedRate() >0);
+    assertTrue(getEventsReceivedRate() >0);
+    assertTrue(getBatchesDispatchedRate() >0);
+    assertTrue(getAverageDistributionTimePerBatch() >0);
+    assertTrue(getEventsExceedingAlertThreshold() >0);
+  }
+
+  private int getTotalBatchesRedistributed() {
+    return bridge.getTotalBatchesRedistributed();
+  }
+
+  private int getTotalEventsConflated() {
+    return bridge.getTotalEventsConflated();
+  }  
+
+  private int getEventQueueSize() {
+    return bridge.getEventQueueSize();
+  }
+
+  private float getEventsQueuedRate() {
+    return bridge.getEventsQueuedRate();
+  }
+
+  private float getEventsReceivedRate() {
+    return bridge.getEventsReceivedRate();
+  }
+   
+  private float getBatchesDispatchedRate() {
+    return bridge.getBatchesDispatchedRate();
+  }
+   
+  private long getAverageDistributionTimePerBatch() {
+    return bridge.getAverageDistributionTimePerBatch();
+  }
+  private long getEventsExceedingAlertThreshold() {
+    return bridge.getEventsExceedingAlertThreshold();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6e52f7a3/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewaySenderStatsJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewaySenderStatsJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewaySenderStatsJUnitTest.java
deleted file mode 100644
index 30b3a82..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/management/bean/stats/GatewaySenderStatsJUnitTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.management.bean.stats;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.NanoTimer;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderStats;
-import com.gemstone.gemfire.management.internal.beans.GatewaySenderMBeanBridge;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-/**
- * @author rishim
- */
-@Category(IntegrationTest.class)
-public class GatewaySenderStatsJUnitTest extends MBeanStatsTestCase {
-  
-  private GatewaySenderMBeanBridge bridge;
-
-  private GatewaySenderStats senderStats;
-
-  private static long testStartTime = NanoTimer.getTime();
-
-  public void init() {
-    senderStats = new GatewaySenderStats(system, "test");
-
-    bridge = new GatewaySenderMBeanBridge();
-    bridge.addGatewaySenderStats(senderStats);
-  }
-  
-  @Test
-  public void testSenderStats() throws InterruptedException{
-    senderStats.incBatchesRedistributed();
-    senderStats.incEventsReceived();
-    senderStats.setQueueSize(10);
-    senderStats.endPut(testStartTime);
-    senderStats.endBatch(testStartTime, 100);
-    senderStats.incEventsNotQueuedConflated();
-    senderStats.incEventsExceedingAlertThreshold();
-    
-    sample();
-    
-    assertEquals(1, getTotalBatchesRedistributed());
-    assertEquals(1, getTotalEventsConflated());
-    assertEquals(10, getEventQueueSize());
-    assertTrue(getEventsQueuedRate() >0);
-    assertTrue(getEventsReceivedRate() >0);
-    assertTrue(getBatchesDispatchedRate() >0);
-    assertTrue(getAverageDistributionTimePerBatch() >0);
-    assertTrue(getEventsExceedingAlertThreshold() >0);
-  }
-
-  private int getTotalBatchesRedistributed() {
-    return bridge.getTotalBatchesRedistributed();
-  }
-
-  private int getTotalEventsConflated() {
-    return bridge.getTotalEventsConflated();
-  }  
-
-  private int getEventQueueSize() {
-    return bridge.getEventQueueSize();
-  }
-
-  private float getEventsQueuedRate() {
-    return bridge.getEventsQueuedRate();
-  }
-
-  private float getEventsReceivedRate() {
-    return bridge.getEventsReceivedRate();
-  }
-   
-  private float getBatchesDispatchedRate() {
-    return bridge.getBatchesDispatchedRate();
-  }
-   
-  private long getAverageDistributionTimePerBatch() {
-    return bridge.getAverageDistributionTimePerBatch();
-  }
-  private long getEventsExceedingAlertThreshold() {
-    return bridge.getEventsExceedingAlertThreshold();
-  }
-}



Mime
View raw message