geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject incubator-geode git commit: GEODE-915: fix FORCE_INVALIDATE_EVENT - new unit test ClientServerForceInvalidateDUnitTest - ProxyRegionMap now checks for force invalidate before it throws EntryNotFound - AbstractRegionMap invalidate now does force inv
Date Fri, 01 Apr 2016 18:44:43 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 3c4b3e6d4 -> a7b9b90f8


GEODE-915: fix FORCE_INVALIDATE_EVENT
  - new unit test ClientServerForceInvalidateDUnitTest
  - ProxyRegionMap now checks for force invalidate before it throws EntryNotFound
  - AbstractRegionMap invalidate now does force invalidate unless the region is not initialized

As part of the work also did the following code cleanup work:
1. Removed the isInvalidate parameter from shouldCreateCBEvent since this method did the exact
same thing all values of this parameter.
2. Changed cacheWriteBeforeInvalidate to be serverInvalidate since all this method does is
possibly send the invalidate to the server.
3. EntryEventImpl toString will now include inhibitCacheListenerNotification if it is true
4. Removed the unused parameter invokeCallbacks and forceNewEntry from serverInvalidate


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a7b9b90f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a7b9b90f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a7b9b90f

Branch: refs/heads/develop
Commit: a7b9b90f8a7b46c488eb30b10499a1f72c57ce04
Parents: 3c4b3e6
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Wed Mar 30 15:10:29 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Fri Apr 1 11:41:02 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractRegionMap.java       |  66 +--
 .../gemfire/internal/cache/EntryEventImpl.java  |   3 +
 .../gemfire/internal/cache/LocalRegion.java     |  14 +-
 .../gemfire/internal/cache/ProxyRegionMap.java  |  11 +-
 .../ClientServerForceInvalidateDUnitTest.java   | 414 +++++++++++++++++++
 5 files changed, 463 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7b9b90f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index da6b131..7b9dce7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -1624,7 +1624,7 @@ public abstract class AbstractRegionMap implements RegionMap {
               // Create an entry event only if the calling context is
               // a receipt of a TXCommitMessage AND there are callbacks installed
               // for this region
-              boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/,
isRegionReady || inRI);
+              boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
               EntryEventImpl cbEvent = createCBEvent(owner, op,
                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState, versionTag, tailKey);
               try {
@@ -1736,7 +1736,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
                 else {
                   try {
-                    boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady
|| inRI);
+                    boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady ||
inRI);
                     cbEvent = createCBEvent(owner, op,
                         key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState, versionTag, tailKey);
                     try {
@@ -1798,7 +1798,7 @@ public abstract class AbstractRegionMap implements RegionMap {
             if (!opCompleted) {
               // already has value set to Token.DESTROYED
               opCompleted = true;
-              boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady ||
inRI);
+              boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
               cbEvent = createCBEvent(owner, op,
                   key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo,
bridgeContext, txEntryState, versionTag, tailKey);
               try {
@@ -1888,6 +1888,14 @@ public abstract class AbstractRegionMap implements RegionMap {
    * If true then invalidates that throw EntryNotFoundException
    * or that are already invalid will first call afterInvalidate on CacheListeners. 
    * The old value on the event passed to afterInvalidate will be null.
+   * If the region is not initialized then callbacks will not be done.
+   * This property only applies to non-transactional invalidates.
+   * Transactional invalidates ignore this property.
+   * Note that empty "proxy" regions on a client will not be sent invalidates
+   * from the server unless they also set the proxy InterestPolicy to ALL.
+   * If the invalidate is not sent then this property will not cause a listener 
+   * on that client to be notified of the invalidate.
+   * A non-empty "caching-proxy" will receive invalidates from the server.
    */
   public static boolean FORCE_INVALIDATE_EVENT = Boolean.getBoolean("gemfire.FORCE_INVALIDATE_EVENT");
 
@@ -1895,9 +1903,9 @@ public abstract class AbstractRegionMap implements RegionMap {
    * If the FORCE_INVALIDATE_EVENT flag is true
    * then invoke callbacks on the given event.
    */
-  void forceInvalidateEvent(EntryEventImpl event) {
+  static void forceInvalidateEvent(EntryEventImpl event, LocalRegion owner) {
     if (FORCE_INVALIDATE_EVENT) {
-      event.invokeCallbacks(_getOwner(), false, false);
+      event.invokeCallbacks(owner, false, false);
     }
   }
   
@@ -1917,8 +1925,9 @@ public abstract class AbstractRegionMap implements RegionMap {
     boolean didInvalidate = false;
     RegionEntry invalidatedRe = null;
     boolean clearOccured = false;
-
     DiskRegion dr = owner.getDiskRegion();
+    boolean ownerIsInitialized = owner.isInitialized();
+    try {
     // Fix for Bug #44431. We do NOT want to update the region and wait
     // later for index INIT as region.clear() can cause inconsistency if
     // happened in parallel as it also does index INIT.
@@ -1967,9 +1976,8 @@ public abstract class AbstractRegionMap implements RegionMap {
                         // that's okay - when writing an invalid into a disk, the
                         // region has been cleared (including this token)
                       }
-                      forceInvalidateEvent(event);
                     } else {
-                      owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+                      owner.serverInvalidate(event);
                       if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer())
{
                         // server did not perform the invalidation, so don't leave an invalid
                         // entry here
@@ -2041,17 +2049,20 @@ public abstract class AbstractRegionMap implements RegionMap {
                 if (forceNewEntry && event.isFromServer()) {
                   // don't invoke listeners - we didn't force new entries for
                   // CCU invalidations before 7.0, and listeners don't care
-                  event.inhibitCacheListenerNotification(true);
+                  if (!FORCE_INVALIDATE_EVENT) {
+                    event.inhibitCacheListenerNotification(true);
+                  }
                 }
                 event.setRegionEntry(newRe);
-                owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+                owner.serverInvalidate(event);
                 if (!forceNewEntry && event.noVersionReceivedFromServer()) {
                   // server did not perform the invalidation, so don't leave an invalid
                   // entry here
                   return false;
                 }
                 try {
-                  if (!owner.isInitialized() && owner.getDataPolicy().withReplication())
{
+                  ownerIsInitialized = owner.isInitialized();
+                  if (!ownerIsInitialized && owner.getDataPolicy().withReplication())
{
                     final int oldSize = owner.calculateRegionEntryValueSize(newRe);
                     invalidateEntry(event, newRe, oldSize);
                   }
@@ -2111,7 +2122,8 @@ public abstract class AbstractRegionMap implements RegionMap {
             re = null;
           }
           if (re == null) {
-            if (!owner.isInitialized()) {
+            ownerIsInitialized = owner.isInitialized();
+            if (!ownerIsInitialized) {
               // when GII message arrived or processed later than invalidate
               // message, the entry should be created as placeholder
               RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner,
event.getKey(),
@@ -2139,7 +2151,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
        
                 // bug #43287 - send event to server even if it's not in the client (LRU
may have evicted it)
-                owner.cacheWriteBeforeInvalidate(event, true, false);
+                owner.serverInvalidate(event);
                 if (owner.concurrencyChecksEnabled) {
                   if (event.getVersionTag() == null) {
                     // server did not perform the invalidation, so don't leave an invalid
@@ -2196,11 +2208,10 @@ public abstract class AbstractRegionMap implements RegionMap {
                   if (event.getVersionTag() != null && owner.getVersionVector() !=
null) {
                     owner.getVersionVector().recordVersion((InternalDistributedMember) event.getDistributedMember(),
event.getVersionTag());
                   }
-                  forceInvalidateEvent(event);
                 }
                 else { // previous value not invalid
                   event.setRegionEntry(re);
-                  owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+                  owner.serverInvalidate(event);
                   if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer())
{
                     // server did not perform the invalidation, so don't leave an invalid
                     // entry here
@@ -2263,7 +2274,6 @@ public abstract class AbstractRegionMap implements RegionMap {
             // is in region, do nothing
           }
           if (!entryExisted) {
-            forceInvalidateEvent(event);
             owner.checkEntryNotFound(event.getKey());
           }
         } // while(retry)
@@ -2294,6 +2304,11 @@ public abstract class AbstractRegionMap implements RegionMap {
       }
     }
     return didInvalidate;
+    } finally {
+      if (ownerIsInitialized) {
+        forceInvalidateEvent(event, owner);
+      }
+    }
   }
 
   protected void invalidateNewEntry(EntryEventImpl event,
@@ -2420,7 +2435,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                     // a receipt of a TXCommitMessage AND there are callbacks
                     // installed
                     // for this region
-                    boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+                    boolean invokeCallbacks = shouldCreateCBEvent(owner, owner.isInitialized());
                     boolean cbEventInPending = false;
                     cbEvent = createCBEvent(owner, 
                         localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -2484,7 +2499,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 }
               }
               if (!opCompleted) {
-                boolean invokeCallbacks = shouldCreateCBEvent( owner, true /* isInvalidate
*/, owner.isInitialized());
+                boolean invokeCallbacks = shouldCreateCBEvent( owner, owner.isInitialized());
                 boolean cbEventInPending = false;
                 cbEvent = createCBEvent(owner, 
                     localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -2549,7 +2564,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                 // a receipt of a TXCommitMessage AND there are callbacks
                 // installed
                 // for this region
-                boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+                boolean invokeCallbacks = shouldCreateCBEvent(owner, owner.isInitialized());
                 boolean cbEventInPending = false;
                 cbEvent = createCBEvent(owner, 
                     localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, 
@@ -3326,7 +3341,7 @@ public abstract class AbstractRegionMap implements RegionMap {
     final boolean isRegionReady = owner.isInitialized();
     EntryEventImpl cbEvent = null;
     EntryEventImpl sqlfEvent = null;
-    boolean invokeCallbacks = shouldCreateCBEvent(owner, false /*isInvalidate*/, isRegionReady);
+    boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady);
     boolean cbEventInPending = false;
     cbEvent = createCBEvent(owner, putOp, key, newValue, txId, 
         txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState,
versionTag, tailKey);
@@ -3749,7 +3764,7 @@ public abstract class AbstractRegionMap implements RegionMap {
   }
   
   static boolean shouldCreateCBEvent( final LocalRegion owner, 
-      final boolean isInvalidate, final boolean isInitialized)
+      final boolean isInitialized)
   {
     LocalRegion lr = owner;
     boolean isPartitioned = lr.isUsedForPartitionedRegionBucket();
@@ -3762,17 +3777,10 @@ public abstract class AbstractRegionMap implements RegionMap {
       }*/
       lr = owner.getPartitionedRegion();
     }
-    if (isInvalidate) { // ignore shouldNotifyGatewayHub check for invalidates
-      return (isPartitioned || isInitialized)
+    return (isPartitioned || isInitialized)
           && (lr.shouldDispatchListenerEvent()
             || lr.shouldNotifyBridgeClients()
             || lr.getConcurrencyChecksEnabled());
-    } else {
-      return (isPartitioned || isInitialized)
-          && (lr.shouldDispatchListenerEvent()
-            || lr.shouldNotifyBridgeClients()
-            || lr.getConcurrencyChecksEnabled());
-    }
   }
 
   /** create a callback event for applying a transactional change to the local cache */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7b9b90f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 6905a15..ba9ac11 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -2336,6 +2336,9 @@ public class EntryEventImpl
     if (callbacksInvoked()) { 
       buf.append(";callbacksInvoked");
     }
+    if (inhibitCacheListenerNotification()) {
+      buf.append(";inhibitCacheListenerNotification");
+    }
     if (this.versionTag != null) {
       buf.append(";version=").append(this.versionTag);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7b9b90f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 95616fe..8e30a7a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3259,9 +3259,8 @@ public class LocalRegion extends AbstractRegion
   /**
    * @since 5.7
    */
-  protected void serverInvalidate(EntryEventImpl event, boolean invokeCallbacks, 
-      boolean forceNewEntry) {
-    if (event.getOperation().isDistributed()) {
+  void serverInvalidate(EntryEventImpl event) {
+    if (event.getOperation().isDistributed() && !event.isOriginRemote()) {
       ServerRegionProxy mySRP = getServerProxy();
       if (mySRP != null) {
         mySRP.invalidate(event);
@@ -3382,15 +3381,6 @@ public class LocalRegion extends AbstractRegion
   }
 
   /**
-   * @since 5.7
-   */
-  void cacheWriteBeforeInvalidate(EntryEventImpl event, boolean invokeCallbacks, boolean
forceNewEntry) {
-    if (!event.getOperation().isLocal() && !event.isOriginRemote()) {
-      serverInvalidate(event, invokeCallbacks, forceNewEntry);
-    }
-  }
-
-  /**
    * @see DistributedRegion#cacheWriteBeforePut(EntryEventImpl, Set, CacheWriter, boolean,
Object)
    * @param event
    * @param netWriteRecipients

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7b9b90f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index fb5596c..7b4504d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -187,9 +187,12 @@ final class ProxyRegionMap implements RegionMap {
       throws EntryNotFoundException {
     
     if (event.getOperation().isLocal()) {
+      if (this.owner.isInitialized()) {
+        AbstractRegionMap.forceInvalidateEvent(event, this.owner);
+      }
       throw new EntryNotFoundException(event.getKey().toString());
     }
-    this.owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+    this.owner.serverInvalidate(event);
     this.owner.recordEvent(event);
     this.owner.basicInvalidatePart2(markerEntry, event, false /*Clear conflict occurred */,
true);
     this.owner.basicInvalidatePart3(markerEntry, event, true);
@@ -274,7 +277,7 @@ final class ProxyRegionMap implements RegionMap {
         txEvent.addDestroy(this.owner, markerEntry, key,aCallbackArgument);
       }
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
-                                                false, !inTokenMode)) {
+                                                !inTokenMode)) {
         // fix for bug 39526
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, op,
             key, null, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext,
txEntryState, versionTag, tailKey);
@@ -304,7 +307,7 @@ final class ProxyRegionMap implements RegionMap {
         txEvent.addInvalidate(this.owner, markerEntry, key, newValue,aCallbackArgument);
       }
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
-                                                true, this.owner.isInitialized())) {
+                                                this.owner.isInitialized())) {
         // fix for bug 39526
         boolean cbEventInPending = false;
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, 
@@ -338,7 +341,7 @@ final class ProxyRegionMap implements RegionMap {
         txEvent.addPut(putOp, this.owner, markerEntry, key, newValue,aCallbackArgument);
       }
       if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
-                                                false, this.owner.isInitialized())) {
+                                                this.owner.isInitialized())) {
         // fix for bug 39526
         boolean cbEventInPending = false;
         EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, putOp, key, 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7b9b90f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
new file mode 100644
index 0000000..ce411d8
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import static com.jayway.awaitility.Awaitility.with;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * Tests client server FORCE_INVALIDATE
+ */
+public class ClientServerForceInvalidateDUnitTest extends CacheTestCase
+{
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static Region<String, String> region1;
+
+  private static final String REGION_NAME1 = "ClientServerForceInvalidateDUnitTest_region1";
+
+  private static Host host;
+
+  private static VM server1;
+  private static VM server2;
+
+  /** constructor */
+  public ClientServerForceInvalidateDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public final void postSetUp() throws Exception
+  {
+    host = Host.getHost(0);
+    server1 = host.getVM(0);
+    server2 = host.getVM(1);
+  }
+
+  private int initServerCache(VM vm, boolean concurrencyChecksEnabled, boolean partitioned)
{
+    return vm.invoke(() -> createServerCache(concurrencyChecksEnabled, partitioned, 0));
+  }
+
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecks() throws Exception {
+    dotestForceInvalidate(true, true, false, true);
+  }
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnServer() throws
Exception {
+    dotestForceInvalidate(true, false, false, true);
+  }
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnClient() throws
Exception {
+    dotestForceInvalidate(false, true, false, true);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecks() throws Exception {
+    dotestForceInvalidate(true, true, true, true);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnServer() throws Exception
{
+    dotestForceInvalidate(true, false, true, true);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnClient() throws Exception
{
+    dotestForceInvalidate(false, true, true, true);
+  }
+  public void testForceInvalidateOnCachingProxyWithConcurrencyChecksServerReplicated() throws
Exception {
+    dotestForceInvalidate(true, true, false, false);
+  }
+  public void testForceInvalidateOnProxyWithConcurrencyChecksServerReplicated() throws Exception
{
+    dotestForceInvalidate(true, true, true, false);
+  }
+
+  /**
+   * 1. create an entry
+   * 2. Install a observer to pause sending subscription events to the client
+   * 3. invalidate the entry from the server (it will be done on server but pause
+   *    prevents it from being sent to the client).
+   * 4. verify that afterInvalidate was invoked on the server.
+   * 5. change the same entry (do a put). Both the client and server now have the
+   *    latest version which is this update.
+   * 6. unpause the observer so that it now sends invalidate event to client.
+   *    It will arrive late and not be done because of concurrency checks.
+   * 7. verify that afterInvalidate was invoked on the client.
+   */
+  public void testInvalidateLosingOnConcurrencyChecks() throws Exception {
+    try {
+      setupServerAndClientVMs(true, true, false, false);
+      final String key = "delayInvalidate";
+      region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+      region1.put(key, "1000");
+      logger.info("installing observers");
+      server1.invoke(() -> installObserver());
+      server2.invoke(() -> installObserver());
+
+      server2.invoke(() -> invalidateOnServer(key));
+
+      validateServerListenerInvoked();
+
+      logger.info("putting a new value 1001");
+      region1.put(key, "1001");
+      logger.info("UnPausing observers");
+      server1.invoke(() -> unpauseObserver());
+      server2.invoke(() -> unpauseObserver());
+
+      waitForClientInvalidate();
+
+    } finally {
+      server1.invoke(() -> cleanupObserver());
+      server2.invoke(() -> cleanupObserver());
+    }
+  }
+  
+  private static void installObserver() {
+    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+    ClientServerObserverHolder.setInstance(new DelaySendingEvent());
+  }
+  
+  private static void unpauseObserver() {
+    DelaySendingEvent observer = (DelaySendingEvent) ClientServerObserverHolder.getInstance();
+    observer.latch.countDown();
+  }
+  
+  private static void cleanupObserver() {
+    CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+    ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
+  }
+
+  private static void invalidateOnServer(final Object key) {
+    Region<?,?> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+    r.invalidate(key);
+  }
+  private static void createOnServer(final Object key, final Object value) {
+    @SuppressWarnings("unchecked")
+    Region<Object, Object> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+    r.create(key, value);
+  }
+  
+  private void waitForClientInvalidate() {
+    with().pollInterval(10, TimeUnit.MILLISECONDS).await().atMost(20, TimeUnit.SECONDS)
+      .until(() -> hasClientListenerAfterInvalidateBeenInvoked());
+  }
+
+  static class DelaySendingEvent extends ClientServerObserverAdapter {
+    CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void afterMessageCreation(Message msg) {
+      try {
+        logger.info("waiting in DelaySendingEvent...");
+        latch.await();
+        logger.info("finished waiting in DelaySendingEvent");
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * 1. Invalidate a non-existent entry from the server.
+   * 2. Validate that the servers see after invalidate.
+   * 3. Validate that the subscribed client invokes after invalidate.
+   */
+  private void dotestForceInvalidate(boolean concurrencyChecksOnServer, boolean concurrencyChecksOnClient,
boolean clientEmpty, boolean serverPartitioned) throws Exception {
+    setupServerAndClientVMs(concurrencyChecksOnServer, concurrencyChecksOnClient, clientEmpty,
serverPartitioned);
+
+    server2.invoke(() -> createOnServer("key", "value"));
+    region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+    server2.invoke(() -> invalidateOnServer("key"));
+
+    validateServerListenerInvoked();
+    waitForClientInvalidate();
+  }
+
+  private void setupServerAndClientVMs(boolean concurrencyChecksOnServer, boolean concurrencyChecksOnClient,
boolean clientEmpty, boolean serverPartitioned) throws Exception {
+    int port1 = initServerCache(server1, concurrencyChecksOnServer, serverPartitioned); //
vm0
+    int port2 = initServerCache(server2, concurrencyChecksOnServer, serverPartitioned); //
vm1
+    String serverName = NetworkUtils.getServerHostName(Host.getHost(0));
+    createClientCache(serverName, port1, port2, clientEmpty, concurrencyChecksOnClient);
+    logger.info("testing force invalidate on on client");
+  }
+
+  private void validateServerListenerInvoked() {
+    boolean listenerInvoked = server1.invoke(() -> validateOnServer())
+        || server2.invoke(() -> validateOnServer());
+    assertTrue(listenerInvoked);
+  }
+  
+  private static boolean validateOnServer() {
+    Region<?,?> region = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+    CacheListener<?,?>[] listeners = region.getAttributes().getCacheListeners();
+    for (CacheListener<?,?> listener : listeners) {
+      if (listener instanceof ServerListener) {
+        ServerListener serverListener = (ServerListener) listener;
+        if (serverListener.afterInvalidateInvoked) {
+          return true;
+        }
+      }
+    }
+    return false;
+    
+  }
+
+  private boolean hasClientListenerAfterInvalidateBeenInvoked() {
+    Region<?,?> region = getCache().getRegion(REGION_NAME1);
+    CacheListener<?,?>[] listeners = region.getAttributes().getCacheListeners();
+    for (CacheListener<?,?> listener : listeners) {
+      if (listener instanceof ClientListener) {
+        ClientListener clientListener = (ClientListener) listener;
+        if (clientListener.afterInvalidateInvoked) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  private static Integer createServerCache(Boolean concurrencyChecksEnabled, Boolean partitioned,
Integer maxThreads)
+  throws Exception {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
+    Properties props = new Properties();
+    Cache cache = new ClientServerForceInvalidateDUnitTest("temp").createCacheV(props);
+    RegionFactory<String, String> factory = cache.createRegionFactory();
+    if (partitioned) {
+      factory.setDataPolicy(DataPolicy.PARTITION);
+      factory.setPartitionAttributes(new PartitionAttributesFactory<String, String>()
+          .setRedundantCopies(0)
+          .setTotalNumBuckets(251)
+          .create());
+    } else {
+      factory.setDataPolicy(DataPolicy.REPLICATE);
+    }
+    factory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
+    factory.addCacheListener(new ServerListener());
+    Region<String, String> r1 = factory.create(REGION_NAME1);
+    assertNotNull(r1);
+
+    CacheServer server = cache.addCacheServer();
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    logger.info("Starting server on port " + port);
+    server.setPort(port);
+    server.setMaxThreads(maxThreads.intValue());
+    server.start();
+    logger.info("Started server on port " + server.getPort());
+    return new Integer(server.getPort());
+
+  }
+  
+  public static void createClientCache(String h, int port1, int port2, boolean empty, boolean
concurrenctChecksEnabled)
+  throws Exception  {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
+    Properties props = new Properties();
+    props.setProperty("mcast-port", "0");
+    props.setProperty("locators", "");
+    Cache cache = new ClientServerForceInvalidateDUnitTest("temp").createCacheV(props);
+    PoolImpl p = (PoolImpl)PoolManager.createFactory()
+      .addServer(h, port1)
+      .addServer(h, port2)
+      .setSubscriptionEnabled(true)
+      .setThreadLocalConnections(true)
+      .setReadTimeout(1000)
+      .setSocketBufferSize(32768)
+      .setMinConnections(3)
+      .setSubscriptionRedundancy(-1)
+      .setPingInterval(2000)
+      .create("ClientServerForceInvalidateDUnitTestPool");
+
+    RegionFactory<String, String> factory = cache.createRegionFactory();
+    if (empty) {
+      factory.setDataPolicy(DataPolicy.EMPTY);
+      factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+    } else {
+      factory.setDataPolicy(DataPolicy.NORMAL);
+    }
+    factory.setPoolName(p.getName());
+    factory.setConcurrencyChecksEnabled(concurrenctChecksEnabled);
+    region1 = factory.create(REGION_NAME1);
+    region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+    region1.getAttributesMutator().addCacheListener(new ClientListener());
+    assertNotNull(region1);
+    with().pollDelay(1, TimeUnit.MILLISECONDS).pollInterval(1, TimeUnit.SECONDS).await().atMost(60,
TimeUnit.SECONDS)
+    .until(() -> poolReady(p));
+  }
+  
+  private static boolean poolReady(final PoolImpl pool) {
+    try {
+      Connection conn = pool.acquireConnection();
+      if (conn == null) {
+        //excuse = "acquireConnection returned null?";
+        return false;
+      }
+      return true;
+    } catch (NoAvailableServersException e) {
+      //excuse = "Cannot find a server: " + e;
+      return false;
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private Cache createCacheV(Properties props) throws Exception
+  {
+    DistributedSystem ds = getSystem(props);
+    assertNotNull(ds);
+    ds.disconnect();
+    ds = getSystem(props);
+    Cache cache = getCache();
+    assertNotNull(cache);
+    return cache;
+  }
+
+  static class ClientListener extends CacheListenerAdapter<String, String> {
+    public boolean afterInvalidateInvoked;
+    @Override
+      public void afterCreate(EntryEvent<String, String> event) {
+        super.afterCreate(event);
+        logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
+      }
+      @Override
+      public void afterUpdate(EntryEvent<String, String> event) {
+        super.afterUpdate(event);
+        logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
+      }
+      @Override
+      public void afterInvalidate(final EntryEvent<String, String> event) {
+        super.afterInvalidate(event);
+        afterInvalidateInvoked = true;
+        String prefix = "";
+        if (!event.isOriginRemote()) {
+          prefix = "    ";
+        }
+        logger.info(prefix + "afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
+
+      }
+  }
+  static class ServerListener extends CacheListenerAdapter<String, String> {
+    boolean afterInvalidateInvoked;
+    @Override
+    public void afterCreate(EntryEvent<String, String> event) {
+      super.afterCreate(event);
+      logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
+    }
+    @Override
+    public void afterUpdate(EntryEvent<String, String> event) {
+      super.afterUpdate(event);
+      logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
+    }
+    @Override
+    public void afterInvalidate(EntryEvent<String, String> event) {
+      super.afterInvalidate(event);
+      afterInvalidateInvoked = true;
+      logger.info("afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue()
+ "} at=" + System.currentTimeMillis());
+    }
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    // close the clients first
+    closeForceInvalidateCache();
+    // then close the servers
+    server1.invoke(() -> closeForceInvalidateCache());
+    server2.invoke(() -> closeForceInvalidateCache());
+  }
+
+  @SuppressWarnings("deprecation")
+  private static void closeForceInvalidateCache()
+  {
+    AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
+    Cache cache = new ClientServerForceInvalidateDUnitTest("temp").getCache();
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache.getDistributedSystem().disconnect();
+    }
+  }
+
+}



Mime
View raw message