geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasonhu...@apache.org
Subject geode git commit: GEODE-2900: push shadow key back into the front of the eventSeqNumber "Queue"
Date Tue, 09 May 2017 17:15:47 GMT
Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2900 [created] 240b469ff


GEODE-2900:  push shadow key back into the front of the eventSeqNumber "Queue"


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/240b469f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/240b469f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/240b469f

Branch: refs/heads/feature/GEODE-2900
Commit: 240b469ff217fbfba39381f196ae3e0e832a69a6
Parents: 23eb232
Author: Jason Huynh <huynhja@gmail.com>
Authored: Tue May 9 10:11:39 2017 -0700
Committer: Jason Huynh <huynhja@gmail.com>
Committed: Tue May 9 10:14:32 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegionQueue.java | 34 ++++++++++++--------
 .../parallel/ParallelGatewaySenderQueue.java    |  4 +--
 2 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/240b469f/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7a21d12..d374541 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -27,8 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -74,7 +76,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
    * A transient queue to maintain the eventSeqNum of the events that are to be sent to remote
site.
    * It is cleared when the queue is cleared.
    */
-  private final BlockingQueue<Object> eventSeqNumQueue = new LinkedBlockingQueue<Object>();
+  private final BlockingDeque<Object> eventSeqNumDeque = new LinkedBlockingDeque<Object>();
 
   // private final BlockingQueue<EventID> eventSeqNumQueueWithEventId = new
   // LinkedBlockingQueue<EventID>();
@@ -139,7 +141,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
             }
           });
           for (EventID eventID : keys) {
-            eventSeqNumQueue.add(eventID);
+            eventSeqNumDeque.addLast(eventID);
           }
         } else {
           TreeSet<Long> sortedKeys = new TreeSet<Long>(this.keySet());
@@ -150,7 +152,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
           // fix for #49679 NoSuchElementException thrown from BucketRegionQueue.initialize
           if (!sortedKeys.isEmpty()) {
             for (Long key : sortedKeys) {
-              eventSeqNumQueue.add(key);
+              eventSeqNumDeque.addLast(key);
             }
             lastKeyRecovered = sortedKeys.last();
             if (this.getEventSeqNum() != null) {
@@ -162,7 +164,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "For bucket {} ,total keys recovered are : {} last key recovered is : {} and
the seqNo is ",
-              getId(), eventSeqNumQueue.size(), lastKeyRecovered, getEventSeqNum());
+              getId(), eventSeqNumDeque.size(), lastKeyRecovered, getEventSeqNum());
         }
       }
       this.initialized = true;
@@ -211,7 +213,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   @Override
   public void beforeAcquiringPrimaryState() {
     int batchSize = this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
-    Iterator<Object> itr = eventSeqNumQueue.iterator();
+    Iterator<Object> itr = eventSeqNumDeque.iterator();
     markEventsAsDuplicate(batchSize, itr);
   }
 
@@ -224,7 +226,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       }
     });
     this.indexes.clear();
-    this.eventSeqNumQueue.clear();
+    this.eventSeqNumDeque.clear();
   }
 
   @Override
@@ -236,7 +238,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         result.set(BucketRegionQueue.super.clearEntries(rvv));
       }
     });
-    this.eventSeqNumQueue.clear();
+    this.eventSeqNumDeque.clear();
     return result.get();
   }
 
@@ -250,7 +252,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     getInitializationLock().writeLock().lock();
     try {
       this.indexes.clear();
-      this.eventSeqNumQueue.clear();
+      this.eventSeqNumDeque.clear();
     } finally {
       getInitializationLock().writeLock().unlock();
     }
@@ -377,7 +379,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       if (logger.isDebugEnabled()) {
         logger.debug(" removing the key {} from eventSeqNumQueue", event.getKey());
       }
-      this.eventSeqNumQueue.remove(event.getKey());
+      this.eventSeqNumDeque.remove(event.getKey());
     }
   }
 
@@ -412,7 +414,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
       if (this.getPartitionedRegion().isDestroyed()) {
         throw new BucketRegionQueueUnavailableException();
       }
-      key = this.eventSeqNumQueue.peek();
+      key = this.eventSeqNumDeque.peekFirst();
       if (key != null) {
         object = optimalGet(key);
         if (object == null && !this.getPartitionedRegion().isConflationEnabled())
{
@@ -431,7 +433,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         // RegionQueue[1])[0];
         // //queue.addToPeekedKeys(key);
         // }
-        this.eventSeqNumQueue.remove(key);
+        this.eventSeqNumDeque.remove(key);
       }
       return object; // OFFHEAP: ok since callers are careful to do destroys on
                      // region queue after finished with peeked object.
@@ -443,7 +445,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) {
     if (didPut) {
       if (this.initialized) {
-        this.eventSeqNumQueue.add(key);
+        this.eventSeqNumDeque.addLast(key);
         updateLargestQueuedKey((Long) key);
       }
       if (logger.isDebugEnabled()) {
@@ -456,6 +458,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     }
   }
 
+  public void enqueueKeyToFront(Object key) {
+    eventSeqNumDeque.addFirst(key);
+  }
+
   private void updateLargestQueuedKey(Long key) {
     Atomics.setIfGreater(this.latestQueuedKey, key);
   }
@@ -510,7 +516,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
    * @throws ForceReattemptException
    */
   public Object remove() throws ForceReattemptException {
-    Object key = this.eventSeqNumQueue.remove();
+    Object key = this.eventSeqNumDeque.removeFirst();
     if (key != null) {
       destroyKey(key);
     }
@@ -586,7 +592,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
 
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
-        && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();
+        && !this.eventSeqNumDeque.isEmpty() && getBucketAdvisor().isPrimary();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/240b469f/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 82e6f68..7717fc8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -56,7 +56,6 @@ import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.distributed.internal.DM;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketNotFoundException;
 import org.apache.geode.internal.cache.BucketRegion;
@@ -81,7 +80,6 @@ import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
 import org.apache.geode.internal.cache.wan.GatewaySenderException;
 import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.ParallelQueueBatchRemovalResponse;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
@@ -1357,6 +1355,8 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
         final PartitionedRegion region = (PartitionedRegion) event.getRegion();
         if (!region.getRegionAdvisor().isPrimaryForBucket(bucketId)) {
           iterator.remove();
+          getBucketRegionQueueByBucketId(getRandomShadowPR(), bucketId)
+              .enqueueKeyToFront(event.getShadowKey());
         }
       }
 


Mime
View raw message