geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zho...@apache.org
Subject incubator-geode git commit: GEODE-920: lazily create HaContainer for cache server
Date Mon, 04 Apr 2016 22:50:56 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/develop 066a9d57a -> 49e3f523d


GEODE-920: lazily create HaContainer for cache server

The CacheClientNotifier instance might have been created by a gateway receiver.
In that case, do not create the HaContainer until a cache server starts, and then
amend the CacheClientNotifier instance by initializing the HaContainer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/49e3f523
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/49e3f523
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/49e3f523

Branch: refs/heads/develop
Commit: 49e3f523d16389f5e84847c6dcfd6ab45427f8c2
Parents: 066a9d5
Author: zhouxh <gzhou@pivotal.io>
Authored: Fri Mar 25 10:04:35 2016 -0700
Committer: zhouxh <gzhou@pivotal.io>
Committed: Mon Apr 4 15:48:57 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/InitialImageOperation.java   |  15 +-
 .../gemfire/internal/cache/LocalRegion.java     |   3 +
 .../internal/cache/ha/HAContainerRegion.java    |   5 +
 .../cache/tier/sockets/CacheClientNotifier.java |  58 +++--
 .../cache/wan/CacheClientNotifierDUnitTest.java | 225 +++++++++++++++++++
 5 files changed, 279 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
index 9bd3faf..a72ca8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InitialImageOperation.java
@@ -2179,9 +2179,11 @@ public class InitialImageOperation  {
         if (logger.isDebugEnabled()) {
           try {
             CacheClientNotifier ccn = CacheClientNotifier.getInstance();
-            CacheClientProxy proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy(
-                region.getName());      
-            logger.debug("Processing FilterInfo for proxy: {} : {}", proxy, msg);
+            if (ccn != null && ccn.getHaContainer() != null) {
+              CacheClientProxy proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy(
+                  region.getName());      
+              logger.debug("Processing FilterInfo for proxy: {} : {}", proxy, msg);
+            }
           } catch (Exception ex) {
             // Ignore.
           }
@@ -3705,8 +3707,13 @@ public class InitialImageOperation  {
      */
     public void registerFilters(LocalRegion region) {
       CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+
       CacheClientProxy proxy;
       try {
+        if (ccn == null || ccn.getHaContainer() == null) {
+          logger.info("Found null cache client notifier. Failed to register Filters during
HARegion GII. Region :{}", region.getName());
+          return;
+        }
         proxy = ((HAContainerWrapper)ccn.getHaContainer()).getProxy(
             region.getName());      
       } catch (Exception ex) {
@@ -3716,7 +3723,7 @@ public class InitialImageOperation  {
       }
       
       if (proxy == null) {
-        logger.info("Found null client proxy. Failed to register Filters during HARegion
GII. Region :{}", region.getName());
+        logger.info("Found null client proxy. Failed to register Filters during HARegion
GII. Region :{}, HaContainer :{}", region.getName(), ccn.getHaContainer());
         return;
       }
       

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 8e30a7a..3ff48bb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -4576,6 +4576,9 @@ public class LocalRegion extends AbstractRegion
   public void refreshEntriesFromServerKeys(Connection con, List serverKeys,
       InterestResultPolicy pol)
   {
+    if (serverKeys == null) {
+      return;
+    }
     ServerRegionProxy proxy = getServerProxy();
     if (logger.isDebugEnabled()) {
       logKeys(serverKeys, pol);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
index 3bed769..fe0c112 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HAContainerRegion.java
@@ -55,6 +55,11 @@ public class HAContainerRegion implements HAContainerWrapper {
     }
   }
 
+  public Region getMapForTest() {
+    Region region = (Region)map;
+    return region;
+  }
+
   public Object putProxy(String haName, CacheClientProxy proxy) {
     return haRegionNameToProxy.put(haName, proxy);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 6ac4690..3178b8d 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -162,6 +162,12 @@ public class CacheClientNotifier {
           messageTimeToLive, transactionTimeToLive,
           listener, overflowAttributesList, isGatewayReceiver);
     }
+    
+    if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) {
+      // Gateway receiver might have create CCN instance without HaContainer
+      // In this case, the HaContainer should be lazily created here
+      ccnSingleton.initHaContainer(overflowAttributesList);
+    }
 //    else {
 //      ccnSingleton.acceptorStats = acceptorStats;
 //      ccnSingleton.maximumMessageCount = maximumMessageCount;
@@ -1671,9 +1677,11 @@ public class CacheClientNotifier {
 
     if (noActiveServer() && ccnSingleton != null){
       ccnSingleton = null;
-      haContainer.cleanUp();
-      if (isDebugEnabled) {
-        logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
+      if (haContainer != null) {
+        haContainer.cleanUp();
+        if (isDebugEnabled) {
+          logger.debug("haContainer ({}) is now cleaned up.", haContainer.getName());
+        }
       }
       this.clearCompiledQueries();
       blackListedClients.clear();
@@ -2147,25 +2155,6 @@ public class CacheClientNotifier {
     // Set the security LogWriter
     this.securityLogWriter = (InternalLogWriter)cache.getSecurityLogger();
 
-    // Create the overflow artifacts
-    if (overflowAttributesList != null
-        && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList
-            .get(0))) {
-      haContainer = new HAContainerRegion(cache.getRegion(Region.SEPARATOR
-          + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)cache,
-              (String)overflowAttributesList.get(0),
-              ((Integer)overflowAttributesList.get(1)).intValue(),
-              ((Integer)overflowAttributesList.get(2)).intValue(),
-              (String)overflowAttributesList.get(3),
-              (Boolean)overflowAttributesList.get(4))));
-    }
-    else {
-      haContainer = new HAContainerMap(new HashMap());
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("ha container ({}) has been created.", haContainer.getName());
-    }
-
     this.maximumMessageCount = maximumMessageCount;
     this.messageTimeToLive = messageTimeToLive;
     this.transactionTimeToLive = transactionTimeToLive;
@@ -2608,7 +2597,7 @@ public class CacheClientNotifier {
    * (in case of eviction policy "none"). In both the cases, it'll store
    * HAEventWrapper as its key and ClientUpdateMessage as its value.
    */
-  private final HAContainerWrapper haContainer;
+  private HAContainerWrapper haContainer;
 
   //   /**
   //    * The singleton <code>CacheClientNotifier</code> instance
@@ -2691,6 +2680,29 @@ public class CacheClientNotifier {
   public Map getHaContainer() {
     return haContainer;
   }
+  
+  public void initHaContainer(List overflowAttributesList) {
+    // lazily initialize haContainer in case this CCN instance was created by a gateway receiver
+    if (overflowAttributesList != null
+        && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList
+            .get(0))) {
+      haContainer = new HAContainerRegion(_cache.getRegion(Region.SEPARATOR
+          + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)_cache,
+              (String)overflowAttributesList.get(0),
+              ((Integer)overflowAttributesList.get(1)).intValue(),
+              ((Integer)overflowAttributesList.get(2)).intValue(),
+              (String)overflowAttributesList.get(3),
+              (Boolean)overflowAttributesList.get(4))));
+    }
+    else {
+      haContainer = new HAContainerMap(new HashMap());
+    }
+    assert haContainer != null;
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("ha container ({}) has been created.", haContainer.getName());
+    }
+  }
 
   private final Set blackListedClients = new CopyOnWriteArraySet();
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/49e3f523/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
new file mode 100755
index 0000000..9557f0d
--- /dev/null
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.UserSpecifiedRegionAttributes;
+import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
+import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+public class CacheClientNotifierDUnitTest extends WANTestBase {
+  private static final int NUM_KEYS = 10;
+  
+  public CacheClientNotifierDUnitTest(String name) {
+    super(name);
+    // TODO Auto-generated constructor stub
+  }
+
+  private int createCacheServerWithCSC(VM vm, final boolean withCSC, final int capacity,

+      final String policy, final String diskStoreName) {
+    final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+
+    SerializableRunnable createCacheServer = new SerializableRunnable() {
+      @Override
+      public void run() throws Exception {
+        CacheServerImpl server = (CacheServerImpl)cache.addCacheServer();
+        server.setPort(serverPort);
+        if (withCSC) {
+          if (diskStoreName != null) {
+            DiskStore ds = cache.findDiskStore(diskStoreName);
+            if(ds == null) {
+              ds = cache.createDiskStoreFactory().create(diskStoreName);
+            }
+          }
+          ClientSubscriptionConfig csc = server.getClientSubscriptionConfig();
+          csc.setCapacity(capacity);
+          csc.setEvictionPolicy(policy);
+          csc.setDiskStoreName(diskStoreName);
+          server.setHostnameForClients("localhost");
+          //server.setGroups(new String[]{"serv"});
+        }
+        try {
+          server.start();
+        } catch (IOException e) {
+          com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e);
+        }
+      }
+    };
+    vm.invoke(createCacheServer);
+    return serverPort;
+  }
+
+  private void checkCacheServer(VM vm, final int serverPort, final boolean withCSC, final
int capacity) {
+    SerializableRunnable checkCacheServer = new SerializableRunnable() {
+
+      @Override
+      public void run() throws Exception {
+        List<CacheServer> cacheServers = ((GemFireCacheImpl)cache).getCacheServersAndGatewayReceiver();
+        CacheServerImpl server = null;
+        for (CacheServer cs:cacheServers) {
+          if (cs.getPort() == serverPort) {
+            server = (CacheServerImpl)cs;
+            break;
+          }
+        }
+        assertNotNull(server);
+        CacheClientNotifier ccn = server.getAcceptor().getCacheClientNotifier();
+        HAContainerRegion haContainer = (HAContainerRegion)ccn.getHaContainer();
+        if (server.getAcceptor().isGatewayReceiver()) {
+          assertNull(haContainer);
+          return;
+        }
+        Region internalRegion = haContainer.getMapForTest();
+        RegionAttributes ra = internalRegion.getAttributes();
+        EvictionAttributes ea = ra.getEvictionAttributes();
+        if (withCSC) {
+          assertNotNull(ea);
+          assertEquals(capacity, ea.getMaximum());
+          assertEquals(EvictionAction.OVERFLOW_TO_DISK, ea.getAction());
+        } else {
+          assertNull(ea);
+        }
+      }
+    };
+    vm.invoke(checkCacheServer);
+  }
+
+  private void closeCacheServer(VM vm, final int serverPort) {
+    SerializableRunnable stopCacheServer = new SerializableRunnable() {
+
+      @Override
+      public void run() throws Exception {
+        List<CacheServer> cacheServers = cache.getCacheServers();
+        CacheServerImpl server = null;
+        for (CacheServer cs:cacheServers) {
+          if (cs.getPort() == serverPort) {
+            server = (CacheServerImpl)cs;
+            break;
+          }
+        }
+        assertNotNull(server);
+        server.stop();
+      }
+    };
+    vm.invoke(stopCacheServer);
+  }
+
+  private void verifyRegionSize(VM vm, final int expect) {
+    SerializableRunnable verifyRegionSize = new SerializableRunnable() {
+      @Override
+      public void run() throws Exception {
+        final Region region = cache.getRegion(getTestMethodName() + "_PR");
+
+        Wait.waitForCriterion(new WaitCriterion() {
+          public boolean done() {
+            return region.size() == expect; 
+          }
+          public String description() {
+            return null;
+          }
+        }, 60000, 100, false);
+        assertEquals(expect, region.size());
+      }
+    };
+    vm.invoke(verifyRegionSize);
+  }
+  
+  /*
+   * The test will start several cache servers, including gateway receivers.
+   * Shutdown them and verify the CacheClientNofifier for each server is correct
+   */
+  public void testMultipleCacheServer() throws Exception {
+    /* test senario: */
+    /* create 1 GatewaySender on vm0 */
+    /* create 1 GatewayReceiver on vm1 */
+    /* create 2 cache servers on vm1, one with overflow. */
+    /* verify if the cache server2 still has the overflow attributes */
+    /* create 1 cache client1 on vm2 to register interest on cache server1 */
+    /* create 1 cache client2 on vm3 to register interest on cache server1 */
+    /* do some puts to GatewaySender on vm0 */
+    
+    // create sender at ln
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(
1 ));
+    
+    // create recever and cache servers will be at ny
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2,
lnPort ));
+    vm1.invoke(() -> WANTestBase.createCache( nyPort ));
+    int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    checkCacheServer(vm1, receiverPort, false, 0);
+    
+    // create PR for receiver
+    vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName()
+ "_PR", null, 1, 100, isOffHeap() ));
+    
+    // create cache server1 with overflow
+    int serverPort = createCacheServerWithCSC(vm1, true, 3, "entry", "DEFAULT");
+    checkCacheServer(vm1, serverPort, true, 3);
+    
+    // create cache server 2
+    final int serverPort2 = createCacheServerWithCSC(vm1, false, 0, null, null);
+    // Currently, only the first cache server's overflow attributes will take effect
+    // It will be enhanced in GEODE-1102
+    checkCacheServer(vm1, serverPort2, true, 3);
+    LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2);
+    
+    vm2.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName()
+ "_PR" ));
+    vm3.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName()
+ "_PR" ));
+
+    vm0.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false,
null, true ));
+    vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName()
+ "_PR", "ln", 1, 100, isOffHeap() ));
+    vm0.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS ));
+    
+    /* verify */
+    verifyRegionSize(vm0, NUM_KEYS);
+    verifyRegionSize(vm1, NUM_KEYS);
+    verifyRegionSize(vm2, NUM_KEYS);
+    verifyRegionSize(vm3, NUM_KEYS);
+
+    // close a cache server, then re-test
+    closeCacheServer(vm1, serverPort2);
+
+    vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 ));
+
+    /* verify */
+    verifyRegionSize(vm0, NUM_KEYS*2);
+    verifyRegionSize(vm1, NUM_KEYS*2);
+    verifyRegionSize(vm2, NUM_KEYS*2);
+    verifyRegionSize(vm3, NUM_KEYS*2);
+  }
+
+}


Mime
View raw message