geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject [7/7] geode git commit: fix-6
Date Thu, 27 Apr 2017 06:27:26 GMT
fix-6


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/ea3420b3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/ea3420b3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/ea3420b3

Branch: refs/heads/feature/GEM-1299
Commit: ea3420b3efd9dc21fa29f0f7b70bef911d4e827e
Parents: 80a95f6
Author: zhouxh <gzhou@pivotal.io>
Authored: Wed Apr 26 23:23:13 2017 -0700
Committer: zhouxh <gzhou@pivotal.io>
Committed: Wed Apr 26 23:26:22 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/geode/internal/cache/BucketRegion.java  | 2 +-
 .../org/apache/geode/internal/cache/BucketRegionQueue.java  | 6 +++++-
 .../java/org/apache/geode/internal/cache/LocalRegion.java   | 2 +-
 .../cache/wan/parallel/ParallelGatewaySenderQueue.java      | 1 +
 .../lucene/internal/LuceneIndexForPartitionedRegion.java    | 9 +++++----
 .../internal/distributed/PokeLuceneAsyncQueueFunction.java  | 3 +++
 6 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 136d7b9..cde7cf4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -668,7 +668,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     }
   }
 
-  protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
+  public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
     // We don't need to clone the event for new Gateway Senders.
     // Preserve the bucket reference for resetting it later.
     LocalRegion bucketRegion = event.getRegion();

http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index bcc1d8d..56ae3f1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -384,7 +384,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
   /**
    * Does a get that gets the value without fault values in from disk.
    */
-  private Object optimalGet(Object k) {
+  public Object optimalGet(Object k) {
     // Get the object at that key (to remove the index).
     Object object = null;
     try {
@@ -588,6 +588,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     return this.eventSeqNumQueue.peek();
   }
 
+  public BlockingQueue getEventSeqNumQueue() {
+    return eventSeqNumQueue;
+  }
+
   public boolean isReadyForPeek() {
     return !this.getPartitionedRegion().isDestroyed() && !this.isEmpty()
         && !this.eventSeqNumQueue.isEmpty() && getBucketAdvisor().isPrimary();

http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 45035d7..200640e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6338,7 +6338,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return false;
   }
 
-  protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
+  public void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
 
     if (isPdxTypesRegion()
         || event.isConcurrencyConflict() /* usually concurrent cache modification problem */) {

http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 9696b90..87feb21 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1133,6 +1133,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
             bucketId, prQ.getFullPath());
       }
     }
+    brq.getEventSeqNumQueue().add(key);
     addRemovedEvent(prQ, bucketId, key);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
index a60ca01..6e3dce0 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java
@@ -290,16 +290,17 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
         try {
           for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
             if (!br.getBucketAdvisor().isPrimary()) {
-              AsyncEvent currentFirst = (AsyncEvent) ((BucketRegionQueue) br).firstEventSeqNum();
-              AsyncEvent lastPeek = (AsyncEvent) lastPeekedEvents.put(br, currentFirst);
+              Long currentFirst = (Long) ((BucketRegionQueue) br).firstEventSeqNum();
+              Long lastPeek = (Long) lastPeekedEvents.put(br, currentFirst);
               if (currentFirst != null && currentFirst.equals(lastPeek)) {
-                redistributeEvents(lastPeek);
+                redistributeEvents((AsyncEvent) ((BucketRegionQueue) br).optimalGet(currentFirst));
+                lastPeekedEvents.put(br, ((BucketRegionQueue) br).firstEventSeqNum());
               }
             } else {
               lastPeekedEvents.put(br, null);
             }
           }
-          Thread.sleep(10000);
+          Thread.sleep(2000);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }

http://git-wip-us.apache.org/repos/asf/geode/blob/ea3420b3/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
index 992972b..10c6888 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/PokeLuceneAsyncQueueFunction.java
@@ -52,6 +52,7 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
     PartitionedRegion pr = (PartitionedRegion) ctx.getDataSet();
     Cache cache = pr.getCache();
     String queueId = (String) pr.getAttributes().getAsyncEventQueueIds().iterator().next();
+    // PR could have many AEQs, not just AEQ for lucene
     AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(queueId);
 
     // Get the GatewaySender
@@ -60,6 +61,8 @@ public class PokeLuceneAsyncQueueFunction implements Function, InternalEntity {
     // Update the shadow key
     BucketRegion br = pr.getBucketRegion(key);
     if (br.getBucketAdvisor().isPrimary()) {
+      // only do it for primary? how about failover again to secondary?
+      // why not br.notifyGatewaySender(operation, event);
       try {
         List<ParallelGatewaySenderEventProcessor> processors =
             ((ConcurrentParallelGatewaySenderEventProcessor) sender.getEventProcessor())


Mime
View raw message