Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 79176183A4 for ; Thu, 9 Jul 2015 17:05:33 +0000 (UTC) Received: (qmail 66112 invoked by uid 500); 9 Jul 2015 17:05:33 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 66079 invoked by uid 500); 9 Jul 2015 17:05:33 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 66066 invoked by uid 99); 9 Jul 2015 17:05:33 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jul 2015 17:05:33 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 09 Jul 2015 17:00:41 +0000 Received: (qmail 54006 invoked by uid 99); 9 Jul 2015 17:02:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Jul 2015 17:02:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8F249E6842; Thu, 9 Jul 2015 17:02:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dschneider@apache.org To: commits@geode.incubator.apache.org Date: Thu, 09 Jul 2015 17:02:49 -0000 Message-Id: <51e88ab317a543cbbd28fe87944e4efe@git.apache.org> In-Reply-To: <7f249334f8de4a87a653a7c04a138bba@git.apache.org> References: <7f249334f8de4a87a653a7c04a138bba@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java index e3120f8..b394d10 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.logging.log4j.Logger; @@ -24,7 +25,11 @@ import com.gemstone.gemfire.cache.Operation; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.cache.TimeoutException; +import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue; +import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl; import com.gemstone.gemfire.internal.cache.lru.LRUStatistics; +import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; +import com.gemstone.gemfire.internal.cache.versions.VersionSource; import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor; import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; @@ -33,9 +38,10 @@ import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewa import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.offheap.OffHeapRegionEntryHelper; public abstract class AbstractBucketRegionQueue extends BucketRegion { - private static final Logger logger = LogService.getLogger(); + protected static final Logger logger = LogService.getLogger(); /** * The maximum size of this single queue before we start blocking puts @@ -244,8 +250,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { + key, rde); } } finally { - //merge42180: are we considering offheap in cedar. Comment freeOffHeapReference intentionally - //event.freeOffHeapReferences(); + event.release(); } this.notifyEntriesRemoved(); @@ -301,16 +306,16 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { logger.debug("For bucket {} , enqueing event {} caused exception", getId(), event, e); } } finally { - /*if (event != null) { - event.release(); // merge44873: this is offheap related change from cheetah - }*/ + if (event != null) { + event.release(); + } } } } finally { if (!tempQueue.isEmpty()) { - /*for (GatewaySenderEventImpl e: tempQueue) { - e.release(); // merge44873: this is offheap related change from cheetah - }*/ + for (GatewaySenderEventImpl e: tempQueue) { + e.release(); + } tempQueue.clear(); } getInitializationLock().writeLock().unlock(); @@ -364,7 +369,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { //if (ov instanceof GatewaySenderEventImpl) { // ((GatewaySenderEventImpl)ov).release(); //} - + GatewaySenderEventImpl.release(event.getRawOldValue()); } return success; @@ -378,6 +383,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { //if (rov instanceof GatewaySenderEventImpl) { // ((GatewaySenderEventImpl) rov).release(); //} + GatewaySenderEventImpl.release(event.getRawOldValue()); } @@ -429,21 +435,31 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { // is never stored offheap so this EntryEventImpl values will never be off-heap. // So the value that ends up being stored in this region is a GatewaySenderEventImpl // which may have a reference to a value stored off-heap. - EntryEventImpl event = new EntryEventImpl(this, Operation.UPDATE, key, + EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key, value, null, false, getMyId()); // here avoiding unnecessary validations of key, value. Readniness check // will be handled in virtualPut. avoiding extractDelta as this will be new // entry everytime // EntryEventImpl event = getPartitionedRegion().newUpdateEntryEvent(key, // value, null); - //event.copyOffHeapToHeap(); + event.copyOffHeapToHeap(); if (logger.isDebugEnabled()) { logger.debug("Value : {}", event.getRawNewValue()); } waitIfQueueFull(); - + + int sizeOfHdfsEvent = -1; try { + if (this instanceof HDFSBucketRegionQueue) { + // need to fetch the size before event is inserted in queue. + // fix for #50016 + if (this.getBucketAdvisor().isPrimary()) { + HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue(); + sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted); + } + } + didPut = virtualPut(event, false, false, null, false, startPut, true); checkReadiness(); @@ -454,10 +470,9 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { throw new ForceReattemptException("Bucket moved", rde); } } finally { - //if (!didPut) { - // GatewaySenderEventImpl gwVal = (GatewaySenderEventImpl) value; - // gwVal.release(); - //} + if (!didPut) { + GatewaySenderEventImpl.release(value); + } } //check again if the key exists in failedBatchRemovalMessageKeys, @@ -467,38 +482,38 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { destroyKey(key); didPut = false; } else { - addToEventQueue(key, didPut, event); + addToEventQueue(key, didPut, event, sizeOfHdfsEvent); } return didPut; } + @Override + public void closeEntries() { + OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + @Override + public void run() { + AbstractBucketRegionQueue.super.closeEntries(); + } + }); + clearQueues(); + + } -// @Override -// public void closeEntries() { -// OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { -// @Override -// public void run() { -// AbstractBucketRegionQueue.super.closeEntries(); -// } -// }); -// clearQueues(); -// -// } -// -// @Override -// public Set clearEntries(final RegionVersionVector rvv) { -// final AtomicReference> result = new AtomicReference>(); -// OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { -// @Override -// public void run() { -// result.set(AbstractBucketRegionQueue.super.clearEntries(rvv)); -// } -// }); -// clearQueues(); -// return result.get(); -// } + @Override + public Set clearEntries(final RegionVersionVector rvv) { + final AtomicReference> result = new AtomicReference>(); + OffHeapRegionEntryHelper.doWithOffHeapClear(new Runnable() { + @Override + public void run() { + result.set(AbstractBucketRegionQueue.super.clearEntries(rvv)); + } + }); + clearQueues(); + return result.get(); + } protected abstract void clearQueues(); - protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event); + protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, + int sizeOfHdfsEvent); @Override public void afterAcquiringPrimaryState() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java index a237b7c..a10d503 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegion.java @@ -9,6 +9,8 @@ package com.gemstone.gemfire.internal.cache; import java.io.PrintStream; import java.util.EnumSet; +import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -32,6 +34,7 @@ import com.gemstone.gemfire.internal.cache.versions.VersionTag; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; +import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap; import joptsimple.internal.Strings; /** @@ -77,6 +80,8 @@ public abstract class AbstractDiskRegion implements DiskRegionView { private String compressorClassName; private Compressor compressor; + private boolean offHeap; + /** * Records the version vector of what has been persisted to disk. * This may lag behind the version vector of what is in memory, because @@ -137,6 +142,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { this.versionVector = drv.getRegionVersionVector(); this.compressorClassName = drv.getCompressorClassName(); this.compressor = drv.getCompressor(); + this.offHeap = drv.getOffHeap(); if (drv instanceof PlaceHolderDiskRegion) { this.setRVVTrusted(((PlaceHolderDiskRegion) drv).getRVVTrusted()); } @@ -218,6 +224,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { this.versionVector = drv.getRegionVersionVector(); this.compressorClassName = drv.getCompressorClassName(); this.compressor = drv.getCompressor(); + this.offHeap = drv.getOffHeap(); } ////////////////////// Instance Methods ////////////////////// @@ -257,7 +264,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { float loadFactor, boolean statisticsEnabled, boolean isBucket, EnumSet flags, String partitionName, int startingBucketId, - String compressorClassName) { + String compressorClassName, boolean offHeap) { this.lruAlgorithm = lruAlgorithm; this.lruAction = lruAction; this.lruLimit = lruLimit; @@ -273,6 +280,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { this.partitionName = partitionName; this.startingBucketId = startingBucketId; this.compressorClassName = compressorClassName; + this.offHeap = offHeap; if (!ds.isOffline()) { createCompressorFromClassName(); } @@ -519,6 +527,21 @@ public abstract class AbstractDiskRegion implements DiskRegionView { if(isReadyForRecovery()) { ds.updateDiskRegion(this); this.entriesMapIncompatible = false; + if (this.entries != null) { + CustomEntryConcurrentHashMap other = ((AbstractRegionMap)this.entries)._getMap(); + Iterator> it = other + .entrySetWithReusableEntries().iterator(); + while (it.hasNext()) { + Map.Entry me = it.next(); + RegionEntry oldRe = (RegionEntry)me.getValue(); + if (oldRe instanceof OffHeapRegionEntry) { + ((OffHeapRegionEntry) oldRe).release(); + } else { + // no need to keep iterating; they are all either off heap or on heap. + break; + } + } + } this.entries = null; this.readyForRecovery = false; } @@ -746,6 +769,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { msg += " -concurrencyLevel=" + getConcurrencyLevel() + " -initialCapacity=" + getInitialCapacity() + " -loadFactor=" + getLoadFactor() + + " -offHeap=" + getOffHeap() + " -compressor=" + (getCompressorClassName() == null ? "none" : getCompressorClassName()) + " -statisticsEnabled=" + getStatisticsEnabled(); if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY)) { @@ -787,6 +811,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { sb.append("-concurrencyLevel=" + getConcurrencyLevel()); sb.append(lineSeparator); sb.append("-initialCapacity=" + getInitialCapacity()); sb.append(lineSeparator); sb.append("-loadFactor=" + getLoadFactor()); sb.append(lineSeparator); + sb.append("-offHeap=" + getOffHeap()); sb.append(lineSeparator); sb.append("-compressor=" + (getCompressorClassName() == null ? "none" : getCompressorClassName())); sb.append(lineSeparator); sb.append("-statisticsEnabled=" + getStatisticsEnabled()); sb.append(lineSeparator); @@ -860,6 +885,7 @@ public abstract class AbstractDiskRegion implements DiskRegionView { msg.append("\n\tconcurrencyLevel=").append(getConcurrencyLevel()); msg.append("\n\tinitialCapacity=").append(getInitialCapacity()); msg.append("\n\tloadFactor=").append(getLoadFactor()); + msg.append("\n\toffHeap=").append(getOffHeap()); msg.append("\n\tstatisticsEnabled=").append(getStatisticsEnabled()); msg.append("\n\tdrId=").append(getId()); @@ -936,6 +962,11 @@ public abstract class AbstractDiskRegion implements DiskRegionView { return this.compressor; } + @Override + public boolean getOffHeap() { + return this.offHeap; + } + public CachePerfStats getCachePerfStats() { return this.ds.getCache().getCachePerfStats(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java index 7d48082..5d6ce4d 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractDiskRegionEntry.java @@ -6,6 +6,11 @@ *========================================================================= */ package com.gemstone.gemfire.internal.cache; + +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; +import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue; +import com.gemstone.gemfire.internal.offheap.annotations.Released; +import com.gemstone.gemfire.internal.offheap.annotations.Retained; /** * * @author sbawaska @@ -38,9 +43,25 @@ public abstract class AbstractDiskRegionEntry @Override public void setValueWithContext(RegionEntryContext context, Object value) { _setValue(value); - //_setValue(compress(context,value)); // compress is now called in AbstractRegionMap.prepareValueForCache + if (value != null && context != null && (this instanceof OffHeapRegionEntry) + && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) { + ((OffHeapRegionEntry)this).release(); + ((LocalRegion)context).checkReadiness(); + } } // Do not add any instances fields to this class. // Instead add them to the DISK section of LeafRegionEntry.cpp. + + @Override + public void handleValueOverflow(RegionEntryContext context) { + if (context instanceof BucketRegionQueue || context instanceof SerialGatewaySenderQueue.SerialGatewaySenderQueueMetaRegion) { + GatewaySenderEventImpl.release(this._getValue()); // OFFHEAP _getValue ok + } + } + @Override + public void afterValueOverflow(RegionEntryContext context) { + //NO OP + //Overridden in sqlf RegionEntry + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java index e4fb873..c3d99b8 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java @@ -19,6 +19,7 @@ import com.gemstone.gemfire.cache.EvictionAction; import com.gemstone.gemfire.cache.EvictionAlgorithm; import com.gemstone.gemfire.cache.RegionDestroyedException; import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; import com.gemstone.gemfire.internal.cache.lru.EnableLRU; import com.gemstone.gemfire.internal.cache.lru.HeapEvictor; import com.gemstone.gemfire.internal.cache.lru.HeapLRUCapacityController; @@ -34,6 +35,7 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; +import com.gemstone.gemfire.internal.offheap.StoredObject; import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer; /** @@ -70,8 +72,9 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap { ea = ((LocalRegion)owner).getEvictionAttributes().getAlgorithm(); ec = ((LocalRegion)owner).getEvictionController(); } else if (owner instanceof PlaceHolderDiskRegion) { - ea = ((PlaceHolderDiskRegion)owner).getActualLruAlgorithm(); - ec = ((PlaceHolderDiskRegion)owner).getEvictionAttributes().createEvictionController(null); + PlaceHolderDiskRegion phdr = (PlaceHolderDiskRegion)owner; + ea = phdr.getActualLruAlgorithm(); + ec = phdr.getEvictionAttributes().createEvictionController(null, phdr.getOffHeap()); } else { throw new IllegalStateException("expected LocalRegion or PlaceHolderDiskRegion"); } @@ -166,9 +169,15 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap { // make sure this cached deserializable is still in the entry // @todo what if a clear is done and this entry is no longer in the region? { - Object curVal = le._getValue(); + Object curVal = le._getValue(); // OFFHEAP: _getValue ok if (curVal != cd) { + if (cd instanceof StoredObject) { + if (!cd.equals(curVal)) { + return false; + } + } else { return false; + } } } // TODO:KIRK:OK if (le.getValueInVM((RegionEntryContext) _getOwnerObject()) != cd) return false; @@ -536,6 +545,20 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap { // reset the tx thread local } + private boolean mustEvict() { + LocalRegion owner = _getOwner(); + InternalResourceManager resourceManager = owner.getCache().getResourceManager(); + + final boolean monitorStateIsEviction; + if (!owner.getAttributes().getOffHeap()) { + monitorStateIsEviction = resourceManager.getHeapMonitor().getState().isEviction(); + } else { + monitorStateIsEviction = resourceManager.getOffHeapMonitor().getState().isEviction(); + } + + return monitorStateIsEviction && this.sizeInVM() > 0; + } + public final int centralizedLruUpdateCallback() { final boolean isDebugEnabled_LRU = logger.isTraceEnabled(LogMarker.LRU); @@ -550,8 +573,7 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap { } LRUStatistics stats = _getLruList().stats(); try { - while (_getOwner().getCache().getHeapEvictor().mustEvict() - && this.sizeInVM() > 0 && evictedBytes == 0) { + while (mustEvict() && evictedBytes == 0) { LRUEntry removalEntry = (LRUEntry)_getLruList().getLRUEntry(); if (removalEntry != null) { evictedBytes = evictEntry(removalEntry, stats); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java index 7e39f8c..3a31e90 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractOplogDiskRegionEntry.java @@ -13,6 +13,7 @@ import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.internal.ByteArrayDataInput; import com.gemstone.gemfire.internal.cache.versions.VersionTag; +import com.gemstone.gemfire.internal.offheap.annotations.Retained; /** * Abstract implementation class of RegionEntry interface. @@ -67,9 +68,16 @@ public abstract class AbstractOplogDiskRegionEntry } @Override + @Retained + public final Object getValueRetain(RegionEntryContext context) { + return Helper.faultInValueRetain(this, (LocalRegion) context); + } + + @Override public final Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) { return Helper.getValueInVMOrDiskWithoutFaultIn(this, owner); } + @Retained @Override public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) { return Helper.getValueOffHeapOrDiskWithoutFaultIn(this, owner); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java index b9b7b26..1dcd918 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.logging.log4j.Logger; @@ -36,6 +37,7 @@ import com.gemstone.gemfire.cache.CacheLoaderException; import com.gemstone.gemfire.cache.CacheStatistics; import com.gemstone.gemfire.cache.CacheWriter; import com.gemstone.gemfire.cache.CacheWriterException; +import com.gemstone.gemfire.cache.CustomEvictionAttributes; import com.gemstone.gemfire.cache.CustomExpiry; import com.gemstone.gemfire.cache.DataPolicy; import com.gemstone.gemfire.cache.DiskWriteAttributes; @@ -43,6 +45,7 @@ import com.gemstone.gemfire.cache.EntryExistsException; import com.gemstone.gemfire.cache.EntryNotFoundException; import com.gemstone.gemfire.cache.EvictionAttributes; import com.gemstone.gemfire.cache.EvictionAttributesMutator; +import com.gemstone.gemfire.cache.EvictionCriteria; import com.gemstone.gemfire.cache.ExpirationAction; import com.gemstone.gemfire.cache.ExpirationAttributes; import com.gemstone.gemfire.cache.MembershipAttributes; @@ -91,6 +94,7 @@ import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.util.ArrayUtils; import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration; +import com.google.common.util.concurrent.Service.State; /** * Takes care of RegionAttributes, AttributesMutator, and some no-brainer method @@ -195,6 +199,12 @@ public abstract class AbstractRegion implements Region, RegionAttributes, protected boolean enableAsyncConflation; + /** + * True if this region uses off-heap memory; otherwise false (default) + * @since 9.0 + */ + protected boolean offHeap; + protected boolean cloningEnable = false; protected DiskWriteAttributes diskWriteAttributes; @@ -221,6 +231,8 @@ public abstract class AbstractRegion implements Region, RegionAttributes, protected EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl(); + protected CustomEvictionAttributes customEvictionAttributes; + /** The membership attributes defining required roles functionality */ protected MembershipAttributes membershipAttributes; @@ -243,6 +255,10 @@ public abstract class AbstractRegion implements Region, RegionAttributes, protected String poolName; + protected String hdfsStoreName; + + protected boolean hdfsWriteOnly; + protected Compressor compressor; /** @@ -883,6 +899,16 @@ public abstract class AbstractRegion implements Region, RegionAttributes, return this.subscriptionAttributes; } + @Override + public final String getHDFSStoreName() { + return this.hdfsStoreName; + } + + @Override + public final boolean getHDFSWriteOnly() { + return this.hdfsWriteOnly; + } + /** * Get IndexManger for region */ @@ -1700,6 +1726,16 @@ public abstract class AbstractRegion implements Region, RegionAttributes, this.dataPolicy = attrs.getDataPolicy(); // do this one first this.scope = attrs.getScope(); + this.offHeap = attrs.getOffHeap(); + + // fix bug #52033 by invoking setOffHeap now (localMaxMemory may now be the temporary placeholder for off-heap until DistributedSystem is created + // found non-null PartitionAttributes and offHeap is true so let's setOffHeap on PA now + PartitionAttributes pa = attrs.getPartitionAttributes(); + if (this.offHeap && pa != null) { + PartitionAttributesImpl impl = (PartitionAttributesImpl)pa; + impl.setOffHeap(this.offHeap); + } + this.evictionAttributes = new EvictionAttributesImpl((EvictionAttributesImpl)attrs .getEvictionAttributes()); if (attrs.getPartitionAttributes() != null @@ -1721,8 +1757,9 @@ public abstract class AbstractRegion implements Region, RegionAttributes, if (this.evictionAttributes != null && !this.evictionAttributes.getAlgorithm().isNone()) { this.setEvictionController(this.evictionAttributes - .createEvictionController(this)); + .createEvictionController(this, attrs.getOffHeap())); } + this.customEvictionAttributes = attrs.getCustomEvictionAttributes(); storeCacheListenersField(attrs.getCacheListeners()); assignCacheLoader(attrs.getCacheLoader()); assignCacheWriter(attrs.getCacheWriter()); @@ -1781,6 +1818,9 @@ public abstract class AbstractRegion implements Region, RegionAttributes, + "when multiuser-authentication is true."); } } + this.hdfsStoreName = attrs.getHDFSStoreName(); + this.hdfsWriteOnly = attrs.getHDFSWriteOnly(); + this.diskStoreName = attrs.getDiskStoreName(); this.isDiskSynchronous = attrs.isDiskSynchronous(); if (this.diskStoreName == null) { @@ -1845,11 +1885,52 @@ public abstract class AbstractRegion implements Region, RegionAttributes, return this.evictionAttributes; } + /** + * {@inheritDoc} + */ + @Override + public CustomEvictionAttributes getCustomEvictionAttributes() { + return this.customEvictionAttributes; + } + public EvictionAttributesMutator getEvictionAttributesMutator() { return this.evictionAttributes; } + /** + * {@inheritDoc} + */ + @Override + public CustomEvictionAttributes setCustomEvictionAttributes(long newStart, + long newInterval) { + checkReadiness(); + + if (this.customEvictionAttributes == null) { + throw new IllegalArgumentException( + LocalizedStrings.AbstractRegion_NO_CUSTOM_EVICTION_SET + .toLocalizedString(getFullPath())); + } + + if (newStart == 0) { + newStart = this.customEvictionAttributes.getEvictorStartTime(); + } + this.customEvictionAttributes = new CustomEvictionAttributesImpl( + this.customEvictionAttributes.getCriteria(), newStart, newInterval, + newStart == 0 && newInterval == 0); + +// if (this.evService == null) { +// initilializeCustomEvictor(); +// } else {// we are changing the earlier one which is already started. +// EvictorService service = getEvictorTask(); +// service.changeEvictionInterval(newInterval); +// if (newStart != 0) +// service.changeStartTime(newStart); +// } + + return this.customEvictionAttributes; + } + public void setEvictionController(LRUAlgorithm evictionController) { this.evictionController = evictionController; @@ -1987,10 +2068,98 @@ public abstract class AbstractRegion implements Region, RegionAttributes, } /** - * @since 8.1 - */ + * @since 8.1 + * property used to find region operations that reach out to HDFS multiple times + */ @Override public ExtensionPoint> getExtensionPoint() { return extensionPoint; } + + public boolean getOffHeap() { + return this.offHeap; + } + /** + * property used to find region operations that reach out to HDFS multiple times + */ + private static final boolean DEBUG_HDFS_CALLS = Boolean.getBoolean("DebugHDFSCalls"); + + /** + * throws exception if region operation goes out to HDFS multiple times + */ + private static final boolean THROW_ON_MULTIPLE_HDFS_CALLS = Boolean.getBoolean("throwOnMultipleHDFSCalls"); + + private ThreadLocal logHDFSCalls = DEBUG_HDFS_CALLS ? new ThreadLocal() : null; + + public void hdfsCalled(Object key) { + if (!DEBUG_HDFS_CALLS) { + return; + } + logHDFSCalls.get().addStack(new Throwable()); + logHDFSCalls.get().setKey(key); + } + public final void operationStart() { + if (!DEBUG_HDFS_CALLS) { + return; + } + if (logHDFSCalls.get() == null) { + logHDFSCalls.set(new CallLog()); + //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationStart", new Throwable()); + } else { + logHDFSCalls.get().incNestedCall(); + //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:incNestedCall:", new Throwable()); + } + } + public final void operationCompleted() { + if (!DEBUG_HDFS_CALLS) { + return; + } + //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationCompleted", new Throwable()); + if (logHDFSCalls.get() != null && logHDFSCalls.get().decNestedCall() < 0) { + logHDFSCalls.get().assertCalls(); + logHDFSCalls.set(null); + } + } + + public static class CallLog { + private List stackTraces = new ArrayList(); + private Object key; + private int nestedCall = 0; + public void incNestedCall() { + nestedCall++; + } + public int decNestedCall() { + return --nestedCall; + } + public void addStack(Throwable stack) { + this.stackTraces.add(stack); + } + public void setKey(Object key) { + this.key = key; + } + public void assertCalls() { + if (stackTraces.size() > 1) { + Throwable firstTrace = new Throwable(); + Throwable lastTrace = firstTrace; + for (Throwable t : this.stackTraces) { + lastTrace.initCause(t); + lastTrace = t; + } + if (THROW_ON_MULTIPLE_HDFS_CALLS) { + throw new RuntimeException("SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace); + } else { + InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace); + } + } + } + } + + public EvictionCriteria getEvictionCriteria() { + EvictionCriteria criteria = null; + if (this.customEvictionAttributes != null + && !this.customEvictionAttributes.isEvictIncoming()) { + criteria = this.customEvictionAttributes.getCriteria(); + } + return criteria; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java index d7f3963..809996b 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java @@ -9,7 +9,11 @@ package com.gemstone.gemfire.internal.cache; import java.io.IOException; +import java.util.Arrays; + import org.apache.logging.log4j.Logger; +import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE; +import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE; import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.InvalidDeltaException; @@ -41,11 +45,24 @@ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; import com.gemstone.gemfire.internal.cache.versions.VersionSource; import com.gemstone.gemfire.internal.cache.versions.VersionStamp; import com.gemstone.gemfire.internal.cache.versions.VersionTag; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.lang.StringUtils; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; +import com.gemstone.gemfire.internal.offheap.MemoryAllocator; +import com.gemstone.gemfire.internal.offheap.OffHeapCachedDeserializable; +import com.gemstone.gemfire.internal.offheap.OffHeapHelper; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ChunkType; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.GemFireChunk; +import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.GemFireChunkType; +import com.gemstone.gemfire.internal.offheap.StoredObject; +import com.gemstone.gemfire.internal.offheap.annotations.Released; +import com.gemstone.gemfire.internal.offheap.annotations.Retained; +import com.gemstone.gemfire.internal.offheap.annotations.Unretained; import com.gemstone.gemfire.internal.util.BlobHelper; import com.gemstone.gemfire.internal.util.Versionable; import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap; @@ -54,7 +71,8 @@ import com.gemstone.gemfire.pdx.PdxInstance; import com.gemstone.gemfire.pdx.PdxSerializable; import com.gemstone.gemfire.pdx.PdxSerializationException; import com.gemstone.gemfire.pdx.PdxSerializer; - +import com.gemstone.gemfire.pdx.internal.ConvertableToBytes; +import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl; /** * Abstract implementation class of RegionEntry interface. @@ -100,10 +118,13 @@ public abstract class AbstractRegionEntry implements RegionEntry, protected static final long IN_USE_BY_TX = 0x40L<<56; + protected static final long MARKED_FOR_EVICTION = 0x80L<<56; // public Exception removeTrace; // debugging hot loop in AbstractRegionMap.basicPut() - protected AbstractRegionEntry(RegionEntryContext context, Object value) { - setValue(context,AbstractRegionMap.prepareValueForCache(context, value),false); + protected AbstractRegionEntry(RegionEntryContext context, + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) { + + setValue(context,this.prepareValueForCache(context, value, false),false); // setLastModified(System.currentTimeMillis()); [bruce] this must be set later so we can use ==0 to know this is a new entry in checkForConflicts } @@ -137,7 +158,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, return true; } finally { - if (isRemoved() && !isTombstone()) { + if (isRemoved() && !isTombstone() && !event.isEvicted()) { // Phase 2 of region entry removal is done here. The first phase is done // by the RegionMap. It is unclear why this code is needed. ARM destroy // does this also and we are now doing it as phase3 of the ARM destroy. @@ -226,7 +247,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, @Override - public void setValueWithTombstoneCheck(Object v, EntryEvent e) throws RegionClearedException { + public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) throws RegionClearedException { if (v == Token.TOMBSTONE) { makeTombstone((LocalRegion)e.getRegion(), ((EntryEventImpl)e).getVersionTag()); } else { @@ -266,15 +287,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, return getValueAsToken() == Token.REMOVED_PHASE2; } - // RegionEntry.fillInValue(...) public boolean fillInValue(LocalRegion region, - InitialImageOperation.Entry dst, + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry dst, ByteArrayDataInput in, DM mgr) { dst.setSerialized(false); // starting default value - final Object v; + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) final Object v; if (isTombstone()) { v = Token.TOMBSTONE; } else { @@ -300,46 +320,59 @@ public abstract class AbstractRegionEntry implements RegionEntry, } else if (v instanceof CachedDeserializable) { // don't serialize here if it is not already serialized - { - Object tmp = ((CachedDeserializable)v).getValue(); - if (tmp instanceof byte[]) { - byte[] bb = (byte[])tmp; - dst.value = bb; - } - else if (isEagerDeserialize && tmp instanceof byte[][]) { - // optimize for byte[][] since it will need to be eagerly deserialized - // for SQLFabric - dst.value = tmp; - dst.setEagerDeserialize(); - } - else { - try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); - BlobHelper.serializeTo(tmp, hdos); - hdos.trim(); - dst.value = hdos; - } catch (IOException e) { - RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString()); - e2.initCause(e); - throw e2; +// if(v instanceof ByteSource && CachedDeserializableFactory.preferObject()) { +// // For SQLFire we prefer eager deserialized +// dst.setEagerDeserialize(); +// } + + if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) { + dst.value = ((StoredObject) v).getDeserializedForReading(); + } else { + /*if (v instanceof ByteSource && CachedDeserializableFactory.preferObject()) { + dst.value = v; + } else */ { + Object tmp = ((CachedDeserializable) v).getValue(); + if (tmp instanceof byte[]) { + byte[] bb = (byte[]) tmp; + dst.value = bb; + } else { + try { + HeapDataOutputStream hdos = new HeapDataOutputStream( + Version.CURRENT); + BlobHelper.serializeTo(tmp, hdos); + hdos.trim(); + dst.value = hdos; + } catch (IOException e) { + RuntimeException e2 = new IllegalArgumentException( + LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING + .toLocalizedString()); + e2.initCause(e); + throw e2; + } } + dst.setSerialized(true); } - dst.setSerialized(true); } } else if (v instanceof byte[]) { dst.value = v; } - else if (isEagerDeserialize && v instanceof byte[][]) { - // optimize for byte[][] since it will need to be eagerly deserialized - // for SQLFabric - dst.value = v; + else { + Object preparedValue = v; + if (preparedValue != null) { + preparedValue = prepareValueForGII(preparedValue); + if (preparedValue == null) { + return false; + } + } + if (CachedDeserializableFactory.preferObject()) { + dst.value = preparedValue; dst.setEagerDeserialize(); } else { try { HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); - BlobHelper.serializeTo(v, hdos); + BlobHelper.serializeTo(preparedValue, hdos); hdos.trim(); dst.value = hdos; dst.setSerialized(true); @@ -349,16 +382,32 @@ public abstract class AbstractRegionEntry implements RegionEntry, throw e2; } } + } return true; } + /** + * To fix bug 49901 if v is a GatewaySenderEventImpl then make + * a heap copy of it if it is offheap. + * @return the value to provide to the gii request; null if no value should be provided. + */ + public static Object prepareValueForGII(Object v) { + assert v != null; + if (v instanceof GatewaySenderEventImpl) { + return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap(); + } else { + return v; + } + } + public boolean isOverflowedToDisk(LocalRegion r, DistributedRegion.DiskPosition dp) { return false; } @Override public Object getValue(RegionEntryContext context) { - Object result = _getValueUse(context, true); + SimpleMemoryAllocatorImpl.createReferenceCountOwner(); + @Retained Object result = _getValueRetain(context, true); //Asif: If the thread is an Index Creation Thread & the value obtained is //Token.REMOVED , we can skip synchronization block. This is required to prevent // the dead lock caused if an Index Update Thread has gone into a wait holding the @@ -373,6 +422,21 @@ public abstract class AbstractRegionEntry implements RegionEntry, // } if (Token.isRemoved(result)) { + SimpleMemoryAllocatorImpl.setReferenceCountOwner(null); + return null; + } else { + result = OffHeapHelper.copyAndReleaseIfNeeded(result); // sqlf does not dec ref count in this call + SimpleMemoryAllocatorImpl.setReferenceCountOwner(null); + setRecentlyUsed(); + return result; + } + } + + @Override + @Retained + public Object getValueRetain(RegionEntryContext context) { + @Retained Object result = _getValueRetain(context, true); + if (Token.isRemoved(result)) { return null; } else { setRecentlyUsed(); @@ -381,7 +445,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, } @Override - public void setValue(RegionEntryContext context, Object value) throws RegionClearedException { + @Released + public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException { // @todo darrel: This will mark new entries as being recently used // It might be better to only mark them when they are modified. // Or should we only mark them on reads? @@ -393,10 +458,14 @@ public abstract class AbstractRegionEntry implements RegionEntry, setValue(context,value); } - protected void setValue(RegionEntryContext context, Object value, boolean recentlyUsed) { - // value = compress(context,value); // compress is now called in AbstractRegionMap.prepareValueForCache - + @Released + protected void setValue(RegionEntryContext context, @Unretained Object value, boolean recentlyUsed) { _setValue(value); + if (value != null && context != null && (this instanceof OffHeapRegionEntry) + && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) { + ((OffHeapRegionEntry)this).release(); + ((LocalRegion)context).checkReadiness(); + } if (recentlyUsed) { setRecentlyUsed(); } @@ -473,21 +542,48 @@ public abstract class AbstractRegionEntry implements RegionEntry, return value; } + private static byte[] compressBytes(RegionEntryContext context, byte[] uncompressedBytes) { + byte[] result = uncompressedBytes; + if (isCompressible(context, uncompressedBytes)) { + long time = context.getCachePerfStats().startCompression(); + result = context.getCompressor().compress(uncompressedBytes); + context.getCachePerfStats().endCompression(time, uncompressedBytes.length, result.length); + } + return result; + } + + + @Retained public final Object getValueInVM(RegionEntryContext context) { - Object v = _getValueUse(context, true); + SimpleMemoryAllocatorImpl.createReferenceCountOwner(); + @Retained Object v = _getValueRetain(context, true); if (v == null) { // should only be possible if disk entry v = Token.NOT_AVAILABLE; } - return v; + @Retained Object result = OffHeapHelper.copyAndReleaseIfNeeded(v); // TODO OFFHEAP keep it offheap? + SimpleMemoryAllocatorImpl.setReferenceCountOwner(null); + return result; } + @Retained public Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) { return getValueInVM(owner); } + @Override + @Retained public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) { - return _getValueUse(owner, true); + @Retained Object result = _getValueRetain(owner, true); +// if (result instanceof ByteSource) { +// // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it +// Object deserVal = ((CachedDeserializable)result).getDeserializedForReading(); +// if (deserVal != result) { +// OffHeapHelper.release(result); +// result = deserVal; +// } +// } + return result; } public Object getValueOnDisk(LocalRegion r) @@ -542,7 +638,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, // Because the pr meta data region will not have an LRU. newValueToWrite = ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null); if (!create && newValueToWrite instanceof Versionable) { - final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized + @Retained @Released final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized + try { // BUGFIX for 35029. If oldValue is null the newValue should be put. if(oldValue == null) { putValue = true; @@ -552,6 +649,9 @@ public abstract class AbstractRegionEntry implements RegionEntry, Versionable ov = (Versionable) oldValue; putValue = nv.isNewerThan(ov); } + } finally { + OffHeapHelper.release(oldValue); + } } } @@ -593,7 +693,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, } } } - setValue(region, AbstractRegionMap.prepareValueForCache(region, newValueToWrite)); + setValue(region, this.prepareValueForCache(region, newValueToWrite, false)); result = true; if (newValueToWrite != Token.TOMBSTONE){ @@ -621,11 +721,12 @@ public abstract class AbstractRegionEntry implements RegionEntry, * @throws EntryNotFoundException if expectedOldValue is * not null and is not equal to current value */ + @Released public final boolean destroy(LocalRegion region, EntryEventImpl event, boolean inTokenMode, boolean cacheWrite, - Object expectedOldValue, + @Unretained Object expectedOldValue, boolean forceDestroy, boolean removeRecoveredEntry) throws CacheWriterException, @@ -648,7 +749,10 @@ public abstract class AbstractRegionEntry implements RegionEntry, // :ezoerner:20080814 We also read old value from disk or buffer // in the case where there is a non-null expectedOldValue // see PartitionedRegion#remove(Object key, Object value) - Object curValue = _getValueUse(region, true); + SimpleMemoryAllocatorImpl.skipRefCountTracking(); + @Retained @Released Object curValue = _getValueRetain(region, true); + SimpleMemoryAllocatorImpl.unskipRefCountTracking(); + try { if (curValue == null) curValue = Token.NOT_AVAILABLE; if (curValue == Token.NOT_AVAILABLE) { @@ -675,7 +779,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, } if (expectedOldValue != null) { - if (!checkExpectedOldValue(expectedOldValue, curValue)) { + if (!checkExpectedOldValue(expectedOldValue, curValue, region)) { throw new EntryNotFoundException( LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString()); } @@ -685,12 +789,15 @@ public abstract class AbstractRegionEntry implements RegionEntry, proceed = true; } else { - proceed = event.setOldValue(curValue) || removeRecoveredEntry + proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) || removeRecoveredEntry || forceDestroy || region.getConcurrencyChecksEnabled() // fix for bug #47868 - create a tombstone || (event.getOperation() == Operation.REMOVE // fix for bug #42242 && (curValue == null || curValue == Token.LOCAL_INVALID || curValue == Token.INVALID)); } + } finally { + OffHeapHelper.releaseWithNoTracking(curValue); + } } // end curValue block if (proceed) { @@ -718,7 +825,16 @@ public abstract class AbstractRegionEntry implements RegionEntry, if (indexManager != null) { try { if(isValueNull()) { - _setValue(AbstractRegionMap.prepareValueForCache(region, getValueInVMOrDiskWithoutFaultIn(region))); + @Released Object value = getValueOffHeapOrDiskWithoutFaultIn(region); + try { + _setValue(prepareValueForCache(region, value, false)); + if (value != null && region != null && (this instanceof OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) { + ((OffHeapRegionEntry)this).release(); + region.checkReadiness(); + } + } finally { + OffHeapHelper.release(value); + } } indexManager.updateIndexes(this, IndexManager.REMOVE_ENTRY, @@ -750,9 +866,17 @@ public abstract class AbstractRegionEntry implements RegionEntry, removeEntry = true; } - if (removeEntry) { + // See #47887, we do not insert a tombstone for evicted HDFS + // entries since the value is still present in HDFS + // Check if we have to evict or just do destroy. + boolean forceRemoveEntry = + (event.isEviction() || event.isExpiration()) + && event.getRegion().isUsedForPartitionedRegionBucket() + && event.getRegion().getPartitionedRegion().isHDFSRegion(); + + if (removeEntry || forceRemoveEntry) { boolean isThisTombstone = isTombstone(); - if(inTokenMode) { + if(inTokenMode && !event.getOperation().isEviction()) { setValue(region, Token.DESTROYED); } else { removePhase1(region, false); @@ -770,35 +894,161 @@ public abstract class AbstractRegionEntry implements RegionEntry, return false; } } + + - static boolean checkExpectedOldValue(Object expectedOldValue, Object actualValue) { + static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, @Unretained Object actualValue, LocalRegion lr) { if (Token.isInvalid(expectedOldValue)) { return (actualValue == null) || Token.isInvalid(actualValue); } else { - return checkEquals(expectedOldValue, actualValue); + boolean isCompressedOffHeap = lr.getAttributes().getOffHeap() && lr.getAttributes().getCompressor() != null; + return checkEquals(expectedOldValue, actualValue, isCompressedOffHeap); + } + } + + private static boolean basicEquals(Object v1, Object v2) { + if (v2 != null) { + if (v2.getClass().isArray()) { + // fix for 52093 + if (v2 instanceof byte[]) { + if (v1 instanceof byte[]) { + return Arrays.equals((byte[])v2, (byte[])v1); + } else { + return false; + } + } else if (v2 instanceof Object[]) { + if (v1 instanceof Object[]) { + return Arrays.deepEquals((Object[])v2, (Object[])v1); + } else { + return false; + } + } else if (v2 instanceof int[]) { + if (v1 instanceof int[]) { + return Arrays.equals((int[])v2, (int[])v1); + } else { + return false; + } + } else if (v2 instanceof long[]) { + if (v1 instanceof long[]) { + return Arrays.equals((long[])v2, (long[])v1); + } else { + return false; + } + } else if (v2 instanceof boolean[]) { + if (v1 instanceof boolean[]) { + return Arrays.equals((boolean[])v2, (boolean[])v1); + } else { + return false; + } + } else if (v2 instanceof short[]) { + if (v1 instanceof short[]) { + return Arrays.equals((short[])v2, (short[])v1); + } else { + return false; + } + } else if (v2 instanceof char[]) { + if (v1 instanceof char[]) { + return Arrays.equals((char[])v2, (char[])v1); + } else { + return false; + } + } else if (v2 instanceof float[]) { + if (v1 instanceof float[]) { + return Arrays.equals((float[])v2, (float[])v1); + } else { + return false; + } + } else if (v2 instanceof double[]) { + if (v1 instanceof double[]) { + return Arrays.equals((double[])v2, (double[])v1); + } else { + return false; + } + } + // fall through and call equals method + } + return v2.equals(v1); + } else { + return v1 == null; } } - static boolean checkEquals(Object v1, Object v2) { + static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, boolean isCompressedOffHeap) { // need to give PdxInstance#equals priority if (v1 instanceof PdxInstance) { return checkPdxEquals((PdxInstance)v1, v2); } else if (v2 instanceof PdxInstance) { return checkPdxEquals((PdxInstance)v2, v1); + } else if (v1 instanceof OffHeapCachedDeserializable) { + return checkOffHeapEquals((OffHeapCachedDeserializable)v1, v2); + } else if (v2 instanceof OffHeapCachedDeserializable) { + return checkOffHeapEquals((OffHeapCachedDeserializable)v2, v1); } else if (v1 instanceof CachedDeserializable) { - return checkCDEquals((CachedDeserializable)v1, v2); + return checkCDEquals((CachedDeserializable)v1, v2, isCompressedOffHeap); } else if (v2 instanceof CachedDeserializable) { - return checkCDEquals((CachedDeserializable)v2, v1); + return checkCDEquals((CachedDeserializable)v2, v1, isCompressedOffHeap); } else { - if (v2 != null) { - return v2.equals(v1); + return basicEquals(v1, v2); + } + } + private static boolean checkOffHeapEquals(@Unretained OffHeapCachedDeserializable cd, @Unretained Object obj) { + if (cd.isSerializedPdxInstance()) { + PdxInstance pi = InternalDataSerializer.readPdxInstance(cd.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality")); + return checkPdxEquals(pi, obj); + } + if (obj instanceof OffHeapCachedDeserializable) { + return cd.checkDataEquals((OffHeapCachedDeserializable)obj); + } else { + byte[] serializedObj; + if (obj instanceof CachedDeserializable) { + if (!cd.isSerialized()) { + if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) { + // both are byte[] + // obj must be DataAsAddress since it was not OffHeapCachedDeserializable + // so its byte[] will be small. + byte[] objBytes = (byte[]) ((StoredObject) obj).getDeserializedForReading(); + return cd.checkDataEquals(objBytes); + } else { + return false; + } + } + serializedObj = ((CachedDeserializable) obj).getSerializedValue(); + } else if (obj instanceof byte[]) { + if (cd.isSerialized()) { + return false; + } + serializedObj = (byte[]) obj; } else { - return v1 == null; + if (!cd.isSerialized()) { + return false; + } + if (obj == null || obj == Token.NOT_AVAILABLE + || Token.isInvalidOrRemoved(obj)) { + return false; + } + serializedObj = EntryEventImpl.serialize(obj); } + return cd.checkDataEquals(serializedObj); } } - private static boolean checkCDEquals(CachedDeserializable cd, Object obj) { + private static boolean checkCDEquals(CachedDeserializable cd, Object obj, boolean isCompressedOffHeap) { + if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) { + // cd is an actual byte[]. + byte[] ba2; + if (obj instanceof StoredObject) { + if (!((StoredObject) obj).isSerialized()) { + return false; + } + ba2 = (byte[]) ((StoredObject) obj).getDeserializedForReading(); + } else if (obj instanceof byte[]) { + ba2 = (byte[]) obj; + } else { + return false; + } + byte[] ba1 = (byte[]) cd.getDeserializedForReading(); + return Arrays.equals(ba1, ba2); + } Object cdVal = cd.getValue(); if (cdVal instanceof byte[]) { byte[] cdValBytes = (byte[])cdVal; @@ -806,14 +1056,21 @@ public abstract class AbstractRegionEntry implements RegionEntry, if (pi != null) { return checkPdxEquals(pi, obj); } - //byte[] serializedObj; - /** - * To be more compatible with previous releases do not compare the serialized forms here. - * Instead deserialize and call the equals method. - */ + if (isCompressedOffHeap) { // fix for bug 52248 + byte[] serializedObj; + if (obj instanceof CachedDeserializable) { + serializedObj = ((CachedDeserializable) obj).getSerializedValue(); + } else { + serializedObj = EntryEventImpl.serialize(obj); + } + return Arrays.equals(cdValBytes, serializedObj); + } else { + /** + * To be more compatible with previous releases do not compare the serialized forms here. + * Instead deserialize and call the equals method. + */ Object deserializedObj; if (obj instanceof CachedDeserializable) { - //serializedObj = ((CachedDeserializable) obj).getSerializedValue(); deserializedObj =((CachedDeserializable) obj).getDeserializedForReading(); } else { if (obj == null || obj == Token.NOT_AVAILABLE @@ -822,15 +1079,19 @@ public abstract class AbstractRegionEntry implements RegionEntry, } // TODO OPTIMIZE: Before serializing all of obj we could get the top // level class name of cdVal and compare it to the top level class name of obj. - //serializedObj = EntryEventImpl.serialize(obj); deserializedObj = obj; } - return cd.getDeserializedForReading().equals(deserializedObj); + return basicEquals(deserializedObj, cd.getDeserializedForReading()); + } // boolean result = Arrays.equals((byte[])cdVal, serializedObj); // if (!result) { // try { // Object o1 = BlobHelper.deserializeBlob((byte[])cdVal); // Object o2 = BlobHelper.deserializeBlob(serializedObj); +// SimpleMemoryAllocatorImpl.debugLog("checkCDEquals o1=<" + o1 + "> o2=<" + o2 + ">", false); +// if (o1.equals(o2)) { +// SimpleMemoryAllocatorImpl.debugLog("they are equal! a1=<" + Arrays.toString((byte[])cdVal) + "> a2=<" + Arrays.toString(serializedObj) + ">", false); +// } // } catch (IOException e) { // // TODO Auto-generated catch block // e.printStackTrace(); @@ -847,7 +1108,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, // class name of cdVal and the top level class name of obj and compare. obj = ((CachedDeserializable) obj).getDeserializedForReading(); } - return cdVal.equals(obj); + return basicEquals(cdVal, obj); } } /** @@ -857,6 +1118,10 @@ public abstract class AbstractRegionEntry implements RegionEntry, if (!(obj instanceof PdxInstance)) { // obj may be a CachedDeserializable in which case we want to convert it to a PdxInstance even if we are not readSerialized. if (obj instanceof CachedDeserializable) { + if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) { + // obj is actually a byte[] which will never be equal to a PdxInstance + return false; + } Object cdVal = ((CachedDeserializable) obj).getValue(); if (cdVal instanceof byte[]) { byte[] cdValBytes = (byte[]) cdVal; @@ -901,7 +1166,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, } } } - return obj.equals(pdx); + return basicEquals(obj, pdx); } @@ -937,6 +1202,16 @@ public abstract class AbstractRegionEntry implements RegionEntry, public abstract Object getKey(); + protected static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) { + if (v == null) return false; + if (Token.isInvalidOrRemoved(v)) return false; + if (v == Token.NOT_AVAILABLE) return false; + if (v instanceof DiskEntry.RecoveredEntry) return false; // The disk layer has special logic that ends up storing the nested value in the RecoveredEntry off heap + if (!(e instanceof OffHeapRegionEntry)) return false; + // TODO should we check for deltas here or is that a user error? + return true; + } + /** * Default implementation. Override in subclasses with primitive keys * to prevent creating an Object form of the key for each equality check. @@ -993,6 +1268,120 @@ public abstract class AbstractRegionEntry implements RegionEntry, } while(!done); } + @Override + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) + public Object prepareValueForCache(RegionEntryContext r, + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val, + boolean isEntryUpdate) { + return prepareValueForCache(r, val, null, isEntryUpdate); + } + + @Override + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) + public Object prepareValueForCache(RegionEntryContext r, + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val, + EntryEventImpl event, boolean isEntryUpdate) { + if (r != null && r.getOffHeap() && okToStoreOffHeap(val, this)) { + if (val instanceof StoredObject) { + // Check to see if val has the same compression settings as this region. + // The recursive calls in this section are safe because + // we only do it after copy the off-heap value to the heap. + // This is needed to fix bug 52057. + StoredObject soVal = (StoredObject) val; + assert !soVal.isCompressed(); + if (r.getCompressor() != null) { + // val is uncompressed and we need a compressed value. + // So copy the off-heap value to the heap in a form that can be compressed. + byte[] valAsBytes = soVal.getValueAsHeapByteArray(); + Object heapValue; + if (soVal.isSerialized()) { + heapValue = CachedDeserializableFactory.create(valAsBytes); + } else { + heapValue = valAsBytes; + } + return prepareValueForCache(r, heapValue, event, isEntryUpdate); + } + if (val instanceof Chunk) { + // if the reused guy has a refcount then need to inc it + if (!((Chunk)val).retain()) { + throw new IllegalStateException("Could not use an off heap value because it was freed"); + } + } + // else it is DataAsAddress. This code just returns it as prepared. + // TODO OFFHEAP: Review the callers to see if they will handle DataAsAddress correctly. + } else { + byte[] data; + boolean isSerialized = !(val instanceof byte[]); + if (isSerialized) { + if (event != null && event.getCachedSerializedNewValue() != null) { + data = event.getCachedSerializedNewValue(); + } else if (val instanceof CachedDeserializable) { + data = ((CachedDeserializable)val).getSerializedValue(); + // TODO OFFHEAP: cache data in event? + } else if (val instanceof PdxInstance) { + try { + data = ((ConvertableToBytes)val).toBytes(); + // TODO OFFHEAP: cache data in event? + } catch (IOException e) { + throw new PdxSerializationException("Could not convert " + val + " to bytes", e); + } + } else { + data = EntryEventImpl.serialize(val); + // TODO OFFHEAP: cache data in event? + } + } else { + data = (byte[]) val; + } + byte[] compressedData = compressBytes(r, data); + boolean isCompressed = compressedData != data; + SimpleMemoryAllocatorImpl.setReferenceCountOwner(this); + MemoryAllocator ma = SimpleMemoryAllocatorImpl.getAllocator(); // fix for bug 47875 + val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, GemFireChunk.TYPE); // TODO:KIRK:48068 race happens right after this line + SimpleMemoryAllocatorImpl.setReferenceCountOwner(null); + if (val instanceof GemFireChunk) { + val = new com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.ChunkWithHeapForm((GemFireChunk)val, data); + } +// if (val instanceof Chunk && r instanceof LocalRegion) { +// Chunk c = (Chunk) val; +// LocalRegion lr = (LocalRegion) r; +// SimpleMemoryAllocatorImpl.debugLog("allocated @" + Long.toHexString(c.getMemoryAddress()) + " reg=" + lr.getFullPath(), false); +// } + } + return val; + } + @Unretained Object nv = val; + if (nv instanceof StoredObject) { + // This off heap value is being put into a on heap region. + byte[] data = ((StoredObject) nv).getSerializedValue(); + nv = CachedDeserializableFactory.create(data); + } + // don't bother checking for SQLFire + if (!GemFireCacheImpl.sqlfSystem() && nv instanceof PdxInstanceImpl) { + // We do not want to put PDXs in the cache as values. + // So get the serialized bytes and use a CachedDeserializable. + try { + byte[] data = ((ConvertableToBytes)nv).toBytes(); + byte[] compressedData = compressBytes(r, data); + if (data == compressedData) { + nv = CachedDeserializableFactory.create(data); + } else { + nv = compressedData; + } + } catch (IOException e) { + throw new PdxSerializationException("Could not convert " + nv + " to bytes", e); + } + } else { + nv = compress(r, nv, event); + } + return nv; + } + + @Override + @Unretained + public final Object _getValue() { + return getValueField(); + } + public final boolean isUpdateInProgress() { return areAnyBitsSet(UPDATE_IN_PROGRESS); } @@ -1037,6 +1426,29 @@ public abstract class AbstractRegionEntry implements RegionEntry, TXManagerImpl.incRefCount(this); setInUseByTransaction(true); } + /** + * {@inheritDoc} + */ + @Override + public final boolean isMarkedForEviction() { + return areAnyBitsSet(MARKED_FOR_EVICTION); + } + + /** + * {@inheritDoc} + */ + @Override + public final void setMarkedForEviction() { + setBits(MARKED_FOR_EVICTION); + } + + /** + * {@inheritDoc} + */ + @Override + public final void clearMarkedForEviction() { + clearBits(~MARKED_FOR_EVICTION); + } @Override public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) { @@ -1068,16 +1480,12 @@ public abstract class AbstractRegionEntry implements RegionEntry, * Instead of overriding this method; override areSetValue. */ protected final void _setValue(Object val) { - areSetValue(val); + setValueField(val); } @Override - public final Object _getValue() { - return areGetValue(); - } - @Override public Token getValueAsToken() { - Object v = areGetValue(); + Object v = getValueField(); if (v == null || v instanceof Token) { return (Token)v; } else { @@ -1085,11 +1493,23 @@ public abstract class AbstractRegionEntry implements RegionEntry, } } - protected abstract Object areGetValue(); - protected abstract void areSetValue(Object v); + /** + * Reads the value of this region entry. + * Provides low level access to the value field. + * @return possible OFF_HEAP_OBJECT (caller uses region entry reference) + */ + @Unretained + protected abstract Object getValueField(); + /** + * Set the value of this region entry. + * Provides low level access to the value field. + * @param v the new value to set + */ + protected abstract void setValueField(@Unretained Object v); + @Retained public Object getTransformedValue() { - return _getValueUse(null, false); + return _getValueRetain(null, false); } public final boolean getValueWasResultOfSearch() { @@ -1187,7 +1607,19 @@ public abstract class AbstractRegionEntry implements RegionEntry, VersionTag tag = VersionTag.create(mbr); tag.setEntryVersion(v); if (region.getVersionVector() != null) { - tag.setRegionVersion(region.getVersionVector().getNextVersion()); + // Use region version if already provided, else generate + long nextRegionVersion = event.getNextRegionVersion(); + if (nextRegionVersion != -1) { + // Set on the tag and record it locally + tag.setRegionVersion(nextRegionVersion); + RegionVersionVector rvv = region.getVersionVector(); + rvv.recordVersion(rvv.getOwnerId(),nextRegionVersion); + if (logger.isDebugEnabled()) { + logger.debug("recorded region version {}; region={}", nextRegionVersion, region.getFullPath()); + } + } else { + tag.setRegionVersion(region.getVersionVector().getNextVersion()); + } } if (withDelta) { tag.setPreviousMemberID(previous); @@ -1214,7 +1646,10 @@ public abstract class AbstractRegionEntry implements RegionEntry, stamp.setMemberID(mbr); event.setVersionTag(tag); if (logger.isDebugEnabled()) { - logger.debug("generated tag {}; region={}; rvv={}", tag, region.getFullPath(), region.getVersionVector()); + logger.debug("generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag, + event.getKey(), event.getOldValue(), event.getNewValue(), + (event.getContext() == null? "none" : event.getContext().getDistributedMember().getName()), + region.getFullPath(), region.getVersionVector()); } return tag; } @@ -1692,6 +2127,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, SystemFailure.checkFailure(); logger.error(LocalizedMessage.create(LocalizedStrings.LocalRegion_EXCEPTION_OCCURRED_IN_CONFLICTRESOLVER), t); thr = t; + } finally { + timestampedEvent.release(); } if (isDebugEnabled) { @@ -1775,8 +2212,13 @@ public abstract class AbstractRegionEntry implements RegionEntry, */ public static final int MAX_INLINE_STRING_KEY_BYTE_ENCODING = 15; + /** + * This is only retained in off-heap subclasses. However, it's marked as + * Retained here so that callers are aware that the value may be retained. + */ @Override - public Object _getValueUse(RegionEntryContext context, boolean decompress) { + @Retained + public Object _getValueRetain(RegionEntryContext context, boolean decompress) { if (decompress) { return decompress(context, _getValue()); } else {