geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [55/63] [abbrv] incubator-geode git commit: GEODE-1329 auto-reconnect attempts cease if kicked out during boot-up of the cache
Date Wed, 04 May 2016 22:57:55 GMT
GEODE-1329 auto-reconnect attempts cease if kicked out during boot-up of the cache

InternalDistributedSystem.reconnect() now includes cache creation in its retry
loop and, should the cache fail to start due to a CancelException, it will shut
down and try again.

While creating a new test in ReconnectDUnitTest I found and fixed problems
with the other tests in that class.  Notably, the method
getDistributedSystemProperties() wasn't returning the correct properties
for many test cases because those tests never set the dsProperties variable
and they used the getCache() method.  This caused the
current distributed system to be destroyed and a new one created with
different properties than the test wanted, which in turn caused periodic
test failures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b893abe0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b893abe0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b893abe0

Branch: refs/heads/feature/GEODE-1276
Commit: b893abe094b2df73e51cdce1d0716fc984b1115c
Parents: 1400eae
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Mon May 2 14:33:35 2016 -0700
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Mon May 2 14:35:38 2016 -0700

----------------------------------------------------------------------
 .../gemfire/distributed/DistributedSystem.java  |   7 +-
 .../internal/InternalDistributedSystem.java     | 240 +++++++++----------
 .../internal/cache/DistributedRegion.java       |  11 +-
 .../internal/cache/GemFireCacheImpl.java        |  20 +-
 .../gemfire/cache30/ReconnectDUnitTest.java     | 235 ++++++++++++------
 5 files changed, 308 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b893abe0/geode-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
old mode 100644
new mode 100755
index 1de675d..3a52ee0
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/DistributedSystem.java
@@ -1594,12 +1594,17 @@ public abstract class DistributedSystem implements StatisticsFactory
{
 
       } else {
         boolean existingSystemDisconnecting = true;
-        while (!existingSystems.isEmpty() && existingSystemDisconnecting) {
+        boolean isReconnecting = false;
+        while (!existingSystems.isEmpty() && existingSystemDisconnecting &&
!isReconnecting) {
           Assert.assertTrue(existingSystems.size() == 1);
 
           InternalDistributedSystem existingSystem =
               (InternalDistributedSystem) existingSystems.get(0);
           existingSystemDisconnecting = existingSystem.isDisconnecting();
+          // a reconnecting DS will block on GemFireCache.class and a ReconnectThread
+          // holds that lock and invokes this method, so we break out of the loop
+          // if we detect this condition
+          isReconnecting = existingSystem.isReconnectingDS();
           if (existingSystemDisconnecting) {
             boolean interrupted = Thread.interrupted();
             try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b893abe0/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
old mode 100644
new mode 100755
index 3ef8e80..df85417
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/InternalDistributedSystem.java
@@ -1445,7 +1445,9 @@ public class InternalDistributedSystem
    * the attempt has been cancelled.
    */
   public boolean isReconnectCancelled() {
-    return this.reconnectCancelled;
+    synchronized(reconnectCancelledLock) {
+      return reconnectCancelled;
+    }
   }
 
   /**
@@ -2377,10 +2379,7 @@ public class InternalDistributedSystem
    * to reconnect and that failed.
    * */
   private volatile static int reconnectAttemptCounter = 0;
-  public static int getReconnectAttemptCounter() {
-    return reconnectAttemptCounter;
-  }
-  
+
   /**
    * The time at which reconnect attempts last began
    */
@@ -2420,9 +2419,21 @@ public class InternalDistributedSystem
    * this instance of the DS is now disconnected and unusable.
    */
   public boolean isReconnecting(){
-    return attemptingToReconnect || (reconnectDS != null);
+    InternalDistributedSystem rds = this.reconnectDS;
+    if (!attemptingToReconnect) {
+      return false;
+    }
+    if (reconnectCancelled) {
+      return false;
+    }
+    boolean newDsConnected = (rds == null || !rds.isConnected());
+    if (!newDsConnected) {
+      return false;
+    }
+    return true;
   }
-  
+
+
   /**
    * Returns true if we are reconnecting the distributed system
    * and this instance was created for one of the connection
@@ -2498,6 +2509,9 @@ public class InternalDistributedSystem
    */
   public boolean tryReconnect(boolean forcedDisconnect, String reason, GemFireCacheImpl oldCache)
{
     final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (this.isReconnectingDS && forcedDisconnect) {
+      return false;
+    }
     synchronized (CacheFactory.class) { // bug #51335 - deadlock with app thread trying to
create a cache
       synchronized (GemFireCacheImpl.class) {
         // bug 39329: must lock reconnectLock *after* the cache
@@ -2535,7 +2549,7 @@ public class InternalDistributedSystem
    * Returns the value for the number of time reconnect has been tried.
    * Test method used by DUnit.
    * */
-  public static int getReconnectCount(){
+  public static int getReconnectAttemptCounter() {
     return reconnectAttemptCounter;
   }
   
@@ -2590,8 +2604,6 @@ public class InternalDistributedSystem
     int maxTries = oldConfig.getMaxNumReconnectTries();
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
-    
-//    logger.info("reconnecting IDS@"+System.identityHashCode(this));
 
     if (Thread.currentThread().getName().equals("DisconnectThread")) {
       if (isDebugEnabled) {
@@ -2625,18 +2637,17 @@ public class InternalDistributedSystem
     }
     try {
       while (this.reconnectDS == null || !this.reconnectDS.isConnected()) {
-        synchronized(this.reconnectCancelledLock) {
-          if (this.reconnectCancelled) {
-            break;
-          }
+        if (isReconnectCancelled()) {
+          break;
         }
+
         if (!forcedDisconnect) {
           if (isDebugEnabled) {
             logger.debug("Max number of tries : {} and max time out : {}", maxTries, timeOut);
           }
           if(reconnectAttemptCounter >= maxTries){
             if (isDebugEnabled) {
-              logger.debug("Stopping the checkrequiredrole thread becuase reconnect : {}
reached the max number of reconnect tries : {}", reconnectAttemptCounter, maxTries);
+              logger.debug("Stopping the checkrequiredrole thread because reconnect : {}
reached the max number of reconnect tries : {}", reconnectAttemptCounter, maxTries);
             }
             throw new CacheClosedException(LocalizedStrings.InternalDistributedSystem_SOME_REQUIRED_ROLES_MISSING.toLocalizedString());
           }
@@ -2647,18 +2658,12 @@ public class InternalDistributedSystem
         }
         reconnectAttemptCounter++;
         
-        synchronized(this.reconnectCancelledLock) { 
-          if (this.reconnectCancelled) {
-            if (isDebugEnabled) {
-              logger.debug("reconnect can no longer be done because of an explicit disconnect");
-            }
-            return;
-          }
+        if (isReconnectCancelled()) {
+          return;
         }
     
         logger.info("Disconnecting old DistributedSystem to prepare for a reconnect attempt");
-//        logger.info("IDS@"+System.identityHashCode(this));
-        
+
         try {
           disconnect(true, reason, false);
         }
@@ -2667,7 +2672,6 @@ public class InternalDistributedSystem
         }
         
         try {
-  //        log.fine("waiting " + timeOut + " before reconnecting to the distributed system");
           reconnectLock.wait(timeOut);
         }
         catch (InterruptedException e) {
@@ -2675,13 +2679,9 @@ public class InternalDistributedSystem
           Thread.currentThread().interrupt();
           return;
         }
-        synchronized(this.reconnectCancelledLock) { 
-          if (this.reconnectCancelled) {
-            if (isDebugEnabled) {
-              logger.debug("reconnect can no longer be done because of an explicit disconnect");
-            }
-            return;
-          }
+
+        if (isReconnectCancelled()) {
+          return;
         }
         
     
@@ -2691,32 +2691,30 @@ public class InternalDistributedSystem
         try {
           // notify listeners of each attempt and then again after successful
           notifyReconnectListeners(this, this.reconnectDS, true);
+
           if (this.locatorDMTypeForced) {
             System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
           }
-  //        log.fine("DistributedSystem@"+System.identityHashCode(this)+" reconnecting distributed
system.  attempt #"+reconnectAttemptCounter);
+
           configProps.put(DistributionConfig.DS_RECONNECTING_NAME, Boolean.TRUE);
           if (quorumChecker != null) {
             configProps.put(DistributionConfig.DS_QUORUM_CHECKER_NAME, quorumChecker);
           }
+
           InternalDistributedSystem newDS = null;
-          synchronized(this.reconnectCancelledLock) { 
-            if (this.reconnectCancelled) {
-              if (isDebugEnabled) {
-                logger.debug("reconnect can no longer be done because of an explicit disconnect");
-              }
-              return;
-            }
+          if (isReconnectCancelled()) {
+            return;
           }
+
           try {
+
             newDS = (InternalDistributedSystem)connect(configProps);
-          } catch (DistributedSystemDisconnectedException e) {
-            synchronized(this.reconnectCancelledLock) {
-          	  if (this.reconnectCancelled) {
-          	    return;
-          	  } else {
-          	    throw e;
-          	  }
+
+          } catch (CancelException e) {
+            if (isReconnectCancelled()) {
+              return;
+            } else {
+              throw e;
             }
           } finally {
             if (newDS == null  &&  quorumChecker != null) {
@@ -2724,36 +2722,29 @@ public class InternalDistributedSystem
               quorumChecker.resume();
             }
           }
-          if (newDS != null) { // newDS will not be null here but findbugs requires this
check
-            boolean cancelled;
-            synchronized(this.reconnectCancelledLock) { 
-              cancelled = this.reconnectCancelled;
-            }
-            if (cancelled) {
-              newDS.disconnect();
-            } else {
-              this.reconnectDS = newDS;
-              newDS.isReconnectingDS = false;
-              notifyReconnectListeners(this, this.reconnectDS, false);
-            }
+
+          if (this.reconnectCancelled) {
+            newDS.disconnect();
+            continue;
           }
+
+          this.reconnectDS = newDS;
         }
         catch (SystemConnectException e) {
-          // retry;
-          if (isDebugEnabled) {
-            logger.debug("Attempt to reconnect failed with SystemConnectException");
-          }
-          if (e.getMessage().contains("Rejecting the attempt of a member using an older version")
-              || e.getMessage().contains("15806")) { // 15806 is in the message if it's been
localized to another language
+          logger.debug("Attempt to reconnect failed with SystemConnectException");
+
+          if (e.getMessage().contains("Rejecting the attempt of a member using an older version"))
{
             logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_EXCEPTION_OCCURED_WHILE_TRYING_TO_CONNECT_THE_SYSTEM_DURING_RECONNECT),
e);
             attemptingToReconnect = false;
             return;
           }
+          continue;
         }
         catch (GemFireConfigException e) {
           if (isDebugEnabled) {
             logger.debug("Attempt to reconnect failed with GemFireConfigException");
           }
+          continue;
         }
         catch (Exception ee) {
           logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_EXCEPTION_OCCURED_WHILE_TRYING_TO_CONNECT_THE_SYSTEM_DURING_RECONNECT),
ee);
@@ -2766,9 +2757,64 @@ public class InternalDistributedSystem
           }
           reconnectAttemptCounter = savNumOfTries;
         }
+
+
+        DM newDM = this.reconnectDS.getDistributionManager();
+        if ( !inhibitCacheForSQLFire && (newDM instanceof DistributionManager) )
{
+          // sqlfire will have already replayed DDL and recovered.
+          // Admin systems don't carry a cache, but for others we can now create
+          // a cache
+          if (((DistributionManager)newDM).getDMType() != DistributionManager.ADMIN_ONLY_DM_TYPE)
{
+            try {
+              CacheConfig config = new CacheConfig();
+              if (cacheXML != null) {
+                config.setCacheXMLDescription(cacheXML);
+              }
+              cache = GemFireCacheImpl.create(this.reconnectDS, config);
+
+              createAndStartCacheServers(cacheServerCreation, cache);
+
+              if (cache.getCachePerfStats().getReliableRegionsMissing() == 0){
+                reconnectAttemptCounter = 0;
+              }
+              else {
+                // this try failed. The new cache will call reconnect again
+              }
+            }
+            catch (CancelException ignor) {
+              logger.warn("Exception occured while trying to create the cache during reconnect",ignor);
+              reconnectDS.disconnect();
+              reconnectDS = null;
+            }
+            catch (Exception e) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_EXCEPTION_OCCURED_WHILE_TRYING_TO_CREATE_THE_CACHE_DURING_RECONNECT),
e);
+            }
+          }
+        }
+
+        if (reconnectDS != null && reconnectDS.isConnected()) {
+          // make sure the new DS and cache are stable before exiting this loop
+          try {
+            Thread.sleep(config.getMemberTimeout() * 3);
+          } catch (InterruptedException e) {
+            logger.info("Reconnect thread has been interrupted - exiting");
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+
       } // while()
+
+      if (isReconnectCancelled()) {
+        reconnectDS.disconnect();
+      } else {
+        reconnectDS.isReconnectingDS = false;
+        notifyReconnectListeners(this, this.reconnectDS, false);
+      }
+
     } finally {
       systemAttemptingReconnect = null;
+      attemptingToReconnect = false;
       if (appendToLogFile == null) {
         System.getProperties().remove(APPEND_TO_LOG_FILE);
       } else {
@@ -2783,59 +2829,18 @@ public class InternalDistributedSystem
         mbrMgr.releaseQuorumChecker(quorumChecker);
       }
     }
-    
-    boolean cancelled;
-    synchronized(this.reconnectCancelledLock) { 
-      cancelled = this.reconnectCancelled;
-    }
-    if (cancelled) {
-      if (isDebugEnabled) {
-        logger.debug("reconnect can no longer be done because of an explicit disconnect");
-      }
+
+    if (isReconnectCancelled()) {
+      logger.debug("reconnect can no longer be done because of an explicit disconnect");
       if (reconnectDS != null) {
         reconnectDS.disconnect();
       }
       attemptingToReconnect = false;
       return;
+    } else {
+      logger.info("Reconnect completed.\nNew DistributedSystem is {}\nNew Cache is {}", reconnectDS,
cache);
     }
 
-    try {
-      DM newDM = this.reconnectDS.getDistributionManager();
-      if ( !inhibitCacheForSQLFire && (newDM instanceof DistributionManager) ) {
-        // sqlfire will have already replayed DDL and recovered.
-        // Admin systems don't carry a cache, but for others we can now create
-        // a cache
-        if (((DistributionManager)newDM).getDMType() != DistributionManager.ADMIN_ONLY_DM_TYPE)
{
-          try {
-            CacheConfig config = new CacheConfig();
-            if (cacheXML != null) {
-              config.setCacheXMLDescription(cacheXML);
-            }
-            cache = GemFireCacheImpl.create(this.reconnectDS, config);
-            
-            createAndStartCacheServers(cacheServerCreation, cache);
-
-            if (cache.getCachePerfStats().getReliableRegionsMissing() == 0){
-              reconnectAttemptCounter = 0;
-              logger.info("Reconnected properly");
-            }
-            else {
-              // this try failed. The new cache will call reconnect again
-            }
-          }
-          catch (CancelException ignor) {
-              //getLogWriter().warning("Exception occured while trying to create the cache
during reconnect : "+ignor.toString());
-              throw ignor;
-              // this.reconnectDS.reconnect();
-          }
-          catch (Exception e) {
-            logger.warn(LocalizedMessage.create(LocalizedStrings.InternalDistributedSystem_EXCEPTION_OCCURED_WHILE_TRYING_TO_CREATE_THE_CACHE_DURING_RECONNECT),
e);
-          }
-        }
-      }
-    } finally {
-      attemptingToReconnect = false;
-    }
   }
 
 
@@ -3017,11 +3022,8 @@ public class InternalDistributedSystem
     }
     synchronized(this.reconnectLock) {
       InternalDistributedSystem recon = this.reconnectDS;
-//      (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("IDS.waitUntilReconnected:
reconnectCancelled = "+reconnectCancelled
-//          +"; reconnectDS="+reconnectDS);
 
-          
-      while (attemptingToReconnect && (recon == null || !recon.isConnected())) {
+      while (isReconnecting()) {
         synchronized(this.reconnectCancelledLock) {
           if (this.reconnectCancelled) {
             break;
@@ -3030,16 +3032,12 @@ public class InternalDistributedSystem
         if (time != 0) {
           this.reconnectLock.wait(sleepTime);
         }
-        if (recon == null) {
-          recon = this.reconnectDS;
-        }
         if (time == 0  ||  System.currentTimeMillis() > endTime) {
-//          (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("IDS.waitUntilReconnected
timed out");
           break;
         }
       }
-//      (new ManagerLogWriter(LogWriterImpl.FINE_LEVEL, System.out)).fine("IDS.waitUntilReconnected
finished & returning: attemptingToReconnect="
-//                +attemptingToReconnect+"; reconnectDS=" + recon);
+
+      recon = this.reconnectDS;
       return !attemptingToReconnect  &&  recon != null  &&  recon.isConnected();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b893abe0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
old mode 100644
new mode 100755
index 226d914..cc86e2c
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -944,8 +944,10 @@ public class DistributedRegion extends LocalRegion implements
   protected boolean lostReliability(final InternalDistributedMember id,
       final Set newlyMissingRoles)
   {
-    if (DistributedRegion.ignoreReconnect)
+    if (DistributedRegion.ignoreReconnect) { // test hook
       return false;
+    }
+
     boolean async = false;
     try {
       if (getMembershipAttributes().getLossAction().isReconnect()) {
@@ -998,12 +1000,11 @@ public class DistributedRegion extends LocalRegion implements
           public void run()
           {
             try {
-              // TODO: may need to check isReconnecting and checkReadiness...
-              if (logger.isDebugEnabled()) {
-                logger.debug("Reliability loss with policy of reconnect and membership thread
doing reconnect");
-              }
+              logger.debug("Reliability loss with policy of reconnect and membership thread
doing reconnect");
+
               initializationLatchAfterMemberTimeout.await();
               getSystem().tryReconnect(false, "Role Loss", getCache());
+
               synchronized (missingRequiredRoles) {
                 // any number of threads may be waiting on missingRequiredRoles
                 missingRequiredRoles.notifyAll();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b893abe0/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index c477466..96b7bbc 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -337,6 +337,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
 
   private final ConcurrentMap pathToRegion = new ConcurrentHashMap();
 
+  protected volatile boolean isInitialized = false;
   protected volatile boolean isClosing = false;
   protected volatile boolean closingGatewaySendersByShutdownAll = false;
   protected volatile boolean closingGatewayReceiversByShutdownAll = false;
@@ -1187,6 +1188,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache,
HasCachePer
         DEFAULT_CLIENT_FUNCTION_TIMEOUT);
     clientFunctionTimeout = time >= 0 ? time : DEFAULT_CLIENT_FUNCTION_TIMEOUT;
 
+    isInitialized = true;
   }
 
   /**
@@ -2344,7 +2346,15 @@ public class GemFireCacheImpl implements InternalCache, ClientCache,
HasCachePer
 
   // see Cache.waitUntilReconnected(long, TimeUnit)
   public boolean waitUntilReconnected(long time, TimeUnit units) throws InterruptedException
{
-    return this.system.waitUntilReconnected(time,  units);
+    boolean systemReconnected = this.system.waitUntilReconnected(time,  units);
+    if (!systemReconnected) {
+      return false;
+    }
+    GemFireCacheImpl cache = getInstance();
+    if (cache == null || !cache.isInitialized()) {
+      return false;
+    }
+    return true;
   }
   
   // see Cache.stopReconnecting()
@@ -2354,8 +2364,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache,
HasCachePer
   
   // see Cache.getReconnectedCache()
   public Cache getReconnectedCache() {
-    Cache c = GemFireCacheImpl.getInstance();
-    if (c == this) {
+    GemFireCacheImpl c = GemFireCacheImpl.getInstance();
+    if (c == this || !c.isInitialized()) {
       c = null;
     }
     return c;
@@ -3502,6 +3512,10 @@ public class GemFireCacheImpl implements InternalCache, ClientCache,
HasCachePer
     }
   }
 
+  public boolean isInitialized() {
+    return this.isInitialized;
+  }
+
   public boolean isClosed() {
     return this.isClosing;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b893abe0/geode-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java
old mode 100644
new mode 100755
index a4ba33d..fdbc96c
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java
@@ -16,31 +16,10 @@
  */
 package com.gemstone.gemfire.cache30;
 
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.LossAction;
-import com.gemstone.gemfire.cache.MembershipAttributes;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.RegionExistsException;
-import com.gemstone.gemfire.cache.ResumptionAction;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.Locator;
@@ -53,21 +32,18 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManage
 import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager;
 import com.gemstone.gemfire.internal.AvailablePort;
 import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
-import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
-import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.Invoke;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.ThreadUtils;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("serial")
 public class ReconnectDUnitTest extends CacheTestCase
@@ -75,19 +51,21 @@ public class ReconnectDUnitTest extends CacheTestCase
   static int locatorPort;
   static Locator locator;
   static DistributedSystem savedSystem;
+  static GemFireCacheImpl savedCache;
   static int locatorVMNumber = 3;
   static Thread gfshThread;
   
-  Properties dsProperties;
-  
+  static Properties dsProperties;
+  static String fileSeparator = File.separator;
+
   public ReconnectDUnitTest(String name) {
     super(name);
   }
   
   @Override
   public final void postSetUp() throws Exception {
-    this.locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
-    final int locPort = this.locatorPort;
+    locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int locPort = locatorPort;
     Host.getHost(0).getVM(locatorVMNumber)
       .invoke(new SerializableRunnable("start locator") {
       public void run() {
@@ -96,6 +74,7 @@ public class ReconnectDUnitTest extends CacheTestCase
           if (ds != null) {
             ds.disconnect();
           }
+          dsProperties = null;
           locatorPort = locPort;
           Properties props = getDistributedSystemProperties();
           locator = Locator.startLocatorAndDS(locatorPort, new File(""), props);
@@ -107,6 +86,16 @@ public class ReconnectDUnitTest extends CacheTestCase
       }
     });
 
+    SerializableRunnable setDistributedSystemProperties = new SerializableRunnable("set distributed
system properties") {
+      public void run() {
+        dsProperties = null;
+        locatorPort = locPort;
+        getDistributedSystemProperties();
+      }
+    };
+    setDistributedSystemProperties.run();
+    Invoke.invokeInEveryVM(setDistributedSystemProperties);
+
     beginCacheXml();
     createRegion("myRegion", createAtts());
     finishCacheXml("MyDisconnect");
@@ -119,11 +108,12 @@ public class ReconnectDUnitTest extends CacheTestCase
   @Override
   public Properties getDistributedSystemProperties() {
     if (dsProperties == null) {
-      dsProperties = super.getDistributedSystemProperties();
+      dsProperties = new Properties();
       dsProperties.put(DistributionConfig.MAX_WAIT_TIME_FOR_RECONNECT_NAME, "20000");
       dsProperties.put(DistributionConfig.ENABLE_NETWORK_PARTITION_DETECTION_NAME, "true");
       dsProperties.put(DistributionConfig.DISABLE_AUTO_RECONNECT_NAME, "false");
-      dsProperties.put(DistributionConfig.LOCATORS_NAME, "localHost["+this.locatorPort+"]");
+      dsProperties.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+      dsProperties.put(DistributionConfig.LOCATORS_NAME, "localHost["+locatorPort+"]");
       dsProperties.put(DistributionConfig.MCAST_PORT_NAME, "0");
       dsProperties.put(DistributionConfig.MEMBER_TIMEOUT_NAME, "1000");
       dsProperties.put(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
@@ -133,23 +123,25 @@ public class ReconnectDUnitTest extends CacheTestCase
   
   @Override
   public final void postTearDownCacheTestCase() throws Exception {
-    try {
-      Host.getHost(0).getVM(locatorVMNumber).invoke(new SerializableRunnable("stop locator")
{
-        public void run() {
-          if (locator != null) {
-            LogWriterUtils.getLogWriter().info("stopping locator " + locator);
-            locator.stop();
-          }
+    System.out.println("entering postTearDownCacheTestCase");
+    SerializableRunnable disconnect = new SerializableRunnable("disconnect and clean up")
{
+      public void run() {
+        if (savedSystem != null && savedSystem.isReconnecting()) {
+          savedSystem.stopReconnecting();
         }
-      });
-    } finally {
-      Invoke.invokeInEveryVM(new SerializableRunnable() {
-        public void run() {
-          ReconnectDUnitTest.savedSystem = null;
+        savedSystem = null;
+        savedCache = null;
+        dsProperties = null;
+        locator = null;
+        locatorPort = 0;
+        InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
+        if (ds != null) {
+          ds.disconnect();
         }
-      });
-      disconnectAllFromDS();
-    }
+      }
+    };
+    Invoke.invokeInEveryVM(disconnect);
+    disconnect.run();
   }
 
   /**
@@ -199,9 +191,9 @@ public class ReconnectDUnitTest extends CacheTestCase
         //      DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
         locatorPort = locPort;
         Properties props = getDistributedSystemProperties();
-        props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
+        props.put("cache-xml-file", xmlFileLoc+ fileSeparator +"MyDisconnect-cache.xml");
         props.put("max-num-reconnect-tries", "2");
-        props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
+//        props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
         Cache cache = new CacheFactory(props).create();
         IgnoredException.addIgnoredException("com.gemstone.gemfire.ForcedDisconnectException||Possible loss of quorum");
         Region myRegion = cache.getRegion("root/myRegion");
@@ -249,7 +241,7 @@ public class ReconnectDUnitTest extends CacheTestCase
   /** bug #51335 - customer is also trying to recreate the cache */
   // this test is disabled due to a high failure rate during CI test runs.
   // see bug #52160
-  public void disabledtestReconnectCollidesWithApplication() throws Exception  {
+  public void testReconnectCollidesWithApplication() throws Exception  {
     doTestReconnectOnForcedDisconnect(true);
   }
   
@@ -278,7 +270,7 @@ public class ReconnectDUnitTest extends CacheTestCase
         //      DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
         locatorPort = locPort;
         Properties props = getDistributedSystemProperties();
-        props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
+        props.put("cache-xml-file", xmlFileLoc+ fileSeparator +"MyDisconnect-cache.xml");
         props.put("max-wait-time-reconnect", "1000");
         props.put("max-num-reconnect-tries", "2");
 //        props.put("log-file", "autoReconnectVM"+VM.getCurrentVMNum()+"_"+getPID()+".log");
@@ -298,7 +290,7 @@ public class ReconnectDUnitTest extends CacheTestCase
         //            DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
         locatorPort = locPort;
         final Properties props = getDistributedSystemProperties();
-        props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
+        props.put("cache-xml-file", xmlFileLoc+ fileSeparator +"MyDisconnect-cache.xml");
         props.put("max-wait-time-reconnect", "5000");
         props.put("max-num-reconnect-tries", "2");
         props.put("start-locator", "localhost["+secondLocPort+"]");
@@ -336,7 +328,10 @@ public class ReconnectDUnitTest extends CacheTestCase
 
     vm0.invoke(create1);
     DistributedMember dm = (DistributedMember)vm1.invoke(create2);
+
+    IgnoredException.addIgnoredException("ForcedDisconnectException");
     forceDisconnect(vm1);
+
     DistributedMember newdm = (DistributedMember)vm1.invoke(new SerializableCallable("wait for reconnect(1)") {
       public Object call() {
         final DistributedSystem ds = ReconnectDUnitTest.savedSystem;
@@ -393,6 +388,7 @@ public class ReconnectDUnitTest extends CacheTestCase
         DistributedSystem newDs = InternalDistributedSystem.getAnyInstance();
         if (newDs != null) {
           LogWriterUtils.getLogWriter().warning("expected distributed system to be disconnected: " + newDs);
+          newDs.disconnect();
           return false;
         }
         return true;
@@ -499,7 +495,7 @@ public class ReconnectDUnitTest extends CacheTestCase
         //      DebuggerSupport.waitForJavaDebugger(getLogWriter(), " about to create region");
         locatorPort = locPort;
         Properties props = getDistributedSystemProperties();
-        props.put("cache-xml-file", xmlFileLoc+"/MyDisconnect-cache.xml");
+        props.put("cache-xml-file", xmlFileLoc+ fileSeparator +"MyDisconnect-cache.xml");
         props.put("max-wait-time-reconnect", "1000");
         props.put("max-num-reconnect-tries", "2");
         ReconnectDUnitTest.savedSystem = getSystem(props);
@@ -664,17 +660,17 @@ public class ReconnectDUnitTest extends CacheTestCase
 
     SerializableRunnable roleLoss = new CacheSerializableRunnable(
         "ROLERECONNECTTESTS") {
-      public void run2() throws CacheException, RuntimeException
+      public void run2() throws RuntimeException
       {
         LogWriterUtils.getLogWriter().info("####### STARTING THE REAL TEST ##########");
         locatorPort = locPort;
         Properties props = getDistributedSystemProperties();
-        props.put("cache-xml-file", xmlFileLoc+File.separator+"RoleReconnect-cache.xml");
+        props.put("cache-xml-file", xmlFileLoc+ fileSeparator +"RoleReconnect-cache.xml");
         props.put("max-wait-time-reconnect", "200");
         final int timeReconnect = 3;
         props.put("max-num-reconnect-tries", "3");
         props.put(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
-        props.put("log-file", "roleLossVM0.log");
+//        props.put("log-file", "roleLossVM0.log");
 
         getSystem(props);
 
@@ -693,7 +689,7 @@ public class ReconnectDUnitTest extends CacheTestCase
           basicGetSystem().getLogWriter().info("<ExpectedException action=remove>"
               + "CacheClosedException" + "</ExpectedException");
         }
-        LogWriterUtils.getLogWriter().fine("roleLoss Sleeping SO call dumprun.sh");
+
         WaitCriterion ev = new WaitCriterion() {
           public boolean done() {
             return reconnectTries >= timeReconnect;
@@ -821,10 +817,7 @@ public class ReconnectDUnitTest extends CacheTestCase
           return true;
         }
         Object res = vm0.invoke(() -> ReconnectDUnitTest.reconnectTries());
-        if (((Integer)res).intValue() != 0) {
-          return true;
-        }
-        return false;
+        return ((Integer) res).intValue() != 0;
       }
       public String description() {
         return "waiting for event";
@@ -883,7 +876,7 @@ public class ReconnectDUnitTest extends CacheTestCase
           LogWriterUtils.getLogWriter().info(startupMessage);
           WaitCriterion ev = new WaitCriterion() {
             public boolean done() {
-              return ((Boolean)otherVM.invoke(() -> ReconnectDUnitTest.isInitialRolePlayerStarted())).booleanValue();
+              return otherVM.invoke(() -> ReconnectDUnitTest.isInitialRolePlayerStarted()).booleanValue();
             }
             public String description() {
               return null;
@@ -930,7 +923,7 @@ public class ReconnectDUnitTest extends CacheTestCase
           ev = new WaitCriterion() {
             String excuse;
             public boolean done() {
-              if (InternalDistributedSystem.getReconnectCount() != 0) {
+              if (InternalDistributedSystem.getReconnectAttemptCounter() != 0) {
                 excuse = "reconnectCount is " + reconnectTries
                     + " waiting for it to be zero";
                 return false;
@@ -1023,6 +1016,67 @@ public class ReconnectDUnitTest extends CacheTestCase
     }; // roleloss runnable
   }
 
+  /**
+   * auto-reconnect was found to stop attempting to reconnect and rebuild
+   * the cache if another forced-disconnect was triggered after reconnect
+   * but before cache creation was completed.  This test uses a region
+   * listener to crash the reconnecting distributed system during cache
+   * creation and asserts that it then reconnects and rebuilds the cache.
+   */
+  public void testReconnectFailsInCacheCreation() throws Exception {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    final int locPort = locatorPort;
+
+    final String xmlFileLoc = (new File(".")).getAbsolutePath();
+
+    SerializableRunnable createCache = new SerializableRunnable(
+            "Create Cache and Regions") {
+      public void run()  {
+        locatorPort = locPort;
+        final Properties props = getDistributedSystemProperties();
+        props.put("max-wait-time-reconnect", "1000");
+//        props.put("log-file", "");
+        dsProperties = props;
+        ReconnectDUnitTest.savedSystem = getSystem(props);
+        ReconnectDUnitTest.savedCache = (GemFireCacheImpl)getCache();
+        Region myRegion = createRegion("myRegion", createAtts());
+        myRegion.put("MyKey", "MyValue");
+        myRegion.getAttributesMutator().addCacheListener(new CacheKillingListener());
+      }
+    };
+
+    vm0.invoke(createCache);  // vm0 keeps the locator from losing quorum when vm1 crashes
+
+    vm1.invoke(createCache);
+    IgnoredException.addIgnoredException("DistributedSystemDisconnectedException|ForcedDisconnectException", vm1);
+    forceDisconnect(vm1);
+
+    vm1.invoke(new SerializableRunnable("wait for reconnect") {
+      public void run() {
+        final GemFireCacheImpl cache = ReconnectDUnitTest.savedCache;
+        Wait.waitForCriterion(new WaitCriterion() {
+          public boolean done() {
+            return cache.isReconnecting();
+          }
+          public String description() {
+            return "waiting for cache to begin reconnecting";
+          }
+        }, 30000, 100, true);
+        System.out.println("entering reconnect wait for " + cache);
+        try {
+          cache.waitUntilReconnected(20, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          fail("interrupted");
+        }
+        assertNotNull(cache.getReconnectedCache());
+      }
+    });
+  }
+
   private CacheSerializableRunnable getRoleAPlayerRunnable(
       final int locPort, final String regionName, final String myKey, final String myValue,
       final String startupMessage) {
@@ -1099,7 +1153,7 @@ public class ReconnectDUnitTest extends CacheTestCase
         LogWriterUtils.getLogWriter().info("STARTED THE REQUIREDROLES CACHE");
         initialRolePlayerStarted = true;
 
-        while(!((Boolean)otherVM.invoke(() -> ReconnectDUnitTest.isInitialized())).booleanValue()){
+        while(!otherVM.invoke(() -> ReconnectDUnitTest.isInitialized()).booleanValue()){
           try{
             Thread.sleep(15);
           }catch(InterruptedException ignor){
@@ -1169,4 +1223,35 @@ public class ReconnectDUnitTest extends CacheTestCase
     return 0;
   }
 
+  /**
+   * CacheKillingListener crashes the distributed system when it is invoked
+   * for the first time.  After that it ignores any notifications.
+   */
+  public static class CacheKillingListener extends CacheListenerAdapter implements Declarable {
+    public static int crashCount = 0;
+
+    @Override
+    public void afterRegionCreate(final RegionEvent event) {
+      if (crashCount == 0) {
+        crashCount += 1;
+        // we crash the system in a different thread than the ReconnectThread
+        // to simulate receiving a ForcedDisconnect from the membership manager
+        // in the UDP reader thread
+        Thread t = new Thread("crash reconnecting system (ReconnectDUnitTest)") {
+          public void run() {
+            System.out.println("crashing distributed system");
+            GemFireCacheImpl cache = (GemFireCacheImpl)event.getRegion().getCache();
+            MembershipManagerHelper.crashDistributedSystem(cache.getDistributedSystem());
+          }
+        };
+        t.setDaemon(true);
+        t.start();
+      }
+    }
+
+    @Override
+    public void init(Properties props) {
+    }
+
+  }
 }



Mime
View raw message