geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aging...@apache.org
Subject geode git commit: GEODE-2489: Avoid sending tombstone GC messages to peer nodes if there is no client registration
Date Wed, 22 Feb 2017 22:12:34 GMT
Repository: geode
Updated Branches:
  refs/heads/develop 76fa7cc73 -> 257c21ece


GEODE-2489: Avoid sending tombstone GC messages to peer nodes if there is no client registration

When tombstones are removed, tombstone messages are sent to the region's replicas; in the case of a
Partitioned Region (PR), messages are also sent to peer region nodes for client events.

Currently, the tombstone messages meant for clients (which carry all of the removed keys) are being
sent to peer PR nodes even though no clients are registered on those peers.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/257c21ec
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/257c21ec
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/257c21ec

Branch: refs/heads/develop
Commit: 257c21ece8ac54cf3ead2c41bfe8085b3219e3d5
Parents: 76fa7cc
Author: Anil <agingade@pivotal.io>
Authored: Thu Feb 16 17:57:46 2017 -0800
Committer: Anil <agingade@pivotal.io>
Committed: Wed Feb 22 14:10:08 2017 -0800

----------------------------------------------------------------------
 .../internal/DistributionAdvisor.java           | 22 ++++++++
 .../geode/internal/cache/BucketRegion.java      |  4 +-
 .../geode/internal/cache/TombstoneService.java  | 11 +++-
 .../cache/partitioned/PRTombstoneMessage.java   |  2 +-
 .../cache/partitioned/RegionAdvisor.java        | 20 +++++++
 .../geode/cache30/ClientServerCCEDUnitTest.java | 58 ++++++++++++++++++++
 6 files changed, 113 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/257c21ec/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index b5b9cf4..1d3dc86 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -1295,6 +1295,28 @@ public class DistributionAdvisor {
   }
 
   /**
+   * This method calls filter->include on every profile until include returns true.
+   * 
+   * @return false if all filter->include calls return false; otherwise true.
+   **/
+  protected boolean satisfiesFilter(Filter f) {
+    initializationGate();
+    if (disabled) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Intelligent Messaging Disabled");
+      }
+      return !getDefaultDistributionMembers().isEmpty();
+    }
+    Profile[] locProfiles = this.profiles; // grab current profiles
+    for (Profile p : locProfiles) {
+      if (f.include(p)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * A visitor interface for all the available profiles used by
    * {@link DistributionAdvisor#accept(ProfileVisitor, Object)}. Unlike the {@link Filter}
class
    * this does not assume of two state visit of inclusion or exclusion rather allows manipulation
of

http://git-wip-us.apache.org/repos/asf/geode/blob/257c21ec/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 585f4e4..d92ddab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -332,9 +332,9 @@ public class BucketRegion extends DistributedRegion implements Bucket
{
       // have the filter profile ferret out all of the clients that have interest
       // in this region
       FilterProfile fp = getFilterProfile();
+      // fix for bug #46309 - don't send null/empty key set to clients
       if ((removedKeys != null && !removedKeys.isEmpty()) // bug #51877 - NPE in
clients
-          && (routing != null || fp != null)) { // fix for bug #46309 - don't send
null/empty key
-                                                // set to clients
+          && (routing != null || (fp != null && fp.hasInterest()))) {
         RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(),
             Operation.REGION_DESTROY, null, true, getMyId());
         FilterInfo clientRouting = routing;

http://git-wip-us.apache.org/repos/asf/geode/blob/257c21ec/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
index ca682bc..0df27c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TombstoneService.java
@@ -550,7 +550,7 @@ public class TombstoneService {
             DistributedRegion tr = (DistributedRegion) t.region;
             boolean tombstoneWasStillInRegionMap =
                 tr.getRegionMap().removeTombstone(t.entry, t, false, true);
-            if (tombstoneWasStillInRegionMap && tr.isUsedForPartitionedRegionBucket())
{
+            if (tombstoneWasStillInRegionMap && hasToTrackKeysForClients(tr)) {
               Set<Object> keys = reapedKeys.get(tr);
               if (keys.isEmpty()) {
                 keys = new HashSet<Object>();
@@ -589,6 +589,15 @@ public class TombstoneService {
       } // sync on deltaGIILock
     }
 
+    /**
+     * Returns true if keys need to be tracked for clients registering interests on PR.
+     */
+    private boolean hasToTrackKeysForClients(DistributedRegion r) {
+      return r.isUsedForPartitionedRegionBucket()
+          && ((r.getFilterProfile() != null && r.getFilterProfile().hasInterest())
+              || r.getPartitionedRegion().getRegionAdvisor().hasPRServerWithInterest());
+    }
+
     @Override
     protected void checkExpiredTombstoneGC() {
       if (shouldCallExpireBatch()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/257c21ec/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
index fa82560..0e6b707 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PRTombstoneMessage.java
@@ -68,7 +68,7 @@ public final class PRTombstoneMessage extends PartitionMessageWithDirectReply
 
   public static void send(BucketRegion r, final Set<Object> keys, EventID eventID)
{
     Set<InternalDistributedMember> recipients =
-        r.getPartitionedRegion().getRegionAdvisor().adviseAllPRNodes();
+        r.getPartitionedRegion().getRegionAdvisor().adviseAllServersWithInterest();
     recipients.removeAll(r.getDistributionAdvisor().adviseReplicates());
     if (recipients.size() == 0) {
       return;

http://git-wip-us.apache.org/repos/asf/geode/blob/257c21ec/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
index 0978585..84b0aad 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java
@@ -1004,6 +1004,26 @@ public class RegionAdvisor extends CacheDistributionAdvisor {
     });
   }
 
+  public Set adviseAllServersWithInterest() {
+    return adviseFilter(new Filter() {
+      public boolean include(Profile profile) {
+        CacheProfile prof = (CacheProfile) profile;
+        return prof.hasCacheServer && prof.filterProfile.hasInterest();
+      }
+    });
+  }
+
+  private static final Filter prServerWithInterestFilter = new Filter() {
+    public boolean include(Profile profile) {
+      CacheProfile prof = (CacheProfile) profile;
+      return prof.isPartitioned && prof.hasCacheServer && prof.filterProfile.hasInterest();
+    }
+  };
+
+  public boolean hasPRServerWithInterest() {
+    return satisfiesFilter(prServerWithInterestFilter);
+  }
+
   /**
    * return the set of all members who must receive operation notifications
    * 

http://git-wip-us.apache.org/repos/asf/geode/blob/257c21ec/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
index e33074d..e749336 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/ClientServerCCEDUnitTest.java
@@ -260,6 +260,64 @@ public class ClientServerCCEDUnitTest extends JUnit4CacheTestCase {
     }
   }
 
+  public void testTombstoneGcMessagesAreOnlySentToPRNodesWithInterestRegistration() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    final String name = "Region";
+
+    createServerRegion(vm0, name, false);
+    // Create all the buckets on this vm.
+    createEntries(vm0);
+
+    createServerRegion(vm1, name, false);
+
+    int port = createServerRegion(vm2, name, false);
+
+    // Create client and register interest on one server.
+    createClientRegion(vm3, name, port, true, ClientRegionShortcut.CACHING_PROXY);
+
+    try {
+      vm1.invoke(() -> {
+        DistributionMessageObserver.setInstance(new PRTombstoneMessageObserver());
+      });
+      vm2.invoke(() -> {
+        DistributionMessageObserver.setInstance(new PRTombstoneMessageObserver());
+      });
+
+      destroyEntries(vm0);
+      forceGC(vm0);
+
+      // vm2 should receive tombstone GC messages
+      vm2.invoke(() -> {
+        PRTombstoneMessageObserver mo =
+            (PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
+        // Should receive tombstone message for each bucket.
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+          return mo.prTsMessageProcessed >= 2;
+        });
+        assertEquals("Tombstone GC message is expected.", 2, mo.prTsMessageProcessed);
+      });
+
+      // Since there is no interest registered, vm1 should not receive any tombstone GC messages
+      vm1.invoke(() -> {
+        PRTombstoneMessageObserver mo =
+            (PRTombstoneMessageObserver) DistributionMessageObserver.getInstance();
+        assertEquals("Tombstone GC message is not expected.", 0, mo.prTsMessageProcessed);
+      });
+    } finally {
+      vm1.invoke(() -> {
+        DistributionMessageObserver.setInstance(null);
+      });
+      vm2.invoke(() -> {
+        DistributionMessageObserver.setInstance(null);
+      });
+    }
+  }
+
   private class PRTombstoneMessageObserver extends DistributionMessageObserver {
     public int tsMessageProcessed = 0;
     public int prTsMessageProcessed = 0;


Mime
View raw message