geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject incubator-geode git commit: add FORCE_INVALIDATE_EVENT fixes to geode
Date Wed, 30 Mar 2016 22:21:59 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-915 [created] fdc61fc7e


add FORCE_INVALIDATE_EVENT fixes to geode


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fdc61fc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fdc61fc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fdc61fc7

Branch: refs/heads/feature/GEODE-915
Commit: fdc61fc7e0c540b77dcf38d3813cdf7a3036d9d2
Parents: 82faa8a
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Wed Mar 30 15:10:29 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Wed Mar 30 15:10:29 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       |  66 +--
 .../gemfire/internal/cache/EntryEventImpl.java  |   3 +
 .../gemfire/internal/cache/LocalRegion.java     |  14 +-
 .../gemfire/internal/cache/ProxyRegionMap.java  |  11 +-
 .../ClientServerForceInvalidateDUnitTest.java   | 414 +++++++++++++++++++
 5 files changed, 463 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 9058984..58f535d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -1614,7 +1614,7 @@ public abstract class AbstractRegionMap implements RegionMap {
               // Create an entry event only if the calling context is
               // a receipt of a TXCommitMessage AND there are callbacks installed
               // for this region
-              boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/,
isRegionReady || inRI);
+              boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
               EntryEventImpl cbEvent = createCBEvent(owner, op,
                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState, versionTag, tailKey);
               try {
@@ -1726,7 +1726,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
                 else {
                   try {
-                    boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady
|| inRI);
+                    boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady ||
inRI);
                     cbEvent = createCBEvent(owner, op,
                         key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState, versionTag, tailKey);
                     try {
@@ -1788,7 +1788,7 @@ public abstract class AbstractRegionMap implements RegionMap {
             if (!opCompleted) {
               // already has value set to Token.DESTROYED
               opCompleted = true;
-              boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady ||
inRI);
+              boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
               cbEvent = createCBEvent(owner, op,
                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState, versionTag, tailKey);
               try {
@@ -1878,6 +1878,14 @@ public abstract class AbstractRegionMap implements RegionMap {
    * If true then invalidates that throw EntryNotFoundException
    * or that are already invalid will first call afterInvalidate on CacheListeners. 
    * The old value on the event passed to afterInvalidate will be null.
+   * If the region is not initialized then callbacks will not be done.
+   * This property only applies to non-transactional invalidates.
+   * Transactional invalidates ignore this property.
+   * Note that empty "proxy" regions on a client will not be sent invalidates
+   * from the server unless they also set the proxy InterestPolicy to ALL.
+   * If the invalidate is not sent then this property will not cause a listener 
+   * on that client to be notified of the invalidate.
+   * A non-empty "caching-proxy" will receive invalidates from the server.
    */
   public static boolean FORCE_INVALIDATE_EVENT = Boolean.getBoolean("gemfire.FORCE_INVALIDATE_EVENT");
 
@@ -1885,9 +1893,9 @@ public abstract class AbstractRegionMap implements RegionMap {
    * If the FORCE_INVALIDATE_EVENT flag is true
    * then invoke callbacks on the given event.
    */
-  void forceInvalidateEvent(EntryEventImpl event) {
+  static void forceInvalidateEvent(EntryEventImpl event, LocalRegion owner) {
     if (FORCE_INVALIDATE_EVENT) {
-      event.invokeCallbacks(_getOwner(), false, false);
+      event.invokeCallbacks(owner, false, false);
     }
   }
   
@@ -1907,8 +1915,9 @@ public abstract class AbstractRegionMap implements RegionMap {
     boolean didInvalidate = false;
     RegionEntry invalidatedRe = null;
     boolean clearOccured = false;
-
     DiskRegion dr = owner.getDiskRegion();
+    boolean ownerIsInitialized = owner.isInitialized();
+    try {
     // Fix for Bug #44431. We do NOT want to update the region and wait
     // later for index INIT as region.clear() can cause inconsistency if
     // happened in parallel as it also does index INIT.
@@ -1957,9 +1966,8 @@ public abstract class AbstractRegionMap implements RegionMap {
                         // that's okay - when writing an invalid into a disk, the
                         // region has been cleared (including this token)
                       }
-                      forceInvalidateEvent(event);
                     } else {
-                      owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+                      owner.serverInvalidate(event);
                       if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer())
{
                         // server did not perform the invalidation, so don't leave an invalid
                         // entry here
@@ -2031,17 +2039,20 @@ public abstract class AbstractRegionMap implements RegionMap {
                 if (forceNewEntry && event.isFromServer()) {
                   // don't invoke listeners - we didn't force new entries for
                   // CCU invalidations before 7.0, and listeners don't care
-                  event.inhibitCacheListenerNotification(true);
+                  if (!FORCE_INVALIDATE_EVENT) {
+                    event.inhibitCacheListenerNotification(true);
+                  }
                 }
                 event.setRegionEntry(newRe);
-                owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+                owner.serverInvalidate(event);
                 if (!forceNewEntry && event.noVersionReceivedFromServer()) {
                   // server did not perform the invalidation, so don't leave an invalid
                   // entry here
                   return false;
                 }
                 try {
-                  if (!owner.isInitialized() && owner.getDataPolicy().withReplication())
{
+                  ownerIsInitialized = owner.isInitialized();
+                  if (!ownerIsInitialized && owner.getDataPolicy().withReplication())
{
                     final int oldSize = owner.calculateRegionEntryValueSize(newRe);
                     invalidateEntry(event, newRe, oldSize);
                   }
@@ -2101,7 +2112,8 @@ public abstract class AbstractRegionMap implements RegionMap {
             re = null;
           }
           if (re == null) {
-            if (!owner.isInitialized()) {
+            ownerIsInitialized = owner.isInitialized();
+            if (!ownerIsInitialized) {
               // when GII message arrived or processed later than invalidate
               // message, the entry should be created as placeholder
               RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner,
event.getKey(),
@@ -2129,7 +2141,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
        
                 // bug #43287 - send event to server even if it's not in the client (LRU
may have evicted it)
-                owner.cacheWriteBeforeInvalidate(event, true, false);
+                owner.serverInvalidate(event);
                 if (owner.concurrencyChecksEnabled) {
                   if (event.getVersionTag() == null) {
                     // server did not perform the invalidation, so don't leave an invalid
@@ -2186,11 +2198,10 @@ public abstract class AbstractRegionMap implements RegionMap {
                   if (event.getVersionTag() != null && owner.getVersionVector() !=
null) {
                     owner.getVersionVector().recordVersion((InternalDistributedMember) event.getDistributedMember(),
event.getVersionTag());
                   }
-                  forceInvalidateEvent(event);
                 }
                 else { // previous value not invalid
                   event.setRegionEntry(re);
-                  owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+                  owner.serverInvalidate(event);
                   if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer())
{
                     // server did not perform the invalidation, so don't leave an invalid
                     // entry here
@@ -2253,7 +2264,6 @@ public abstract class AbstractRegionMap implements RegionMap {
             // is in region, do nothing
           }
           if (!entryExisted) {
-            forceInvalidateEvent(event);
             owner.checkEntryNotFound(event.getKey());
           }
         } // while(retry)
@@ -2284,6 +2294,11 @@ public abstract class AbstractRegionMap implements RegionMap {
       }
     }
     return didInvalidate;
+    } finally {
+      if (ownerIsInitialized) {
+        forceInvalidateEvent(event, owner);
+      }
+    }
   }
 
   protected void invalidateNewEntry(EntryEventImpl event,
@@ -2410,7 +2425,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                     // a receipt of a TXCommitMessage AND there are callbacks
                     // installed
                     // for this region
-                    boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+                    boolean invokeCallbacks = shouldCreateCBEvent(owner, owner.isInitialized());
                     boolean cbEventInPending = false;
                     cbEvent = createCBEvent(owner, 
                         localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -2474,7 +2489,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
               }
               if (!opCompleted) {
-                boolean invokeCallbacks = shouldCreateCBEvent( owner, true /* isInvalidate
*/, owner.isInitialized());
+                boolean invokeCallbacks = shouldCreateCBEvent( owner, owner.isInitialized());
                 boolean cbEventInPending = false;
                 cbEvent = createCBEvent(owner, 
                     localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -2539,7 +2554,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 // a receipt of a TXCommitMessage AND there are callbacks
                 // installed
                 // for this region
-                boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+                boolean invokeCallbacks = shouldCreateCBEvent(owner, owner.isInitialized());
                 boolean cbEventInPending = false;
                 cbEvent = createCBEvent(owner, 
                     localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, 
@@ -3316,7 +3331,7 @@ public abstract class AbstractRegionMap implements RegionMap {
     final boolean isRegionReady = owner.isInitialized();
     EntryEventImpl cbEvent = null;
     EntryEventImpl sqlfEvent = null;
-    boolean invokeCallbacks = shouldCreateCBEvent(owner, false /*isInvalidate*/, isRegionReady);
+    boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady);
     boolean cbEventInPending = false;
     cbEvent = createCBEvent(owner, putOp, key, newValue, txId, 
         txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState,
versionTag, tailKey);
@@ -3739,7 +3754,7 @@ public abstract class AbstractRegionMap implements RegionMap {
   }
   
   static boolean shouldCreateCBEvent( final LocalRegion owner, 
-      final boolean isInvalidate, final boolean isInitialized)
+      final boolean isInitialized)
   {
     LocalRegion lr = owner;
     boolean isPartitioned = lr.isUsedForPartitionedRegionBucket();
@@ -3752,17 +3767,10 @@ public abstract class AbstractRegionMap implements RegionMap {
       }*/
       lr = owner.getPartitionedRegion();
     }
-    if (isInvalidate) { // ignore shouldNotifyGatewayHub check for invalidates
-      return (isPartitioned || isInitialized)
+    return (isPartitioned || isInitialized)
           && (lr.shouldDispatchListenerEvent()
             || lr.shouldNotifyBridgeClients()
             || lr.getConcurrencyChecksEnabled());
-    } else {
-      return (isPartitioned || isInitialized)
-          && (lr.shouldDispatchListenerEvent()
-            || lr.shouldNotifyBridgeClients()
-            || lr.getConcurrencyChecksEnabled());
-    }
   }
 
   /** create a callback event for applying a transactional change to the local cache */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index dfd20ef..5935faa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -2338,6 +2338,9 @@ public class EntryEventImpl
     if (callbacksInvoked()) { 
       buf.append(";callbacksInvoked");
     }
+    if (inhibitCacheListenerNotification()) {
+      buf.append(";inhibitCacheListenerNotification");
+    }
     if (this.versionTag != null) {
       buf.append(";version=").append(this.versionTag);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index c727a53..596a583 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3262,9 +3262,8 @@ public class LocalRegion extends AbstractRegion
   /**
    * @since 5.7
    */
-  protected void serverInvalidate(EntryEventImpl event, boolean invokeCallbacks, 
-      boolean forceNewEntry) {
-    if (event.getOperation().isDistributed()) {
+  void serverInvalidate(EntryEventImpl event) {
+    if (event.getOperation().isDistributed() && !event.isOriginRemote()) {
       ServerRegionProxy mySRP = getServerProxy();
       if (mySRP != null) {
         mySRP.invalidate(event);
@@ -3385,15 +3384,6 @@ public class LocalRegion extends AbstractRegion
   }
 
   /**
-   * @since 5.7
-   */
-  void cacheWriteBeforeInvalidate(EntryEventImpl event, boolean invokeCallbacks, boolean
forceNewEntry) {
-    if (!event.getOperation().isLocal() && !event.isOriginRemote()) {
-      serverInvalidate(event, invokeCallbacks, forceNewEntry);
-    }
-  }
-
-  /**
    * @see DistributedRegion#cacheWriteBeforePut(EntryEventImpl, Set, CacheWriter, boolean,
Object)
    * @param event
    * @param netWriteRecipients

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index dc9db46..67e13fc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -188,9 +188,12 @@ final class ProxyRegionMap implements RegionMap {
       throws EntryNotFoundException {
     
     if (event.getOperation().isLocal()) {
+      if (this.owner.isInitialized()) {
+        AbstractRegionMap.forceInvalidateEvent(event, this.owner);
+      }
       throw new EntryNotFoundException(event.getKey().toString());
     }
-    this.owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+    this.owner.serverInvalidate(event);
     this.owner.recordEvent(event);
     this.owner.basicInvalidatePart2(markerEntry, event, false /*Clear conflict occurred */,
true);
     this.owner.basicInvalidatePart3(markerEntry, event, true);
@@ -275,7 +278,7 @@ final class ProxyRegionMap implements RegionMap {
         txEvent.addDestroy(this.owner, markerEntry, key,aCallbackArgument);
       }
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
-                                                false, !inTokenMode)) {
+                                                !inTokenMode)) {
         // fix for bug 39526
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, op,
             key, null, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext,
txEntryState, versionTag, tailKey);
@@ -305,7 +308,7 @@ final class ProxyRegionMap implements RegionMap {
         txEvent.addInvalidate(this.owner, markerEntry, key, newValue,aCallbackArgument);
       }
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
-                                                true, this.owner.isInitialized())) {
+                                                this.owner.isInitialized())) {
         // fix for bug 39526
         boolean cbEventInPending = false;
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, 
@@ -339,7 +342,7 @@ final class ProxyRegionMap implements RegionMap {
         txEvent.addPut(putOp, this.owner, markerEntry, key, newValue,aCallbackArgument);
       }
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
-                                                false, this.owner.isInitialized())) {
+                                                this.owner.isInitialized())) {
         // fix for bug 39526
         boolean cbEventInPending = false;
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, putOp, key, 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
new file mode 100644
index 0000000..f7454d5
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import static com.jayway.awaitility.Awaitility.with;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * Tests client server FORCE_INVALIDATE
+ */
+public class ClientServerForceInvalidateDUnitTest extends CacheTestCase
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static Region<String, String> region1;
+
+  private static final String REGION_NAME1 = "ClientServerForceInvalidateDUnitTest_region1";
+
+  private static Host host;
+
+  private static VM server1;
+  private static VM server2;
+
+  /** constructor */
+  public ClientServerForceInvalidateDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception
+  {
+    super.setUp();
+    host = Host.getHost(0);
+    server1 = host.getVM(0);
+    server2 = host.getVM(1);
+  }
+
+  private int initServerCache(VM vm, boolean concurrencyChecksEnabled, boolean partitioned)
{
+    return vm.invoke(() -> createServerCache(concurrencyChecksEnabled, partitioned, 0));
+  }
+
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecks() throws Exception {
+    dotestForceInvalidate(true, true, false, true);
+  }
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnServer() throws
Exception {
+    dotestForceInvalidate(true, false, false, true);
+  }
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnClient() throws
Exception {
+    dotestForceInvalidate(false, true, false, true);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecks() throws Exception {
+    dotestForceInvalidate(true, true, true, true);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnServer() throws Exception
{
+    dotestForceInvalidate(true, false, true, true);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnClient() throws Exception
{
+    dotestForceInvalidate(false, true, true, true);
+  }
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecksServerReplicated() throws
Exception {
+    dotestForceInvalidate(true, true, false, false);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecksServerReplicated() throws Exception
{
+    dotestForceInvalidate(true, true, true, false);
+  }
+
+  /**
+   * 1. create an entry
+   * 2. Install an observer to pause sending subscription events to the client
+   * 3. invalidate the entry from the server (it will be done on server but pause
+   *    prevents it from being sent to the client).
+   * 4. verify that afterInvalidate was invoked on the server.
+   * 5. change the same entry (do a put). Both the client and server now have the
+   *    latest version which is this update.
+   * 6. unpause the observer so that it now sends invalidate event to client.
+   *    It will arrive late and not be done because of concurrency checks.
+   * 7. verify that afterInvalidate was invoked on the client.
+   */
+  public void testInvalidateLosingOnConcurrencyChecks() throws Exception {
+    try {
+      setupServerAndClientVMs(true, true, false, false);
+      final String key = "delayInvalidate";
+      region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+      region1.put(key, "1000");
+      logger.info("installing observers");
+      server1.invoke(() -> installObserver());
+      server2.invoke(() -> installObserver());
+
+      server2.invoke(() -> invalidateOnServer(key));
+
+      validateServerListenerInvoked();
+
+      logger.info("putting a new value 1001");
+      region1.put(key, "1001");
+      logger.info("UnPausing observers");
+      server1.invoke(() -> unpauseObserver());
+      server2.invoke(() -> unpauseObserver());
+
+      waitForClientInvalidate();
+
+    } finally {
+      server1.invoke(() -> cleanupObserver());
+      server2.invoke(() -> cleanupObserver());
+    }
+  }
+  
+  private static void installObserver() {
+    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+    ClientServerObserverHolder.setInstance(new DelaySendingEvent());
+  }
+  
+  private static void unpauseObserver() {
+    DelaySendingEvent observer = (DelaySendingEvent) ClientServerObserverHolder.getInstance();
+    observer.latch.countDown();
+  }
+  
+  private static void cleanupObserver() {
+    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+    ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
+  }
+
+  private static void invalidateOnServer(final Object key) {
+    Region<?,?> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+    r.invalidate(key);
+  }
+  private static void createOnServer(final Object key, final Object value) {
+    @SuppressWarnings("unchecked")
+    Region<Object, Object> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+    r.create(key, value);
+  }
+  
+  private void waitForClientInvalidate() {
+    with().pollInterval(10, TimeUnit.MILLISECONDS).await().atMost(20, TimeUnit.SECONDS)
+      .until(() -> hasClientListenerAfterInvalidateBeenInvoked());
+  }
+
+  static class DelaySendingEvent extends ClientServerObserverAdapter {
+    CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void afterMessageCreation(Message msg) {
+      try {
+        logger.info("waiting in DelaySendingEvent...");
+        latch.await();
+        logger.info("finished waiting in DelaySendingEvent");
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * 1. Invalidate a non-existent entry from the server.
+   * 2. Validate that afterInvalidate was invoked on at least one of the servers.
+   * 3. Validate that afterInvalidate was invoked on the subscribed client.
+   */
+  private void dotestForceInvalidate(boolean concurrencyChecksOnServer, boolean concurrencyChecksOnClient,
boolean clientEmpty, boolean serverPartitioned) throws Exception {
+    setupServerAndClientVMs(concurrencyChecksOnServer, concurrencyChecksOnClient, clientEmpty,
serverPartitioned);
+
+    server2.invoke(() -> createOnServer("key", "value"));
+    region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+    server2.invoke(() -> invalidateOnServer("key"));
+
+    validateServerListenerInvoked();
+    waitForClientInvalidate();
+  }
+
+  private void setupServerAndClientVMs(boolean concurrencyChecksOnServer, boolean concurrencyChecksOnClient,
boolean clientEmpty, boolean serverPartitioned) throws Exception {
+    int port1 = initServerCache(server1, concurrencyChecksOnServer, serverPartitioned); //
vm0
+    int port2 = initServerCache(server2, concurrencyChecksOnServer, serverPartitioned); //
vm1
+    String serverName = NetworkUtils.getServerHostName(Host.getHost(0));
+    createClientCache(serverName, port1, port2, clientEmpty, concurrencyChecksOnClient);
+    logger.info("testing force invalidate on on client");
+  }
+
+  private void validateServerListenerInvoked() {
+    boolean listenerInvoked = server1.invoke(() -> validateOnServer())
+        || server2.invoke(() -> validateOnServer());
+    assertTrue(listenerInvoked);
+  }
+  
+  private static boolean validateOnServer() {
+    Region<?,?> region = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+    CacheListener<?,?>[] listeners = region.getAttributes().getCacheListeners();
+    for (CacheListener<?,?> listener : listeners) {
+      if (listener instanceof ServerListener) {
+        ServerListener serverListener = (ServerListener) listener;
+        if (serverListener.afterInvalidateInvoked) {
+          return true;
+        }
+      }
+    }
+    return false;
+    
+  }
+
+  private boolean hasClientListenerAfterInvalidateBeenInvoked() {
+    Region<?,?> region = getCache().getRegion(REGION_NAME1);
+    CacheListener<?,?>[] listeners = region.getAttributes().getCacheListeners();
+    for (CacheListener<?,?> listener : listeners) {
+      if (listener instanceof ClientListener) {
+        ClientListener clientListener = (ClientListener) listener;
+        if (clientListener.afterInvalidateInvoked) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  /**
+   * Creates a cache in this member, defines {@code REGION_NAME1} with a {@link ServerListener}
+   * attached, and starts a cache server on a randomly chosen available port.
+   * As a side effect it sets {@code AbstractRegionMap.FORCE_INVALIDATE_EVENT} to {@code true}.
+   *
+   * @param concurrencyChecksEnabled value passed to the region factory's
+   *        {@code setConcurrencyChecksEnabled}
+   * @param partitioned when {@code true} the region uses {@code DataPolicy.PARTITION} with
+   *        0 redundant copies and 251 buckets; otherwise {@code DataPolicy.REPLICATE}
+   * @param maxThreads value passed to {@code CacheServer.setMaxThreads}
+   * @return the port the cache server reports after it has been started
+   * @throws Exception if cache creation or server start-up fails
+   */
+  private static Integer createServerCache(Boolean concurrencyChecksEnabled, Boolean partitioned,
+      Integer maxThreads)
+  throws Exception {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
+    Properties props = new Properties();
+    Cache cache = new ClientServerForceInvalidateDUnitTest("temp").createCacheV(props);
+    RegionFactory<String, String> factory = cache.createRegionFactory();
+    if (partitioned) {
+      factory.setDataPolicy(DataPolicy.PARTITION);
+      factory.setPartitionAttributes(new PartitionAttributesFactory<String, String>()
+          .setRedundantCopies(0)
+          .setTotalNumBuckets(251)
+          .create());
+    } else {
+      factory.setDataPolicy(DataPolicy.REPLICATE);
+    }
+    factory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
+    factory.addCacheListener(new ServerListener());
+    Region<String, String> r1 = factory.create(REGION_NAME1);
+    assertNotNull(r1);
+
+    CacheServer server = cache.addCacheServer();
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    logger.info("Starting server on port " + port);
+    server.setPort(port);
+    server.setMaxThreads(maxThreads.intValue());
+    server.start();
+    logger.info("Started server on port " + server.getPort());
+    // Integer.valueOf is preferred over the boxing constructor.
+    return Integer.valueOf(server.getPort());
+  }
+  
+  public static void createClientCache(String h, int port1, int port2, boolean empty, boolean
concurrenctChecksEnabled)
+  throws Exception  {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
+    Properties props = new Properties();
+    props.setProperty("mcast-port", "0");
+    props.setProperty("locators", "");
+    Cache cache = new ClientServerForceInvalidateDUnitTest("temp").createCacheV(props);
+    PoolImpl p = (PoolImpl)PoolManager.createFactory()
+      .addServer(h, port1)
+      .addServer(h, port2)
+      .setSubscriptionEnabled(true)
+      .setThreadLocalConnections(true)
+      .setReadTimeout(1000)
+      .setSocketBufferSize(32768)
+      .setMinConnections(3)
+      .setSubscriptionRedundancy(-1)
+      .setPingInterval(2000)
+      .create("ClientServerForceInvalidate2DUnitTestPool");
+
+    RegionFactory<String, String> factory = cache.createRegionFactory();
+    if (empty) {
+      factory.setDataPolicy(DataPolicy.EMPTY);
+      factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+    } else {
+      factory.setDataPolicy(DataPolicy.NORMAL);
+    }
+    factory.setPoolName(p.getName());
+    factory.setConcurrencyChecksEnabled(concurrenctChecksEnabled);
+    region1 = factory.create(REGION_NAME1);
+    region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+    region1.getAttributesMutator().addCacheListener(new ClientListener());
+    assertNotNull(region1);
+    with().pollDelay(1, TimeUnit.MILLISECONDS).pollInterval(1, TimeUnit.SECONDS).await().atMost(60,
TimeUnit.SECONDS)
+    .until(() -> poolReady(p));
+  }
+  
+  private static boolean poolReady(final PoolImpl pool) {
+    try {
+      Connection conn = pool.acquireConnection();
+      if (conn == null) {
+        //excuse = "acquireConnection returned null?";
+        return false;
+      }
+      return true;
+    } catch (NoAvailableServersException e) {
+      //excuse = "Cannot find a server: " + e;
+      return false;
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private Cache createCacheV(Properties props) throws Exception
+  {
+    DistributedSystem ds = getSystem(props);
+    assertNotNull(ds);
+    ds.disconnect();
+    ds = getSystem(props);
+    Cache cache = getCache();
+    assertNotNull(cache);
+    return cache;
+  }
+
+  /**
+   * Client-side cache listener that logs create, update and invalidate events and remembers
+   * whether {@code afterInvalidate} has ever been called.
+   */
+  static class ClientListener extends CacheListenerAdapter<String, String> {
+    /**
+     * Set to {@code true} by {@link #afterInvalidate}. Declared {@code volatile} because the
+     * callback presumably runs on a different thread than the test code that reads the flag.
+     */
+    public volatile boolean afterInvalidateInvoked;
+
+    @Override
+    public void afterCreate(EntryEvent<String, String> event) {
+      super.afterCreate(event);
+      logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue()
+          + "} at=" + System.currentTimeMillis());
+    }
+
+    @Override
+    public void afterUpdate(EntryEvent<String, String> event) {
+      super.afterUpdate(event);
+      logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue()
+          + "} at=" + System.currentTimeMillis());
+    }
+
+    @Override
+    public void afterInvalidate(final EntryEvent<String, String> event) {
+      super.afterInvalidate(event);
+      afterInvalidateInvoked = true;
+      // Indent the log line for events that did not originate remotely so they stand out.
+      String prefix = "";
+      if (!event.isOriginRemote()) {
+        prefix = "    ";
+      }
+      logger.info(prefix + "afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue()
+          + "} at=" + System.currentTimeMillis());
+    }
+  }
+  /**
+   * Server-side cache listener that logs create, update and invalidate events and remembers
+   * whether {@code afterInvalidate} has ever been called.
+   */
+  static class ServerListener extends CacheListenerAdapter<String, String> {
+    /**
+     * Set to {@code true} by {@link #afterInvalidate}. Declared {@code volatile} because the
+     * callback presumably runs on a different thread than the code that later reads the flag.
+     */
+    volatile boolean afterInvalidateInvoked;
+
+    @Override
+    public void afterCreate(EntryEvent<String, String> event) {
+      super.afterCreate(event);
+      logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue()
+          + "} at=" + System.currentTimeMillis());
+    }
+
+    @Override
+    public void afterUpdate(EntryEvent<String, String> event) {
+      super.afterUpdate(event);
+      logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue()
+          + "} at=" + System.currentTimeMillis());
+    }
+
+    @Override
+    public void afterInvalidate(EntryEvent<String, String> event) {
+      super.afterInvalidate(event);
+      afterInvalidateInvoked = true;
+      logger.info("afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue()
+          + "} at=" + System.currentTimeMillis());
+    }
+  }
+
+  /**
+   * Closes the cache in this JVM first and then the caches on both servers.
+   * The server clean-up runs in {@code finally} blocks so that a failure while closing one
+   * cache does not leave the remaining caches open.
+   *
+   * @throws Exception if closing any of the caches fails
+   */
+  @Override
+  protected final void postTearDownCacheTestCase() throws Exception {
+    try {
+      // close the clients first
+      closeCache();
+    } finally {
+      // then close the servers
+      try {
+        server1.invoke(() -> closeCache());
+      } finally {
+        server2.invoke(() -> closeCache());
+      }
+    }
+  }
+
+  /**
+   * Resets {@code AbstractRegionMap.FORCE_INVALIDATE_EVENT} to {@code false} and, if a cache
+   * is present and still open, closes it and disconnects its distributed system.
+   */
+  @SuppressWarnings("deprecation")
+  public static void closeCache()
+  {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
+    final Cache existing = new ClientServerForceInvalidateDUnitTest("temp").getCache();
+    // Nothing to tear down when there is no cache or it has already been closed.
+    if (existing == null || existing.isClosed()) {
+      return;
+    }
+    existing.close();
+    existing.getDistributedSystem().disconnect();
+  }
+
+}


Mime
View raw message