From: rvs@apache.org
To: commits@geode.incubator.apache.org
Date: Fri, 03 Jul 2015 19:21:14 -0000
Message-Id: <38285747b4074b70ab47340b6f78ba77@git.apache.org>
Subject: [13/51] [partial] incubator-geode git commit: SGA #2

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java index 5c7c7bc..494efaf 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java @@ -8,6 +8,7 @@ package com.gemstone.gemfire.internal.cache; + import java.io.IOException; import java.lang.reflect.Method; import java.util.Collection; @@ -25,6 +26,7 @@ import com.gemstone.gemfire.InvalidDeltaException; import com.gemstone.gemfire.cache.CacheRuntimeException; import com.gemstone.gemfire.cache.CacheWriter; import com.gemstone.gemfire.cache.CacheWriterException; +import com.gemstone.gemfire.cache.CustomEvictionAttributes; import com.gemstone.gemfire.cache.DiskAccessException; import com.gemstone.gemfire.cache.EntryExistsException; import com.gemstone.gemfire.cache.EntryNotFoundException; @@ -56,10 +58,20 @@ import com.gemstone.gemfire.internal.cache.versions.VersionHolder; import com.gemstone.gemfire.internal.cache.versions.VersionSource; import com.gemstone.gemfire.internal.cache.versions.VersionStamp; import com.gemstone.gemfire.internal.cache.versions.VersionTag; +import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; +import com.gemstone.gemfire.internal.concurrent.MapCallbackAdapter; +import com.gemstone.gemfire.internal.concurrent.MapResult; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; +import com.gemstone.gemfire.internal.offheap.OffHeapHelper; +import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk; +import com.gemstone.gemfire.internal.offheap.annotations.Released; +import com.gemstone.gemfire.internal.offheap.annotations.Retained; +import com.gemstone.gemfire.internal.offheap.annotations.Unretained; import com.gemstone.gemfire.internal.sequencelog.EntryLogger; import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap; import com.gemstone.gemfire.pdx.PdxInstance; @@ -92,6 +104,12 @@ abstract class AbstractRegionMap implements RegionMap { /** An internal Listener for index maintenance for SQLFabric. */ private final IndexUpdater indexUpdater; + /** + * This test hook is used to force the conditions for defect 48182. + * This hook is used by Bug48182JUnitTest. + */ + static Runnable testHookRunnableFor48182 = null; + private RegionEntryFactory entryFactory; private Attributes attr; private transient Object owner; // the region that owns this map @@ -122,13 +140,16 @@ abstract class AbstractRegionMap implements RegionMap { final GemFireCacheImpl cache; boolean isDisk; boolean withVersioning = false; + boolean offHeap = false; if (owner instanceof LocalRegion) { LocalRegion region = (LocalRegion)owner; isDisk = region.getDiskRegion() != null; cache = region.getGemFireCache(); withVersioning = region.getConcurrencyChecksEnabled(); + offHeap = region.getOffHeap(); } else if (owner instanceof PlaceHolderDiskRegion) { + offHeap = ((PlaceHolderDiskRegion) owner).getOffHeap(); isDisk = true; withVersioning = ((PlaceHolderDiskRegion)owner).getFlags().contains( DiskRegionFlag.IS_WITH_VERSIONING); @@ -168,21 +189,29 @@ abstract class AbstractRegionMap implements RegionMap { if (isLRU) { if (isDisk) { if (withVersioning) { - { + if (offHeap) { + factory = VersionedStatsDiskLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedStatsDiskLRURegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMStatsDiskLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VMStatsDiskLRURegionEntryHeap.getEntryFactory(); } } } else { if (withVersioning) { - { + if (offHeap) { + factory = VersionedStatsLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedStatsLRURegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMStatsLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VMStatsLRURegionEntryHeap.getEntryFactory(); } } @@ -190,21 +219,29 @@ abstract class AbstractRegionMap implements RegionMap { } else { // !isLRU if (isDisk) { if (withVersioning) { - { + if (offHeap) { + factory = VersionedStatsDiskRegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedStatsDiskRegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMStatsDiskRegionEntryOffHeap.getEntryFactory(); + } else { factory = VMStatsDiskRegionEntryHeap.getEntryFactory(); } } } else { 
if (withVersioning) { - { + if (offHeap) { + factory = VersionedStatsRegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedStatsRegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMStatsRegionEntryOffHeap.getEntryFactory(); + } else { factory = VMStatsRegionEntryHeap.getEntryFactory(); } } @@ -215,22 +252,30 @@ abstract class AbstractRegionMap implements RegionMap { if (isLRU) { if (isDisk) { if (withVersioning) { - { + if (offHeap) { + factory = VersionedThinDiskLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedThinDiskLRURegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMThinDiskLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VMThinDiskLRURegionEntryHeap.getEntryFactory(); } } } else { if (withVersioning) { - { + if (offHeap) { + factory = VersionedThinLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedThinLRURegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMThinLRURegionEntryOffHeap.getEntryFactory(); + } else { factory = VMThinLRURegionEntryHeap.getEntryFactory(); } } @@ -239,22 +284,30 @@ abstract class AbstractRegionMap implements RegionMap { else { // !isLRU if (isDisk) { if (withVersioning) { - { + if (offHeap) { + factory = VersionedThinDiskRegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedThinDiskRegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMThinDiskRegionEntryOffHeap.getEntryFactory(); + } else { factory = VMThinDiskRegionEntryHeap.getEntryFactory(); } } } else { if (withVersioning) { - { + if (offHeap) { + factory = VersionedThinRegionEntryOffHeap.getEntryFactory(); + } else { factory = VersionedThinRegionEntryHeap.getEntryFactory(); } } else { - { + if (offHeap) { + factory = VMThinRegionEntryOffHeap.getEntryFactory(); + } else { factory = VMThinRegionEntryHeap.getEntryFactory(); } } @@ -274,12 +327,15 @@ abstract class AbstractRegionMap implements RegionMap { concurrencyLevel, isIdentityMap, entryCreator); } else { - return new CustomEntryConcurrentHashMap(initialCapacity, loadFactor, - concurrencyLevel, isIdentityMap); + return new CustomEntryConcurrentHashMap(initialCapacity, + loadFactor, concurrencyLevel, isIdentityMap); } } public void changeOwner(LocalRegion r) { + if (r == _getOwnerObject()) { + return; + } setOwner(r); } @@ -332,7 +388,7 @@ abstract class AbstractRegionMap implements RegionMap { // this is currently used by stats and eviction @Override public int sizeInVM() { - return size(); + return _getMap().size(); } public boolean isEmpty() @@ -350,8 +406,13 @@ abstract class AbstractRegionMap implements RegionMap { return (Collection)_getMap().values(); } - public final boolean containsKey(Object key) - { + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public Collection regionEntriesInVM() { + return (Collection)_getMap().values(); + } + + public final boolean containsKey(Object key) { RegionEntry re = getEntry(key); if (re == null) { return false; @@ -362,23 +423,52 @@ abstract class AbstractRegionMap implements RegionMap { return true; } - public RegionEntry getEntry(Object key) - { - return (RegionEntry)_getMap().get(key); + public RegionEntry getEntry(Object key) { + RegionEntry re = (RegionEntry)_getMap().get(key); + if (re != null && re.isMarkedForEviction()) { + // entry has been faulted in from HDFS + return null; + } + return re; + } + + protected RegionEntry getEntry(EntryEventImpl event) { + return getEntry(event.getKey()); } + 
@Override public final RegionEntry getEntryInVM(Object key) { return (RegionEntry)_getMap().get(key); } - private final RegionEntry putEntryIfAbsent(Object key, RegionEntry re) - { - return (RegionEntry)_getMap().putIfAbsent(key, re); + + public final RegionEntry putEntryIfAbsent(Object key, RegionEntry re) { + RegionEntry value = (RegionEntry)_getMap().putIfAbsent(key, re); + if (value == null && (re instanceof OffHeapRegionEntry) + && _isOwnerALocalRegion() && _getOwner().isThisRegionBeingClosedOrDestroyed()) { + // prevent orphan during concurrent destroy (#48068) + if (_getMap().remove(key, re)) { + ((OffHeapRegionEntry)re).release(); + } + _getOwner().checkReadiness(); // throw RegionDestroyedException + } + return value; + } + + @Override + public final RegionEntry getOperationalEntryInVM(Object key) { + RegionEntry re = (RegionEntry)_getMap().get(key); + if (re != null && re.isMarkedForEviction()) { + // entry has been faulted in from HDFS + return null; + } + return re; } + public final void removeEntry(Object key, RegionEntry re, boolean updateStat) { - if (re.isTombstone() && _getMap().get(key) == re) { + if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){ logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace")); return; // can't remove tombstones except from the tombstone sweeper } @@ -394,7 +484,7 @@ abstract class AbstractRegionMap implements RegionMap { EntryEventImpl event, final LocalRegion owner, final IndexUpdater indexUpdater) { boolean success = false; - if (re.isTombstone()) { + if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) { logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace")); return; // can't remove tombstones except from the tombstone sweeper } @@ -402,6 +492,19 @@ abstract class AbstractRegionMap implements RegionMap { if (indexUpdater != null) { indexUpdater.onEvent(owner, event, re); } + + //This is messy, but custom eviction calls removeEntry + //rather than re.destroy I think to avoid firing callbacks, etc. + //However, the value still needs to be set to removePhase1 + //in order to remove the entry from disk. 
+ if(event.isCustomEviction() && !re.isRemoved()) { + try { + re.removePhase1(owner, false); + } catch (RegionClearedException e) { + //that's ok, we were just trying to do evict incoming eviction + } + } + if (_getMap().remove(key, re)) { re.removePhase2(); success = true; @@ -416,7 +519,7 @@ abstract class AbstractRegionMap implements RegionMap { } } - private final void incEntryCount(int delta) { + protected final void incEntryCount(int delta) { LocalRegion lr = _getOwner(); if (lr != null) { CachePerfStats stats = lr.getCachePerfStats(); @@ -439,6 +542,17 @@ abstract class AbstractRegionMap implements RegionMap { _getMap().clear(); } + public void close() { + /* + for (SuspectEntryList l: this.suspectEntries.values()) { + for (EntryEventImpl e: l) { + e.release(); + } + } + */ + clear(null); + } + /** * Clear the region and, if an RVV is given, return a collection of the * version sources in all remaining tags @@ -505,6 +619,9 @@ abstract class AbstractRegionMap implements RegionMap { boolean tombstone = re.isTombstone(); // note: it.remove() did not reliably remove the entry so we use remove(K,V) here if (_getMap().remove(re.getKey(), re)) { + if (OffHeapRegionEntryHelper.doesClearNeedToCheckForOffHeap()) { + GatewaySenderEventImpl.release(re._getValue()); // OFFHEAP _getValue ok + } //If this is an overflow only region, we need to free the entry on //disk at this point. try { @@ -658,26 +775,35 @@ abstract class AbstractRegionMap implements RegionMap { .entrySetWithReusableEntries().iterator(); while (it.hasNext()) { Map.Entry me = it.next(); - it.remove(); // OFFHEAP: I'm not sure that this sets the value in the ARE to removed. If it doesn't then if it is offheap we need to decrc + it.remove(); // This removes the RegionEntry from "rm" but it does not decrement its refcount to an offheap value. RegionEntry oldRe = (RegionEntry)me.getValue(); Object key = me.getKey(); - Object value = oldRe._getValueUse((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true); - if (value == Token.NOT_AVAILABLE) { - // fix for bug 43993 - value = null; - } - if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) { - continue; - } - RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value); - copyRecoveredEntry(oldRe, newRe); - if (newRe.isTombstone()) { - VersionTag tag = newRe.getVersionStamp().asVersionTag(); - tombstones.put(tag, newRe); + + @Retained @Released Object value = oldRe._getValueRetain((RegionEntryContext) ((AbstractRegionMap) rm)._getOwnerObject(), true); + + try { + if (value == Token.NOT_AVAILABLE) { + // fix for bug 43993 + value = null; + } + if (value == Token.TOMBSTONE && !_getOwner().getConcurrencyChecksEnabled()) { + continue; + } + RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value); + copyRecoveredEntry(oldRe, newRe); + // newRe is now in this._getMap(). 
+ if (newRe.isTombstone()) { + VersionTag tag = newRe.getVersionStamp().asVersionTag(); + tombstones.put(tag, newRe); + } + _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe)); + incEntryCount(1); + lruEntryUpdate(newRe); + } finally { + if (OffHeapHelper.release(value)) { + ((OffHeapRegionEntry)oldRe).release(); + } } - _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe)); - incEntryCount(1); - lruEntryUpdate(newRe); lruUpdateCallback(); } } else { @@ -693,15 +819,7 @@ abstract class AbstractRegionMap implements RegionMap { tombstones.put(re.getVersionStamp().asVersionTag(), re); } } - if (_getOwner() instanceof BucketRegion) { - Object value = re._getValueUse(_getOwner(), false); - if (value == Token.NOT_AVAILABLE) { - value = null; - } - if(value != null) { - _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateValueSize(value)); - } - } + _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re)); } // Since lru was not being done during recovery call it now. lruUpdateCallback(); @@ -729,9 +847,10 @@ abstract class AbstractRegionMap implements RegionMap { _getMap().put(newRe.getKey(), newRe); } + @Retained // Region entry may contain an off-heap value public final RegionEntry initRecoveredEntry(Object key, DiskEntry.RecoveredEntry value) { boolean needsCallback = false; - RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value); + @Retained RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value); synchronized (newRe) { if (value.getVersionTag()!=null && newRe.getVersionStamp()!=null) { newRe.getVersionStamp().setVersions(value.getVersionTag()); @@ -746,9 +865,18 @@ abstract class AbstractRegionMap implements RegionMap { _getOwner().getCachePerfStats().incRetries(); } } - } // isRemoved + } + /* + * Entry already exists which should be impossible. + * Free the current entry (if off-heap) and + * throw an exception. + */ else { - return null; + if (newRe instanceof OffHeapRegionEntry) { + ((OffHeapRegionEntry) newRe).release(); + } + + throw new IllegalStateException("Could not recover entry for key " + key + ". The entry already exists!"); } } // synchronized } @@ -877,6 +1005,10 @@ abstract class AbstractRegionMap implements RegionMap { RegionEntry newRe = getEntryFactory().createEntry(owner, key, Token.REMOVED_PHASE1); EntryEventImpl event = null; + + @Retained @Released Object oldValue = null; + + try { RegionEntry oldRe = null; synchronized (newRe) { try { @@ -923,7 +1055,7 @@ abstract class AbstractRegionMap implements RegionMap { // code will be executed only in case of sqlfabric now. Probably // the code can be made more generic for both SQL Fabric and GemFire. if (indexUpdater != null) { - Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas + oldValue = oldRe.getValueInVM(owner); // OFFHEAP: ListOfDeltas if (oldValue instanceof ListOfDeltas) { // apply the deltas on this new value. 
update index // Make a new event object @@ -932,15 +1064,21 @@ abstract class AbstractRegionMap implements RegionMap { if (owner instanceof BucketRegion) { rgn = ((BucketRegion)owner).getPartitionedRegion(); } - event = new EntryEventImpl(rgn, Operation.CREATE, key, null, + event = EntryEventImpl.create(rgn, Operation.CREATE, key, null, Boolean.TRUE /* indicate that GII is in progress */, false, null); + try { event.setOldValue(newValue); if (logger.isDebugEnabled()) { logger.debug("initialImagePut: received base value for list of deltas; event: {}", event); } ((ListOfDeltas)oldValue).apply(event); - oldRe.setValue(owner, prepareValueForCache(owner, event.getNewValue()), event); + Object preparedNewValue =oldRe.prepareValueForCache(owner, + event.getNewValueAsOffHeapDeserializedOrRaw(), true); + if(preparedNewValue instanceof Chunk) { + event.setNewValue(preparedNewValue); + } + oldRe.setValue(owner, preparedNewValue, event); //event.setNewValue(event.getOldValue()); event.setOldValue(null); try { @@ -955,11 +1093,17 @@ abstract class AbstractRegionMap implements RegionMap { // this must be done within the oldRe sync block indexUpdater.postEvent(owner, event, oldRe, done); } + } finally { + if (event != null) { + event.release(); + event = null; + } + } } } try { if (indexUpdater != null) { - event = new EntryEventImpl(owner, Operation.CREATE, key, + event = EntryEventImpl.create(owner, Operation.CREATE, key, newValue, Boolean.TRUE /* indicate that GII is in progress */, false, null); @@ -967,7 +1111,7 @@ abstract class AbstractRegionMap implements RegionMap { } result = oldRe.initialImagePut(owner, lastModified, newValue, wasRecovered, acceptedVersionTag); if (result) { - if (oldIsTombstone){ + if (oldIsTombstone) { owner.unscheduleTombstone(oldRe); if (newValue != Token.TOMBSTONE){ lruEntryCreate(oldRe); @@ -1004,6 +1148,10 @@ abstract class AbstractRegionMap implements RegionMap { if (indexUpdater != null) { indexUpdater.postEvent(owner, event, oldRe, result); } + if (event != null) { + event.release(); + event = null; + } } } } @@ -1025,7 +1173,7 @@ abstract class AbstractRegionMap implements RegionMap { try { if (result) { if (indexUpdater != null) { - event = new EntryEventImpl(owner, Operation.CREATE, key, + event = EntryEventImpl.create(owner, Operation.CREATE, key, newValue, Boolean.TRUE /* indicate that GII is in progress */, false, null); @@ -1050,6 +1198,10 @@ abstract class AbstractRegionMap implements RegionMap { if (result && indexUpdater != null) { indexUpdater.postEvent(owner, event, newRe, done); } + if (event != null) { + event.release(); + event = null; + } } } } @@ -1065,6 +1217,10 @@ abstract class AbstractRegionMap implements RegionMap { } } } // synchronized + } finally { + if (event != null) event.release(); + OffHeapHelper.release(oldValue); + } } catch(RegionClearedException rce) { //Asif: do not issue any sort of callbacks done = false; @@ -1121,7 +1277,7 @@ abstract class AbstractRegionMap implements RegionMap { boolean retry = true; // int retries = -1; - + RETRY_LOOP: while (retry) { retry = false; @@ -1155,9 +1311,17 @@ RETRY_LOOP: lockForCacheModification(owner, event); try { - RegionEntry re = getEntry(event.getKey()); + + RegionEntry re = getOrCreateRegionEntry(owner, event, Token.REMOVED_PHASE1, null, true, true); RegionEntry tombstone = null; boolean haveTombstone = false; + /* + * Execute the test hook runnable inline (not threaded) if it is not null. 
+ */ + if(null != testHookRunnableFor48182) { + testHookRunnableFor48182.run(); + } + try { if (logger.isTraceEnabled(LogMarker.LRU_TOMBSTONE_COUNT) && !(owner instanceof HARegion)) { logger.trace(LogMarker.LRU_TOMBSTONE_COUNT, @@ -1214,12 +1378,13 @@ RETRY_LOOP: } } else { event.setRegionEntry(oldRe); + // Last transaction related eviction check. This should // prevent // transaction conflict (caused by eviction) when the entry // is being added to transaction state. if (isEviction) { - if (!confirmEvictionDestroy(oldRe)) { + if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) { opCompleted = false; return opCompleted; } @@ -1276,9 +1441,14 @@ RETRY_LOOP: try { // bug #42228 - leaving "removed" entries in the cache re = newRe; event.setRegionEntry(newRe); + try { //if concurrency checks are enabled, destroy will //set the version tag + if (isEviction) { + opCompleted = false; + return opCompleted; + } opCompleted = destroyEntry(newRe, event, inTokenMode, cacheWrite, expectedOldValue, true, removeRecoveredEntry); if (opCompleted) { // This is a new entry that was created because we are in @@ -1321,10 +1491,14 @@ RETRY_LOOP: // Note no need for LRU work since the entry is destroyed // and will be removed when gii completes } finally { // bug #42228 - if (!opCompleted && !haveTombstone /* to fix bug 51583 do this for all operations */) { + if (!opCompleted && !haveTombstone /* to fix bug 51583 do this for all operations */ ) { + // owner.getLogWriterI18n().warning(LocalizedStrings.DEBUG, "BRUCE: removing incomplete entry"); removeEntry(event.getKey(), newRe, false); } + if (!opCompleted && isEviction) { + removeEntry(event.getKey(), newRe, false); + } } } // !opCompleted } // synchronized newRe @@ -1335,7 +1509,7 @@ RETRY_LOOP: } } // inTokenMode or tombstone creation else { - if (!isEviction || owner.concurrencyChecksEnabled) { + if (!isEviction || owner.concurrencyChecksEnabled) { // The following ensures that there is not a concurrent operation // on the entry and leaves behind a tombstone if concurrencyChecksEnabled. // It fixes bug #32467 by propagating the destroy to the server even though @@ -1390,9 +1564,11 @@ RETRY_LOOP: doPart3 = true; } } - if (throwex) { + if (throwex) { if (ex == null) { - throw new EntryNotFoundException(event.getKey().toString()); + // Fix for 48182, check cache state and/or region state before sending entry not found. + // this is from the server and any exceptions will propogate to the client + owner.checkEntryNotFound(event.getKey()); } else { throw ex; } @@ -1481,10 +1657,11 @@ RETRY_LOOP: } } event.setRegionEntry(re); + // See comment above about eviction checks if (isEviction) { assert expectedOldValue == null; - if (!confirmEvictionDestroy(re)) { + if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) { opCompleted = false; return opCompleted; } @@ -1566,6 +1743,12 @@ RETRY_LOOP: } } // !isRemoved else { // already removed + if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) { + // For HDFS region there may be a race with eviction + // so retry the operation. 
fixes bug 49150 + retry = true; + continue RETRY_LOOP; + } if (re.isTombstone() && event.getVersionTag() != null) { // if we're dealing with a tombstone and this is a remote event // (e.g., from cache client update thread) we need to update @@ -1585,7 +1768,7 @@ RETRY_LOOP: } if (!inTokenMode && !isEviction) { - throw new EntryNotFoundException(event.getKey().toString()); + owner.checkEntryNotFound(event.getKey()); } // if (isEviction && re.isTombstone()) { // owner.unscheduleTombstone(re); @@ -1668,6 +1851,7 @@ RETRY_LOOP: final boolean isRegionReady = !inTokenMode; final boolean hasRemoteOrigin = !((TXId)txId).getMemberId().equals(owner.getMyId()); + boolean cbEventInPending = false; lockForTXCacheModification(owner, versionTag); IndexManager oqlIndexManager = owner.getIndexManager() ; try { @@ -1683,7 +1867,8 @@ RETRY_LOOP: synchronized (re) { if (!re.isRemoved() || re.isTombstone()) { EntryEventImpl sqlfEvent = null; - Object oldValue = re.getValueInVM(owner); // OFFHEAP escapes to eei and sqlfEvent + @Retained @Released Object oldValue = re.getValueInVM(owner); + try { final int oldSize = owner.calculateRegionEntryValueSize(re); // Create an entry event only if the calling context is // a receipt of a TXCommitMessage AND there are callbacks installed @@ -1691,6 +1876,7 @@ RETRY_LOOP: boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/, isRegionReady || inRI); EntryEventImpl cbEvent = createCBEvent(owner, op, key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { if (/* owner.isUsedForPartitionedRegionBucket() && */ indexUpdater != null) { @@ -1753,6 +1939,7 @@ RETRY_LOOP: cbEvent, true/*callDispatchListenerEvent*/); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } if (!clearOccured) { @@ -1761,6 +1948,12 @@ RETRY_LOOP: if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent!= null) { txEntryState.setVersionTag(cbEvent.getVersionTag()); } + } finally { + if (!cbEventInPending) cbEvent.release(); + } + } finally { + OffHeapHelper.release(oldValue); + } } } } finally { @@ -1795,6 +1988,7 @@ RETRY_LOOP: boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI); cbEvent = createCBEvent(owner, op, key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { cbEvent.setRegionEntry(oldRe); cbEvent.setOldValue(Token.NOT_AVAILABLE); if (isDebugEnabled) { @@ -1811,6 +2005,7 @@ RETRY_LOOP: cbEvent, dispatchListenerEvent); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } int oldSize = 0; @@ -1829,6 +2024,9 @@ RETRY_LOOP: owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode, false /* Clear Conflicting with the operation */); lruEntryDestroy(oldRe); + } finally { + if (!cbEventInPending) cbEvent.release(); + } } catch (RegionClearedException rce) { owner.txApplyDestroyPart2(oldRe, oldRe.getKey(), inTokenMode, @@ -1852,6 +2050,7 @@ RETRY_LOOP: boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI); cbEvent = createCBEvent(owner, op, key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { cbEvent.setRegionEntry(newRe); cbEvent.setOldValue(Token.NOT_AVAILABLE); if (isDebugEnabled) { @@ -1868,6 +2067,7 @@ RETRY_LOOP: cbEvent, dispatchListenerEvent); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } 
EntryLogger.logTXDestroy(_getOwnerObject(), key); @@ -1885,6 +2085,9 @@ RETRY_LOOP: false /*clearConflict*/); // Note no need for LRU work since the entry is destroyed // and will be removed when gii completes + } finally { + if (!cbEventInPending) cbEvent.release(); + } } if (owner.concurrencyChecksEnabled && txEntryState != null && cbEvent != null) { txEntryState.setVersionTag(cbEvent.getVersionTag()); @@ -1906,6 +2109,7 @@ RETRY_LOOP: EntryEventImpl cbEvent = createCBEvent(owner, op, key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { if (owner.isUsedForPartitionedRegionBucket()) { txHandleWANEvent(owner, cbEvent, txEntryState); } @@ -1914,6 +2118,10 @@ RETRY_LOOP: owner.invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY,cbEvent,false); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; + } + } finally { + if (!cbEventInPending) cbEvent.release(); } } } catch( DiskAccessException dae) { @@ -2003,13 +2211,19 @@ RETRY_LOOP: FilterProfile fp = owner.getFilterProfile(); if (!oldRe.isRemoved() && (fp != null && fp.getCqCount() > 0)) { - Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue + + @Retained @Released Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP EntryEventImpl oldValue + // this will not fault in the value. + try { if (oldValue == Token.NOT_AVAILABLE){ event.setOldValue(oldRe.getValueOnDiskOrBuffer(owner)); } else { event.setOldValue(oldValue); } + } finally { + OffHeapHelper.release(oldValue); + } } boolean isCreate = false; try { @@ -2257,7 +2471,14 @@ RETRY_LOOP: if (re.isValueNull()) { event.setOldValue(re.getValueOnDiskOrBuffer(owner)); } else { - event.setOldValue(re.getValueInVM(owner)); // OFFHEAP escapes to EntryEventImpl oldValue + + @Retained @Released Object v = re.getValueInVM(owner); + + try { + event.setOldValue(v); // OFFHEAP escapes to EntryEventImpl oldValue + } finally { + OffHeapHelper.release(v); + } } } final boolean oldWasTombstone = re.isTombstone(); @@ -2298,7 +2519,7 @@ RETRY_LOOP: // is in region, do nothing } if (!entryExisted) { - throw new EntryNotFoundException(event.getKey().toString()); + owner.checkEntryNotFound(event.getKey()); } } // while(retry) } // !forceNewEntry @@ -2394,7 +2615,7 @@ RETRY_LOOP: } } if (!entryExisted) { - throw new EntryNotFoundException(event.getKey().toString()); + owner.checkEntryNotFound(event.getKey()); } } catch( DiskAccessException dae) { this._getOwner().handleDiskAccessException(dae); @@ -2449,14 +2670,17 @@ RETRY_LOOP: final boolean oldWasTombstone = oldRe.isTombstone(); final int oldSize = owner.calculateRegionEntryValueSize(oldRe); Object oldValue = oldRe.getValueInVM(owner); // OFFHEAP eei + try { // Create an entry event only if the calling context is // a receipt of a TXCommitMessage AND there are callbacks // installed // for this region boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized()); + boolean cbEventInPending = false; cbEvent = createCBEvent(owner, localOp ? 
Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { cbEvent.setRegionEntry(oldRe); cbEvent.setOldValue(oldValue); if (logger.isDebugEnabled()) { @@ -2475,7 +2699,7 @@ RETRY_LOOP: processAndGenerateTXVersionTag(owner, cbEvent, oldRe, txEntryState); boolean clearOccured = false; try { - oldRe.setValue(owner, prepareValueForCache(owner, newValue)); + oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, true)); EntryLogger.logTXInvalidate(_getOwnerObject(), key); owner.updateSizeOnPut(key, oldSize, 0); if (oldWasTombstone) { @@ -2496,6 +2720,7 @@ RETRY_LOOP: true/*callDispatchListenerEvent*/); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } if (!clearOccured) { @@ -2504,21 +2729,29 @@ RETRY_LOOP: if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) { txEntryState.setVersionTag(cbEvent.getVersionTag()); } + } finally { + if (!cbEventInPending) cbEvent.release(); + } + } finally { + OffHeapHelper.release(oldValue); + } } } } if (!opCompleted) { boolean invokeCallbacks = shouldCreateCBEvent( owner, true /* isInvalidate */, owner.isInitialized()); + boolean cbEventInPending = false; cbEvent = createCBEvent(owner, localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { cbEvent.setRegionEntry(newRe); txRemoveOldIndexEntry(Operation.INVALIDATE, newRe); newRe.setValueResultOfSearch(false); boolean clearOccured = false; try { processAndGenerateTXVersionTag(owner, cbEvent, newRe, txEntryState); - newRe.setValue(owner, prepareValueForCache(owner, newValue)); + newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, true)); EntryLogger.logTXInvalidate(_getOwnerObject(), key); owner.updateSizeOnCreate(newRe.getKey(), 0);//we are putting in a new invalidated entry } @@ -2536,6 +2769,7 @@ RETRY_LOOP: true/*callDispatchListenerEvent*/); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } opCompleted = true; @@ -2546,6 +2780,9 @@ RETRY_LOOP: if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) { txEntryState.setVersionTag(cbEvent.getVersionTag()); } + } finally { + if (!cbEventInPending) cbEvent.release(); + } } } finally { @@ -2568,9 +2805,11 @@ RETRY_LOOP: // installed // for this region boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized()); + boolean cbEventInPending = false; cbEvent = createCBEvent(owner, localOp ? 
Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { cbEvent.setRegionEntry(re); cbEvent.setOldValue(oldValue); txRemoveOldIndexEntry(Operation.INVALIDATE, re); @@ -2584,7 +2823,7 @@ RETRY_LOOP: processAndGenerateTXVersionTag(owner, cbEvent, re, txEntryState); boolean clearOccured = false; try { - re.setValue(owner, prepareValueForCache(owner, newValue)); + re.setValue(owner, re.prepareValueForCache(owner, newValue, true)); EntryLogger.logTXInvalidate(_getOwnerObject(), key); if (wasTombstone) { owner.unscheduleTombstone(re); @@ -2605,6 +2844,7 @@ RETRY_LOOP: true/*callDispatchListenerEvent*/); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } if (!clearOccured) { @@ -2613,6 +2853,9 @@ RETRY_LOOP: if (shouldPerformConcurrencyChecks(owner, cbEvent) && txEntryState != null) { txEntryState.setVersionTag(cbEvent.getVersionTag()); } + } finally { + if (!cbEventInPending) cbEvent.release(); + } } } } else { //re == null @@ -2621,16 +2864,22 @@ RETRY_LOOP: // that the invalidate is already applied on the Initial image // provider, thus causing region entry to be absent. // Notify clients with client events. + boolean cbEventInPending = false; cbEvent = createCBEvent(owner, localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE, key, newValue, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey); + try { switchEventOwnerAndOriginRemote(cbEvent, hasRemoteOrigin); if (pendingCallbacks == null) { owner.invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, cbEvent, false); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; + } + } finally { + if (!cbEventInPending) cbEvent.release(); } } } @@ -2669,10 +2918,19 @@ RETRY_LOOP: } } - private RegionEntry getOrCreateRegionEntry(Object ownerRegion, Object key, Object value, boolean onlyExisting) { - RegionEntry retVal = getEntry(key); + private RegionEntry getOrCreateRegionEntry(Object ownerRegion, + EntryEventImpl event, Object value, + MapCallbackAdapter valueCreator, + boolean onlyExisting, boolean returnTombstone) { + Object key = event.getKey(); + RegionEntry retVal = null; + if (event.isFetchFromHDFS()) { + retVal = getEntry(event); + } else { + retVal = getEntryInVM(key); + } if (onlyExisting) { - if (retVal != null && retVal.isTombstone()) { + if (!returnTombstone && (retVal != null && retVal.isTombstone())) { return null; } return retVal; @@ -2680,14 +2938,30 @@ RETRY_LOOP: if (retVal != null) { return retVal; } + if (valueCreator != null) { + value = valueCreator.newValue(key, ownerRegion, value, null); + } retVal = getEntryFactory().createEntry((RegionEntryContext) ownerRegion, key, value); RegionEntry oldRe = putEntryIfAbsent(key, retVal); if (oldRe != null) { + if (retVal instanceof OffHeapRegionEntry) { + ((OffHeapRegionEntry) retVal).release(); + } return oldRe; } return retVal; } + protected static final MapCallbackAdapter + listOfDeltasCreator = new MapCallbackAdapter() { + @Override + public Object newValue(Object key, Object context, Object createParams, + final MapResult result) { + return new ListOfDeltas(4); + } + }; + /** * Neeraj: The below if block is to handle the special * scenario witnessed in Sqlfabric for now. 
(Though its @@ -2708,28 +2982,37 @@ RETRY_LOOP: boolean isOldValueDelta = true; try { if (ifOld) { - final Delta delta = (Delta)event.getRawNewValue(); - RegionEntry re = getOrCreateRegionEntry(owner, event.getKey(), new ListOfDeltas(delta), false); + final Delta delta = event.getDeltaNewValue(); + RegionEntry re = getOrCreateRegionEntry(owner, event, null, + listOfDeltasCreator, false, false); assert re != null; synchronized (re) { - Object oVal = re.getValueInVMOrDiskWithoutFaultIn(owner); + @Retained @Released Object oVal = re.getValueOffHeapOrDiskWithoutFaultIn(owner); if (oVal != null) { + try { if (oVal instanceof ListOfDeltas) { if (logger.isDebugEnabled()) { logger.debug("basicPut: adding delta to list of deltas: {}", delta); } ((ListOfDeltas)oVal).merge(delta); + @Retained Object newVal = ((AbstractRegionEntry)re).prepareValueForCache(owner, oVal, true); + re.setValue(owner, newVal); // TODO:KIRK:48068 prevent orphan } else { isOldValueDelta = false; } + }finally { + OffHeapHelper.release(oVal); + } } else { if (logger.isDebugEnabled()) { logger.debug("basicPut: new list of deltas with delta: {}", delta); } - Object newVal = new ListOfDeltas(delta); - re.setValue(owner, newVal); // TODO no need to call AbstractRegionMap.prepareValueForCache here? + @Retained Object newVal = new ListOfDeltas(delta); + // TODO no need to call AbstractRegionMap.prepareValueForCache here? + newVal = ((AbstractRegionEntry)re).prepareValueForCache(owner, newVal, true); + re.setValue(owner, newVal); // TODO:KIRK:48068 prevent orphan } } } @@ -2812,7 +3095,6 @@ RETRY_LOOP: boolean retrieveOldValueForDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null; - Object oldValueForDelta = null; lockForCacheModification(owner, event); IndexManager oqlIndexManager = null; try { @@ -2842,7 +3124,8 @@ RETRY_LOOP: RegionEntry re = null; boolean eventRecorded = false; boolean onlyExisting = ifOld && !replaceOnClient; - re = getOrCreateRegionEntry(owner, event.getKey(), Token.REMOVED_PHASE1, onlyExisting); + re = getOrCreateRegionEntry(owner, event, + Token.REMOVED_PHASE1, null, onlyExisting, false); if (re == null) { throwExceptionForSqlFire(event); return null; @@ -2853,8 +3136,9 @@ RETRY_LOOP: // from the map. otherwise we can append an event to it // and change its state if (re.isRemovedPhase2()) { - re = getOrCreateRegionEntry(owner, event.getKey(), Token.REMOVED_PHASE1, onlyExisting); - _getOwner().getCachePerfStats().incRetries(); + re = getOrCreateRegionEntry(owner, event, + Token.REMOVED_PHASE1, null, onlyExisting, false); + _getOwner().getCachePerfStats().incRetries(); if (re == null) { // this will happen when onlyExisting is true throwExceptionForSqlFire(event); @@ -2862,6 +3146,7 @@ RETRY_LOOP: } continue; } else { + @Released Object oldValueForDelta = null; if (retrieveOldValueForDelta) { // defer the lruUpdateCallback to prevent a deadlock (see bug 51121). 
final boolean disabled = disableLruUpdateCallback(); @@ -2897,47 +3182,92 @@ RETRY_LOOP: // notify index of an update notifyIndex(re, true); - try { - try { - if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set - || !re.isRemoved() - || replaceOnClient) { - // update - updateEntry(event, requireOldValue, oldValueForDelta, re); - } else { - // create - createEntry(event, owner, re); + try { + if ((cacheWrite && event.getOperation().isUpdate()) // if there is a cacheWriter, type of event has already been set + || !re.isRemoved() + || replaceOnClient) { + // update + updateEntry(event, requireOldValue, oldValueForDelta, re); + } else { + // create + createEntry(event, owner, re); + } + owner.recordEvent(event); + eventRecorded = true; + } catch (RegionClearedException rce) { + clearOccured = true; + owner.recordEvent(event); + } catch (ConcurrentCacheModificationException ccme) { + VersionTag tag = event.getVersionTag(); + if (tag != null && tag.isTimeStampUpdated()) { + // Notify gateways of new time-stamp. + owner.notifyTimestampsToGateways(event); + } + throw ccme; } - owner.recordEvent(event); - eventRecorded = true; - } catch (RegionClearedException rce) { - clearOccured = true; - owner.recordEvent(event); - } catch (ConcurrentCacheModificationException ccme) { - VersionTag tag = event.getVersionTag(); - if (tag != null && tag.isTimeStampUpdated()) { - // Notify gateways of new time-stamp. - owner.notifyTimestampsToGateways(event); + if (uninitialized) { + event.inhibitCacheListenerNotification(true); } - throw ccme; + updateLru(clearOccured, re, event); + + lastModifiedTime = owner.basicPutPart2(event, re, + !uninitialized, lastModifiedTime, clearOccured); + } finally { + notifyIndex(re, false); } - if (uninitialized) { - event.inhibitCacheListenerNotification(true); + result = re; + break; + } finally { + OffHeapHelper.release(oldValueForDelta); + if (re != null && !onlyExisting && !isOpComplete(re, event)) { + owner.cleanUpOnIncompleteOp(event, re, eventRecorded, + false/* updateStats */, replaceOnClient); } - updateLru(clearOccured, re, event); + else if (re != null && owner.isUsedForPartitionedRegionBucket()) { + BucketRegion br = (BucketRegion)owner; + CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats(); + long startTime= stats.startCustomEviction(); + CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes(); + // No need to update indexes if entry was faulted in but operation did not succeed. 
+ if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) { + + if (csAttr.getCriteria().doEvict(event)) { + stats.incEvictionsInProgress(); + // set the flag on event saying the entry should be evicted + // and not indexed + EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(), + null/* newValue */, null, false, owner.getMyId()); + try { - lastModifiedTime = owner.basicPutPart2(event, re, - !uninitialized, lastModifiedTime, clearOccured); - } finally { - notifyIndex(re, false); - } - result = re; - break; - } finally { - if (re != null && !isOpComplete(re, event)) { - owner.cleanUpOnIncompleteOp(event, re, eventRecorded, - false/* updateStats */, replaceOnClient); + destroyEvent.setOldValueFromRegion(); + destroyEvent.setCustomEviction(true); + destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate()); + if(logger.isDebugEnabled()) { + logger.debug("Evicting the entry " + destroyEvent); + } + if(result != null) { + removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater); + } + else{ + removeEntry(event.getKey(),re, true, destroyEvent,owner, null); + } + //mark the region entry for this event as evicted + event.setEvicted(); + stats.incEvictions(); + if(logger.isDebugEnabled()) { + logger.debug("Evicted the entry " + destroyEvent); + } + //removeEntry(event.getKey(), re); + } finally { + destroyEvent.release(); + stats.decEvictionsInProgress(); + } + } else { + re.clearMarkedForEviction(); + } + } + stats.endCustomEviction(startTime); } } // try } @@ -3018,10 +3348,18 @@ RETRY_LOOP: // replace is propagated to server, so no need to check // satisfiesOldValue on client if (expectedOldValue != null && !replaceOnClient) { - Object v = re._getValueUse(event.getLocalRegion(), true); - if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v)) { + SimpleMemoryAllocatorImpl.skipRefCountTracking(); + + @Retained @Released Object v = re._getValueRetain(event.getLocalRegion(), true); + + SimpleMemoryAllocatorImpl.unskipRefCountTracking(); + try { + if (!AbstractRegionEntry.checkExpectedOldValue(expectedOldValue, v, event.getLocalRegion())) { return false; } + } finally { + OffHeapHelper.releaseWithNoTracking(v); + } } return true; } @@ -3038,15 +3376,43 @@ RETRY_LOOP: // calculations will be incorrect in case the value was read from // disk but not brought into the VM like what getValueInVMOrDisk // method does when value is not found in VM + // PRECONDITION: caller must be synced on re private void setOldValueInEvent(EntryEventImpl event, RegionEntry re, boolean cacheWrite, boolean requireOldValue) { - if (getIndexUpdater() != null || cacheWrite || requireOldValue || - event.getOperation().guaranteesOldValue()) { - if (event.hasDelta() || event.getOperation().guaranteesOldValue()) { - Object oldValueInVMOrDisk = re.getValueOffHeapOrDiskWithoutFaultIn(event.getLocalRegion()); - event.setOldValue(oldValueInVMOrDisk, requireOldValue); + boolean needToSetOldValue = getIndexUpdater() != null || cacheWrite || requireOldValue || event.getOperation().guaranteesOldValue(); + if (needToSetOldValue) { + if (event.hasDelta() || event.getOperation().guaranteesOldValue() + || GemFireCacheImpl.sqlfSystem()) { + // In these cases we want to even get the old value from disk if it is not in memory + SimpleMemoryAllocatorImpl.skipRefCountTracking(); + @Released Object oldValueInVMOrDisk = re.getValueOffHeapOrDiskWithoutFaultIn(event.getLocalRegion()); + SimpleMemoryAllocatorImpl.unskipRefCountTracking(); + try { + 
event.setOldValue(oldValueInVMOrDisk, requireOldValue + || GemFireCacheImpl.sqlfSystem()); + } finally { + OffHeapHelper.releaseWithNoTracking(oldValueInVMOrDisk); + } } else { - Object oldValueInVM = re._getValueUse(event.getLocalRegion(), true); // OFFHEAP: re synced so can use its ref. - event.setOldValue(oldValueInVM, requireOldValue); + // In these cases only need the old value if it is in memory + SimpleMemoryAllocatorImpl.skipRefCountTracking(); + + @Retained @Released Object oldValueInVM = re._getValueRetain(event.getLocalRegion(), true); // OFFHEAP: re synced so can use its ref. + + SimpleMemoryAllocatorImpl.unskipRefCountTracking(); + try { + event.setOldValue(oldValueInVM, + requireOldValue || GemFireCacheImpl.sqlfSystem()); + } finally { + OffHeapHelper.releaseWithNoTracking(oldValueInVM); + } + } + } else { + // if the old value is in memory then if it is a GatewaySenderEventImpl then + // we want to set the old value. + @Unretained Object ov = re._getValue(); // OFFHEAP _getValue is ok since re is synced and we only use it if its a GatewaySenderEventImpl. + // Since GatewaySenderEventImpl is never stored in an off-heap region nor a compressed region we don't need to worry about ov being compressed. + if (ov instanceof GatewaySenderEventImpl) { + event.setOldValue(ov, true); } } } @@ -3178,7 +3544,7 @@ RETRY_LOOP: } protected boolean destroyEntry(RegionEntry re, EntryEventImpl event, - boolean inTokenMode, boolean cacheWrite, Object expectedOldValue, + boolean inTokenMode, boolean cacheWrite, @Released Object expectedOldValue, boolean forceDestroy, boolean removeRecoveredEntry) throws CacheWriterException, TimeoutException, EntryNotFoundException, RegionClearedException { @@ -3216,8 +3582,10 @@ RETRY_LOOP: EntryEventImpl cbEvent = null; EntryEventImpl sqlfEvent = null; boolean invokeCallbacks = shouldCreateCBEvent(owner, false /*isInvalidate*/, isRegionReady); + boolean cbEventInPending = false; cbEvent = createCBEvent(owner, putOp, key, newValue, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey); + try { if (logger.isDebugEnabled()) { logger.debug("txApplyPut cbEvent={}", cbEvent); } @@ -3293,7 +3661,7 @@ RETRY_LOOP: //cbEvent.putExistingEntry(owner, re); sqlfEvent.putExistingEntry(owner, re); } else { - re.setValue(owner, prepareValueForCache(owner, newValue, cbEvent)); + re.setValue(owner, re.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate())); } if (putOp.isCreate()) { owner.updateSizeOnCreate(key, owner.calculateRegionEntryValueSize(re)); @@ -3332,6 +3700,7 @@ RETRY_LOOP: cbEvent, hasRemoteOrigin); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } if (!clearOccured) { @@ -3400,7 +3769,7 @@ RETRY_LOOP: //cbEvent.putExistingEntry(owner, oldRe); sqlfEvent.putExistingEntry(owner, oldRe); } else { - oldRe.setValue(owner, prepareValueForCache(owner, newValue, cbEvent)); + oldRe.setValue(owner, oldRe.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate())); if (wasTombstone) { owner.unscheduleTombstone(oldRe); } @@ -3444,6 +3813,7 @@ RETRY_LOOP: cbEvent, true/*callDispatchListenerEvent*/); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } if (!clearOccured) { @@ -3479,7 +3849,7 @@ RETRY_LOOP: if (sqlfEvent != null ) { sqlfEvent.putNewEntry(owner,newRe); } else { - newRe.setValue(owner, prepareValueForCache(owner, newValue, cbEvent)); + newRe.setValue(owner, newRe.prepareValueForCache(owner, newValue, cbEvent, !putOp.isCreate())); } 
owner.updateSizeOnCreate(newRe.getKey(), owner.calculateRegionEntryValueSize(newRe)); } @@ -3508,6 +3878,7 @@ RETRY_LOOP: true/*callDispatchListenerEvent*/); } else { pendingCallbacks.add(cbEvent); + cbEventInPending = true; } } if (!clearOccured) { @@ -3535,6 +3906,10 @@ RETRY_LOOP: oqlIndexManager.countDownIndexUpdaters(); } } + } finally { + if (!cbEventInPending) cbEvent.release(); + if (sqlfEvent != null) sqlfEvent.release(); + } } private void txHandleWANEvent(final LocalRegion owner, EntryEventImpl cbEvent, TXEntryState txEntryState) { @@ -3564,6 +3939,15 @@ RETRY_LOOP: } catch (ConcurrentCacheModificationException ignore) { // ignore this execption, however invoke callbacks for this operation } + + // For distributed transactions, stuff the next region version generated + // in phase-1 commit into the cbEvent so that ARE.generateVersionTag can later + // just apply it and not regenerate it in phase-2 commit + if (cbEvent != null && txEntryState != null && txEntryState.getDistTxEntryStates() != null) { + cbEvent.setNextRegionVersion(txEntryState.getDistTxEntryStates().getRegionVersion()); + } + + //cbEvent.setNextRegionVersion(txEntryState.getNextRegionVersion()); owner.generateAndSetVersionTag(cbEvent, re); } } @@ -3588,47 +3972,6 @@ RETRY_LOOP: return event; } - public static Object prepareValueForCache(RegionEntryContext r, Object val) { - return prepareValueForCache(r, val, null); - } - /** - * Prepares and returns a value to be stored in the cache. - * Current prep is to make sure a PdxInstance is not stored in the cache - * and to copy values into offheap memory of the region is using off heap storage. - * - * @param r the region the prepared object will be stored in - * @param val the value that will be stored - * @return the prepared value - */ - public static Object prepareValueForCache(RegionEntryContext r, Object val, EntryEventImpl event) { - Object nv = val; - if (nv instanceof PdxInstance) { - // We do not want to put PDXs in the cache as values. - // So get the serialized bytes and use a CachedDeserializable. - try { - byte[] data = ((ConvertableToBytes)nv).toBytes(); - byte[] compressedData = compressBytes(r, data); - if (data == compressedData) { - nv = CachedDeserializableFactory.create(data); - } else { - nv = compressedData; - } - } catch (IOException e) { - throw new PdxSerializationException("Could not convert " + nv + " to bytes", e); - } - } else { - nv = AbstractRegionEntry.compress(r, nv, event); - } - return nv; - } - - private static byte[] compressBytes(RegionEntryContext context, byte[] value) { - if (AbstractRegionEntry.isCompressible(context, value)) { - value = context.getCompressor().compress(value); - } - return value; - } - /** * Removing the existing indexed value requires the current value in the cache, * that is the one prior to applying the operation. 
@@ -3702,10 +4045,13 @@ RETRY_LOOP: eventRegion = re.getPartitionedRegion(); } - EntryEventImpl retVal = new EntryEventImpl( + EntryEventImpl retVal = EntryEventImpl.create( re, op, key, newValue, aCallbackArgument, txEntryState == null, originator); + boolean returnedRetVal = false; + try { + if(bridgeContext!=null) { retVal.setContext(bridgeContext); @@ -3778,7 +4124,13 @@ RETRY_LOOP: } } retVal.setTransactionId(txId); + returnedRetVal = true; return retVal; + } finally { + if (!returnedRetVal) { + retVal.release(); + } + } } public final void writeSyncIfPresent(Object key, Runnable runner) @@ -3892,7 +4244,14 @@ RETRY_LOOP: if (actualRe != re) { // null actualRe is okay here return true; // tombstone was evicted at some point } - int entryVersion = re.getVersionStamp().getEntryVersion(); + VersionStamp vs = re.getVersionStamp(); + if (vs == null) { + // if we have no VersionStamp why were we even added as a tombstone? + // We used to see an NPE here. See bug 52092. + logger.error("Unexpected RegionEntry scheduled as tombstone: re.getClass {} destroyedVersion {}", re.getClass(), destroyedVersion); + return true; + } + int entryVersion = vs.getEntryVersion(); boolean isSameTombstone = (entryVersion == destroyedVersion && re.isTombstone()); return !isSameTombstone; } @@ -3903,8 +4262,8 @@ RETRY_LOOP: int destroyedVersion = version.getEntryVersion(); DiskRegion dr = this._getOwner().getDiskRegion(); + synchronized(this._getOwner().getSizeGuard()) { // do this sync first; see bug 51985 synchronized (re) { - synchronized(this._getOwner().getSizeGuard()) { int entryVersion = re.getVersionStamp().getEntryVersion(); boolean isTombstone = re.isTombstone(); boolean isSameTombstone = (entryVersion == destroyedVersion && isTombstone); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java index 1b2a8c1..2a88268 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractUpdateOperation.java @@ -322,5 +322,10 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation throw ex; } } + + @Override + protected boolean mayAddToMultipleSerialGateways(DistributionManager dm) { + return _mayAddToMultipleSerialGateways(dm); + } } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java index 40c8808..1299d75 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java @@ -882,7 +882,24 @@ public final class BucketAdvisor extends CacheDistributionAdvisor { */ private final InternalDistributedMember getExistingPrimary() { return basicGetPrimaryMember(); - } + } + + /** + * If the current member is primary for this bucket return true, 
otherwise, + * give some time for the current member to become primary and + * then return whether it is a primary (true/false). + */ + public final boolean isPrimaryWithWait() { + if (this.isPrimary()) { + return true; + } + // wait for the current member to become primary holder + InternalDistributedMember primary = waitForNewPrimary(); + if(primary != null) { + return true; + } + return false; + } /** * This method was split out from getPrimary() due to bug #40639 @@ -976,6 +993,11 @@ public final class BucketAdvisor extends CacheDistributionAdvisor { } } finally { if (lostPrimary) { + Bucket br = this.regionAdvisor.getBucket(getBucket().getId()); + if( br != null && br instanceof BucketRegion) { + ((BucketRegion)br).beforeReleasingPrimaryLockDuringDemotion(); + } + releasePrimaryLock(); // this was a deposePrimary call so we need to depose children as well deposePrimaryForColocatedChildren(); @@ -1582,7 +1604,39 @@ public final class BucketAdvisor extends CacheDistributionAdvisor { return false; } } + + private final static long BUCKET_STORAGE_WAIT = Long.getLong("gemfire.BUCKET_STORAGE_WAIT", 15000).longValue(); // 15 seconds + public boolean waitForStorage() { + synchronized (this) { + // let's park this thread and wait for storage! + StopWatch timer = new StopWatch(true); + try { + for (;;) { + if (this.regionAdvisor.isBucketLocal(getBucket().getId())) { + return true; + } + getProxyBucketRegion().getPartitionedRegion().checkReadiness(); + if (isClosed()) { + return false; + } + long timeLeft = BUCKET_STORAGE_WAIT - timer.elapsedTimeMillis(); + if (timeLeft <= 0) { + return false; + } + if (logger.isDebugEnabled()) { + logger.debug("Waiting for bucket storage" + this); + } + this.wait(timeLeft); // spurious wakeup ok + } + } + catch (InterruptedException e) { + // abort and return null + Thread.currentThread().interrupt(); + } + return false; + } + } public void clearPrimaryElector() { synchronized(this) { primaryElector = null;
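The recurring theme of the AbstractRegionMap changes above is reference-count hygiene for off-heap values: reads that previously used _getValueUse(...) or getValueInVM(...) now retain the value (e.g. _getValueRetain(owner, true)) and release it in a finally block via OffHeapHelper.release(...), and EntryEventImpl instances are created with EntryEventImpl.create(...) and released unless ownership is handed off to pendingCallbacks. The sketch below is illustrative only; it uses simplified stand-in types rather than the GemFire classes named in the diff, to show the shape of both patterns.

import java.util.List;

// Illustrative stand-ins; these are NOT the GemFire classes named in the diff.
interface RefCounted {
    /** Decrement the reference count; free the backing memory on the last release. */
    void release();
}

interface OffHeapEntry {
    /** Return the current value with its reference count already incremented. */
    RefCounted getValueRetain();
}

final class RefCountingPatterns {

    /**
     * Shape of the commit's read pattern: retain the value on read, use it,
     * and release it in a finally block so the off-heap memory cannot leak
     * if an exception is thrown while the value is in use.
     */
    static void readAndRelease(OffHeapEntry entry) {
        RefCounted value = entry.getValueRetain(); // like re._getValueRetain(owner, true)
        try {
            use(value);                            // value may escape into an event as the old value
        } finally {
            value.release();                       // like OffHeapHelper.release(oldValue)
        }
    }

    /**
     * Shape of the commit's event pattern: the event is released unless its
     * ownership was handed off (here, by adding it to pendingCallbacks).
     */
    static void createAndMaybeHandOff(List<RefCounted> pendingCallbacks,
                                      RefCounted cbEvent, boolean deferCallbacks) {
        boolean cbEventInPending = false;
        try {
            if (deferCallbacks) {
                pendingCallbacks.add(cbEvent);     // the pending list now owns the reference
                cbEventInPending = true;
            } else {
                use(cbEvent);                      // dispatch the callback immediately
            }
        } finally {
            if (!cbEventInPending) {
                cbEvent.release();
            }
        }
    }

    private static void use(RefCounted value) {
        // placeholder for the work performed while the reference is held
    }
}

The second method mirrors the boolean cbEventInPending flag introduced throughout txApplyDestroy, txApplyInvalidate and txApplyPut: releasing in finally only when the event was not queued keeps a single owner for each retained reference.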