Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1920518CAA for ; Mon, 4 Jan 2016 17:04:46 +0000 (UTC) Received: (qmail 19703 invoked by uid 500); 4 Jan 2016 17:04:46 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 19674 invoked by uid 500); 4 Jan 2016 17:04:46 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 19665 invoked by uid 99); 4 Jan 2016 17:04:45 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jan 2016 17:04:45 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 88831C793E for ; Mon, 4 Jan 2016 17:04:45 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.78 X-Spam-Level: * X-Spam-Status: No, score=1.78 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 1S6uJ1g_tNW1 for ; Mon, 4 Jan 2016 17:04:38 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id B790E42A22 for ; Mon, 4 Jan 2016 17:04:37 +0000 (UTC) Received: (qmail 18626 invoked by uid 99); 4 Jan 2016 17:04:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Jan 2016 17:04:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C4A0E0547; Mon, 4 Jan 2016 17:04:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: klund@apache.org To: commits@geode.incubator.apache.org Date: Mon, 04 Jan 2016 17:04:38 -0000 Message-Id: <02435e4ed64d4ac8906208b704f939b6@git.apache.org> In-Reply-To: <52199321ae5e4651b3663e163667d1fa@git.apache.org> References: <52199321ae5e4651b3663e163667d1fa@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/8] incubator-geode git commit: GEODE-707 cache loader not invoked on concurrent load if first load fails with an exception GEODE-707 cache loader not invoked on concurrent load if first load fails with an exception avoid setting the future to {null, versionTag} when the loader throws an exception so that a concurrent load attempt will be allowed Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/81eafccc Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/81eafccc Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/81eafccc Branch: refs/heads/feature/GEODE-715 Commit: 81eafccce3cfce2500dcce786c65de1ae5b057dc Parents: 935b76a Author: Bruce Schuchardt Authored: Tue Dec 29 07:54:56 2015 -0800 Committer: Bruce Schuchardt Committed: Tue Dec 29 07:54:56 2015 -0800 ---------------------------------------------------------------------- .../gemfire/internal/cache/LocalRegion.java | 45 +++-- .../internal/cache/PartitionedRegion.java | 75 -------- .../gemfire/cache30/SearchAndLoadDUnitTest.java | 177 ++++++++++++++++++- 3 files changed, 199 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java index caf07ce..2bc2f05 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java @@ -1261,7 +1261,7 @@ public class LocalRegion extends AbstractRegion * @param disableCopyOnRead if true then disable copy on read * @param preferCD true if the preferred result form is CachedDeserializable * @param clientEvent client's event, if any (for version tag retrieval) - * @param returnTombstones TODO + * @param returnTombstones whether destroyed entries should be returned * @param retainResult if true then the result may be a retained off-heap reference * @return the value for the given key */ @@ -1613,21 +1613,32 @@ public class LocalRegion extends AbstractRegion throw err; } } - // didn't find a future, do one more getDeserialized to catch race - // condition where the future was just removed by another get thread + // didn't find a future, do one more probe for the entry to catch a race + // condition where the future was just removed by another thread try { - localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false); - // TODO verify that this method is not used for PR or BR and hence allowReadFromHDFS does not matter - // stats have now been updated - if (localValue != null && !Token.isInvalid(localValue)) { - result = localValue; - return result; + boolean partitioned = this.getDataPolicy().withPartitioning(); + if (!partitioned) { + localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false); + + // stats have now been updated + if (localValue != null && !Token.isInvalid(localValue)) { + result = localValue; + return result; + } + isCreate = localValue == null; + result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks, + localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones, false/*allowReadFromHDFS*/); + + } else { + + // This code was moved from PartitionedRegion.nonTxnFindObject(). That method has been removed. + // For PRs we don't want to deserialize the value and we can't use findObjectInSystem because + // it can invoke code that is transactional. + result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks, + localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS); + // TODO why are we not passing the client event or returnTombstones in the above invokation? } - isCreate = localValue == null; - result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks, - localValue, disableCopyOnRead, preferCD, null, clientEvent, returnTombstones, false/*allowReadFromHDFS*/); - if (result == null && localValue != null) { if (localValue != Token.TOMBSTONE || returnTombstones) { result = localValue; @@ -1636,8 +1647,12 @@ public class LocalRegion extends AbstractRegion // findObjectInSystem does not call conditionalCopy } finally { - VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag(); - thisFuture.set(new Object[]{result, tag}); + if (result != null) { + VersionTag tag = (clientEvent==null)? null : clientEvent.getVersionTag(); + thisFuture.set(new Object[]{result, tag}); + } else { + thisFuture.set(null); + } this.getFutures.remove(keyInfo.getKey()); } if (!disableCopyOnRead) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java old mode 100644 new mode 100755 index a36d719..a14e99f --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -265,9 +265,6 @@ import com.gemstone.gemfire.i18n.StringId; * are copied up to a configurable level (for high availability) and placed on * multiple VMs for improved performance and increased storage capacity. * - * @since 5.0 - * @author Rohit Reja, Tushar Apshankar, Girish Thombare, Negi Tribhuwan, Greg - * Passmore, Mitch Thomas, Bruce Schuchardt */ public class PartitionedRegion extends LocalRegion implements CacheDistributionAdvisee, QueryExecutor { @@ -3314,78 +3311,6 @@ public class PartitionedRegion extends LocalRegion implements } /** - * override the one in LocalRegion since we don't need to do getDeserialized. - */ - @Override Object nonTxnFindObject(KeyInfo keyInfo, boolean isCreate, - boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, - EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) - throws TimeoutException, CacheLoaderException - { - Object result = null; - FutureResult thisFuture = new FutureResult(getCancelCriterion()); - Future otherFuture = (Future)this.getFutures.putIfAbsent(keyInfo.getKey(), thisFuture); - // only one thread can get their future into the map for this key at a time - if (otherFuture != null) { - try { - result = otherFuture.get(); - if (result != null) { - if (!preferCD && result instanceof CachedDeserializable) { - CachedDeserializable cd = (CachedDeserializable)result; - // fix for bug 43023 - if (!disableCopyOnRead && isCopyOnRead()) { - result = cd.getDeserializedWritableCopy(null, null); - } else { - result = cd.getDeserializedForReading(); - } - - } else if (!disableCopyOnRead) { - result = conditionalCopy(result); - } - - //For sqlf since the deserialized value is nothing but chunk - // before returning the found value increase its use count - /* if(GemFireCacheImpl.sqlfSystem() && result instanceof Chunk) { - if(!((Chunk)result).use()) { - return null; - } - }*/ - // what was a miss is now a hit - RegionEntry re = null; - if (isCreate) { - re = basicGetEntry(keyInfo.getKey()); - updateStatsForGet(re, true); - } - return result; - } - // if value == null, try our own search/load - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // TODO check a CancelCriterion here? - return null; - } - catch (ExecutionException e) { - // unexpected since there is no background thread - AssertionError err = new AssertionError("unexpected exception"); - err.initCause(err); - throw err; - } - } - try { - result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks, - localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS); - } - finally { - if (result instanceof Chunk) { - thisFuture.set(null); - } else { - thisFuture.set(result); - } - this.getFutures.remove(keyInfo.getKey()); - } - return result; - } - /** * override the one in LocalRegion since we don't need to do getDeserialized. */ @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/81eafccc/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java index b33bda2..cf9ff9c 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SearchAndLoadDUnitTest.java @@ -16,17 +16,17 @@ */ package com.gemstone.gemfire.cache30; -//import com.gemstone.gemfire.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import com.gemstone.gemfire.cache.*; import dunit.*; -//import hydra.ClientMgr; /** * This class tests various search load and write scenarios for distributed regions - * @author Sudhir Menon - * */ +@SuppressWarnings({"deprecation", "unchecked", "rawtypes", "serial"}) public class SearchAndLoadDUnitTest extends CacheTestCase { static boolean loaderInvoked; @@ -48,6 +48,10 @@ public class SearchAndLoadDUnitTest extends CacheTestCase { /** A CacheWriter used by a test */ protected static TestCacheWriter writer; + static boolean exceptionThrown; + static final CountDownLatch readyForExceptionLatch = new CountDownLatch(1); + static final CountDownLatch loaderInvokedLatch = new CountDownLatch(1); + public SearchAndLoadDUnitTest(String name) { super(name); } @@ -171,8 +175,166 @@ public class SearchAndLoadDUnitTest extends CacheTestCase { }); } - public void testNetLoadNoLoaders() - throws CacheException, InterruptedException { + + /** + * This test is for a bug in which a cache loader threw an exception + * that caused the wrong value to be put in a Future in nonTxnFindObject. This + * in turn caused a concurrent search for the object to not invoke the loader a + * second time. + * + * VM0 is used to create a cache and a region having a loader that simulates the + * conditions that caused the bug. One async thread then does a get() which invokes + * the loader. Another async thread does a get() which reaches nonTxnFindObject + * and blocks waiting for the first thread's load to complete. The loader then + * throws an exception that is sent back to the first thread. The second thread + * should then cause the loader to be invoked again, and this time the loader will + * return a value. Both threads then validate that they received the expected + * result. + */ + public void testConcurrentLoad() throws Throwable { + + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + + final String name = this.getUniqueName() + "Region"; + final String objectName = "theKey"; + final Integer value = new Integer(44); + final String exceptionString = "causing first cache-load to fail"; + + remoteLoaderInvoked = false; + loaderInvoked = false; + + vm0.invoke(new CacheSerializableRunnable("create region " + name + " in vm0") { + public void run2() { + remoteLoaderInvoked = false; + loaderInvoked = false; + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setConcurrencyChecksEnabled(true); + factory.setCacheLoader(new CacheLoader() { + boolean firstInvocation = true; + public synchronized Object load(LoaderHelper helper) { + System.out.println("invoked cache loader for " + helper.getKey()); + loaderInvoked = true; + loaderInvokedLatch.countDown(); + if (firstInvocation) { + firstInvocation = false; + try { + // wait for both threads to be ready for the exception to be thrown + System.out.println("waiting for vm0t2 to be ready before throwing exception"); + readyForExceptionLatch.await(30, TimeUnit.SECONDS); + // give the second thread time to get into loader code + Thread.sleep(5000); + } catch (InterruptedException e) { + fail("interrupted"); + } + System.out.println("throwing exception"); + exceptionThrown = true; + throw new RuntimeException(exceptionString); + } + System.out.println("returning value="+value); + return value; + } + + public void close() { + + } + }); + + Region region = createRegion(name,factory.create()); + region.create(objectName, null); + addExpectedException(exceptionString); + } + }); + + AsyncInvocation async1 = null; + try { + async1 = vm0.invokeAsync(new CacheSerializableRunnable("Concurrently invoke the remote loader on the same key - t1") { + public void run2() { + Region region = getCache().getRegion("root/"+name); + + getLogWriter().info("t1 is invoking get("+objectName+")"); + try { + getLogWriter().info("t1 retrieved value " + region.get(objectName)); + fail("first load should have triggered an exception"); + } catch (RuntimeException e) { + if (!e.getMessage().contains(exceptionString)) { + throw e; + } + } + } + }); + vm0.invoke(new CacheSerializableRunnable("Concurrently invoke the loader on the same key - t2") { + public void run2() { + final Region region = getCache().getRegion("root/"+name); + final Object[] valueHolder = new Object[1]; + + // wait for vm1 to cause the loader to be invoked + getLogWriter().info("t2 is waiting for loader to be invoked by t1"); + try { + loaderInvokedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + fail("interrupted"); + } + assertTrue(loaderInvoked); + + Thread t = new Thread("invoke get()") { + public void run() { + try { + valueHolder[0] = region.get(objectName); + } catch (RuntimeException e) { + valueHolder[0] = e; + } + } + }; + + t.setDaemon(true); + t.start(); + try { + // let the thread get to the point of blocking on vm1's Future + // in LocalRegion.nonTxnFindObject() + Thread.sleep(5000); + } catch (InterruptedException e) { + fail("interrupted"); + } + + readyForExceptionLatch.countDown(); + try { + t.join(30000); + } catch (InterruptedException e) { + fail("interrupted"); + } + if (t.isAlive()) { + t.interrupt(); + fail("get() operation blocked for too long - test needs some work"); + } + + getLogWriter().info("t2 is invoking get("+objectName+")"); + Object value = valueHolder[0]; + if (value instanceof RuntimeException) { + if ( ((Exception)value).getMessage().contains(exceptionString) ) { + fail("second load should not have thrown an exception"); + } else { + throw (RuntimeException)value; + } + } else { + getLogWriter().info("t2 retrieved value " + value); + assertNotNull(value); + } + } + }); + } finally { + if (async1 != null) { + async1.join(); + if (async1.exceptionOccurred()) { + throw async1.getException(); + } + } + } + } + + + public void testNetLoadNoLoaders() throws CacheException, InterruptedException { Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); @@ -318,7 +480,6 @@ public class SearchAndLoadDUnitTest extends CacheTestCase { VM vm2 = host.getVM(2); final String name = this.getUniqueName() + "-ACK"; final String objectName = "B"; - final Integer value = new Integer(43); loaderInvoked = false; remoteLoaderInvoked = false; remoteLoaderInvokedCount = 0; @@ -369,7 +530,7 @@ public class SearchAndLoadDUnitTest extends CacheTestCase { } }); - Region region = createRegion(name,factory.create()); + createRegion(name,factory.create()); } catch (CacheException ex) { fail("While creating ACK region", ex);