geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [02/47] incubator-geode git commit: GEODE-707 cache loader not invoked on concurrent load if first load fails with an exception
Date Fri, 08 Jan 2016 18:42:03 GMT
GEODE-707 cache loader not invoked on concurrent load if first load fails with an exception

avoid setting the future to {null, versionTag} when the loader throws an
exception so that a concurrent load attempt will be allowed


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/81eafccc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/81eafccc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/81eafccc

Branch: refs/heads/feature/GEODE-714
Commit: 81eafccce3cfce2500dcce786c65de1ae5b057dc
Parents: 935b76a
Author: Bruce Schuchardt <bschuchardt@pivotal.io>
Authored: Tue Dec 29 07:54:56 2015 -0800
Committer: Bruce Schuchardt <bschuchardt@pivotal.io>
Committed: Tue Dec 29 07:54:56 2015 -0800

----------------------------------------------------------------------
 .../gemfire/internal/cache/LocalRegion.java     |  45 +++--
 .../internal/cache/PartitionedRegion.java       |  75 --------
 .../gemfire/cache30/SearchAndLoadDUnitTest.java | 177 ++++++++++++++++++-
 3 files changed, 199 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index caf07ce..2bc2f05 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -1261,7 +1261,7 @@ public class LocalRegion extends AbstractRegion
    * @param disableCopyOnRead if true then disable copy on read
    * @param preferCD true if the preferred result form is CachedDeserializable
    * @param clientEvent client's event, if any (for version tag retrieval)
-   * @param returnTombstones TODO
+   * @param returnTombstones whether destroyed entries should be returned
    * @param retainResult if true then the result may be a retained off-heap reference
    * @return the value for the given key
    */
@@ -1613,21 +1613,32 @@ public class LocalRegion extends AbstractRegion
         throw err;
       }
     }
-    // didn't find a future, do one more getDeserialized to catch race
-    // condition where the future was just removed by another get thread
+    // didn't find a future, do one more probe for the entry to catch a race
+    // condition where the future was just removed by another thread
     try {
-      localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD,
clientEvent, false, false/*allowReadFromHDFS*/, false);
-      // TODO verify that this method is not used for PR or BR and hence allowReadFromHDFS
does not matter
-      // stats have now been updated
-      if (localValue != null && !Token.isInvalid(localValue)) {
-        result = localValue;
-        return result;
+      boolean partitioned = this.getDataPolicy().withPartitioning();
+      if (!partitioned) {
+        localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD,
clientEvent, false, false/*allowReadFromHDFS*/, false);
+
+        // stats have now been updated
+        if (localValue != null && !Token.isInvalid(localValue)) {
+          result = localValue;
+          return result;
+        }
+        isCreate = localValue == null;
+        result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
+            localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones,
false/*allowReadFromHDFS*/);
+
+      } else {
+        
+        // This code was moved from PartitionedRegion.nonTxnFindObject().  That method has
been removed.
+        // For PRs we don't want to deserialize the value and we can't use findObjectInSystem
because
+        // it can invoke code that is transactional.
+        result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
+            localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
+        // TODO why are we not passing the client event or returnTombstones in the above
invokation?
       }
-      isCreate = localValue == null;
 
-      result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
-          localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
-      
       if (result == null && localValue != null) {
         if (localValue != Token.TOMBSTONE || returnTombstones) {
           result = localValue;
@@ -1636,8 +1647,12 @@ public class LocalRegion extends AbstractRegion
       // findObjectInSystem does not call conditionalCopy
     }
     finally {
-      VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag();
-      thisFuture.set(new Object[]{result, tag});
+      if (result != null) {
+        VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag();
+        thisFuture.set(new Object[]{result, tag});
+      } else {
+        thisFuture.set(null);
+      }
       this.getFutures.remove(keyInfo.getKey());
     }
     if (!disableCopyOnRead) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
old mode 100644
new mode 100755
index a36d719..a14e99f
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -265,9 +265,6 @@ import com.gemstone.gemfire.i18n.StringId;
  * are copied up to a configurable level (for high availability) and placed on
  * multiple VMs for improved performance and increased storage capacity.
  * 
- * @since 5.0
- * @author Rohit Reja, Tushar Apshankar, Girish Thombare, Negi Tribhuwan, Greg
- *         Passmore, Mitch Thomas, Bruce Schuchardt
  */
 public class PartitionedRegion extends LocalRegion implements 
   CacheDistributionAdvisee, QueryExecutor {
@@ -3314,78 +3311,6 @@ public class PartitionedRegion extends LocalRegion implements
   }
 
   /**
-    * override the one in LocalRegion since we don't need to do getDeserialized.
-    */
-   @Override Object nonTxnFindObject(KeyInfo keyInfo, boolean isCreate,
-      boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD,
-      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) 
-      throws TimeoutException, CacheLoaderException
-  {
-    Object result = null;
-    FutureResult thisFuture = new FutureResult(getCancelCriterion());
-    Future otherFuture = (Future)this.getFutures.putIfAbsent(keyInfo.getKey(), thisFuture);
-    // only one thread can get their future into the map for this key at a time
-    if (otherFuture != null) {
-      try {
-        result = otherFuture.get();
-        if (result != null) {
-          if (!preferCD && result instanceof CachedDeserializable) {
-            CachedDeserializable cd = (CachedDeserializable)result;
-            // fix for bug 43023
-            if (!disableCopyOnRead && isCopyOnRead()) {
-              result = cd.getDeserializedWritableCopy(null, null);
-            } else {
-              result = cd.getDeserializedForReading();
-            }
-            
-          } else if (!disableCopyOnRead) {
-            result = conditionalCopy(result);
-          }
-          
-        //For sqlf since the deserialized value is nothing but chunk
-          // before returning the found value increase its use count
-         /* if(GemFireCacheImpl.sqlfSystem() && result instanceof Chunk) {
-            if(!((Chunk)result).use()) {
-              return null;
-            }
-          }*/
-           // what was a miss is now a hit
-          RegionEntry re = null;
-          if (isCreate) {
-            re = basicGetEntry(keyInfo.getKey());
-            updateStatsForGet(re, true);
-          }
-          return result;
-        }
-        // if value == null, try our own search/load
-      }
-      catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        // TODO check a CancelCriterion here?
-        return null;
-      }
-      catch (ExecutionException e) {
-        // unexpected since there is no background thread
-        AssertionError err = new AssertionError("unexpected exception");
-        err.initCause(err);
-        throw err;
-      }
-    }
-    try {
-      result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
-          localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
-    }
-    finally {
-      if (result instanceof Chunk) {
-        thisFuture.set(null);
-      } else {
-        thisFuture.set(result);
-      }
-      this.getFutures.remove(keyInfo.getKey());
-    }
-    return result;
-  }
-  /**
    * override the one in LocalRegion since we don't need to do getDeserialized.
    */
    @Override

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
index b33bda2..cf9ff9c 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java
@@ -16,17 +16,17 @@
  */
 package com.gemstone.gemfire.cache30;
 
-//import com.gemstone.gemfire.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import com.gemstone.gemfire.cache.*;
 
 import dunit.*;
-//import hydra.ClientMgr;
 
 /**
  * This class tests various search load and write scenarios for distributed regions
- * @author Sudhir Menon
- *
  */
+@SuppressWarnings({"deprecation", "unchecked", "rawtypes", "serial"})
 public class SearchAndLoadDUnitTest extends CacheTestCase {
 
   static boolean loaderInvoked;
@@ -48,6 +48,10 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
   /** A <code>CacheWriter</code> used by a test */
   protected static TestCacheWriter writer;
 
+  static boolean exceptionThrown;
+  static final CountDownLatch readyForExceptionLatch = new CountDownLatch(1);
+  static final CountDownLatch loaderInvokedLatch = new CountDownLatch(1);
+
   public SearchAndLoadDUnitTest(String name) {
     super(name);
   }
@@ -171,8 +175,166 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
     });
   }
 
-  public void testNetLoadNoLoaders()
-  throws CacheException, InterruptedException {
+
+  /**
+   * This test is for a bug in which a cache loader threw an exception
+   * that caused the wrong value to be put in a Future in nonTxnFindObject.  This
+   * in turn caused a concurrent search for the object to not invoke the loader a
+   * second time.
+   * 
+   * VM0 is used to create a cache and a region having a loader that simulates the
+   * conditions that caused the bug.  One async thread then does a get() which invokes
+   * the loader.  Another async thread does a get() which reaches nonTxnFindObject
+   * and blocks waiting for the first thread's load to complete.  The loader then
+   * throws an exception that is sent back to the first thread.  The second thread
+   * should then cause the loader to be invoked again, and this time the loader will
+   * return a value.  Both threads then validate that they received the expected
+   * result.
+   */
+  public void testConcurrentLoad() throws Throwable {
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    
+    final String name = this.getUniqueName() + "Region";
+    final String objectName = "theKey";
+    final Integer value = new Integer(44);
+    final String exceptionString = "causing first cache-load to fail";
+
+    remoteLoaderInvoked = false;
+    loaderInvoked = false;
+    
+    vm0.invoke(new CacheSerializableRunnable("create region " + name + " in vm0") {
+      public void run2() {
+        remoteLoaderInvoked = false;
+        loaderInvoked = false;
+        AttributesFactory factory = new AttributesFactory();
+        factory.setScope(Scope.DISTRIBUTED_ACK);
+        factory.setConcurrencyChecksEnabled(true);
+        factory.setCacheLoader(new CacheLoader() {
+          boolean firstInvocation = true;
+          public synchronized Object load(LoaderHelper helper) {
+            System.out.println("invoked cache loader for " + helper.getKey());
+            loaderInvoked = true;
+            loaderInvokedLatch.countDown();
+            if (firstInvocation) {
+              firstInvocation = false;
+              try { 
+                // wait for both threads to be ready for the exception to be thrown
+                System.out.println("waiting for vm0t2 to be ready before throwing exception");
+                readyForExceptionLatch.await(30, TimeUnit.SECONDS);
+                // give the second thread time to get into loader code
+                Thread.sleep(5000);
+              } catch (InterruptedException e) {
+                fail("interrupted");
+              }
+              System.out.println("throwing exception");
+              exceptionThrown = true;
+              throw new RuntimeException(exceptionString);
+            }
+            System.out.println("returning value="+value);
+            return value;
+          }
+
+          public void close() {
+
+          }
+        });
+
+        Region region = createRegion(name,factory.create());
+        region.create(objectName, null);
+        addExpectedException(exceptionString);
+      }
+    });
+
+    AsyncInvocation async1 = null;
+    try {
+      async1 = vm0.invokeAsync(new CacheSerializableRunnable("Concurrently invoke the remote
loader on the same key - t1") {
+        public void run2() {
+          Region region = getCache().getRegion("root/"+name);
+  
+          getLogWriter().info("t1 is invoking get("+objectName+")");
+          try {
+            getLogWriter().info("t1 retrieved value " + region.get(objectName));
+            fail("first load should have triggered an exception");
+          } catch (RuntimeException e) {
+            if (!e.getMessage().contains(exceptionString)) {
+              throw e;
+            }
+          }
+        }
+      });
+      vm0.invoke(new CacheSerializableRunnable("Concurrently invoke the loader on the same
key - t2") {
+        public void run2() {
+          final Region region = getCache().getRegion("root/"+name);
+          final Object[] valueHolder = new Object[1];
+  
+          // wait for vm1 to cause the loader to be invoked
+          getLogWriter().info("t2 is waiting for loader to be invoked by t1");
+          try {
+            loaderInvokedLatch.await(30, TimeUnit.SECONDS);
+          } catch (InterruptedException e) {
+            fail("interrupted");
+          }
+          assertTrue(loaderInvoked);
+          
+          Thread t = new Thread("invoke get()") {
+            public void run() {
+              try {
+                valueHolder[0] = region.get(objectName);
+              } catch (RuntimeException e) {
+                valueHolder[0] = e;
+              }
+            }
+          };
+          
+          t.setDaemon(true);
+          t.start();
+          try {
+            // let the thread get to the point of blocking on vm1's Future
+            // in LocalRegion.nonTxnFindObject()
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+            fail("interrupted");
+          }
+          
+          readyForExceptionLatch.countDown();
+          try {
+            t.join(30000);
+          } catch (InterruptedException e) {
+            fail("interrupted");
+          }
+          if (t.isAlive()) {
+            t.interrupt();
+            fail("get() operation blocked for too long - test needs some work");
+          }
+          
+          getLogWriter().info("t2 is invoking get("+objectName+")");
+          Object value = valueHolder[0];
+          if (value instanceof RuntimeException) {
+            if ( ((Exception)value).getMessage().contains(exceptionString) ) {
+              fail("second load should not have thrown an exception");
+            } else {
+              throw (RuntimeException)value;
+            }
+          } else {
+            getLogWriter().info("t2 retrieved value " + value);
+            assertNotNull(value);
+          }
+        }
+      });
+    } finally {
+      if (async1 != null) {
+        async1.join();
+        if (async1.exceptionOccurred()) {
+          throw async1.getException();
+        }
+      }
+    }
+  }
+  
+  
+  public void testNetLoadNoLoaders() throws CacheException, InterruptedException {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
@@ -318,7 +480,6 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
     VM vm2 = host.getVM(2);
     final String name = this.getUniqueName() + "-ACK";
     final String objectName = "B";
-    final Integer value = new Integer(43);
     loaderInvoked = false;
     remoteLoaderInvoked = false;
     remoteLoaderInvokedCount = 0;
@@ -369,7 +530,7 @@ public class SearchAndLoadDUnitTest extends CacheTestCase {
 
                 }
               });
-            Region region = createRegion(name,factory.create());
+            createRegion(name,factory.create());
           }
           catch (CacheException ex) {
             fail("While creating ACK region", ex);


Mime
View raw message