Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E655718FA9 for ; Mon, 22 Feb 2016 18:35:47 +0000 (UTC) Received: (qmail 72499 invoked by uid 500); 22 Feb 2016 18:35:47 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 72472 invoked by uid 500); 22 Feb 2016 18:35:47 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 72460 invoked by uid 99); 22 Feb 2016 18:35:47 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 18:35:47 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 2A71BC3AD8 for ; Mon, 22 Feb 2016 18:35:47 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.021 X-Spam-Level: X-Spam-Status: No, score=-4.021 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id tTDrUZzeBHi6 for ; Mon, 22 Feb 2016 18:35:35 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 7E0985FB6D for ; Mon, 22 Feb 2016 18:35:26 +0000 (UTC) Received: (qmail 67958 invoked by uid 99); 22 Feb 2016 18:35:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 18:35:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1F170E04D9; Mon, 22 Feb 2016 18:35:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: upthewaterspout@apache.org To: commits@geode.incubator.apache.org Date: Mon, 22 Feb 2016 18:36:39 -0000 Message-Id: <55bc521c84e94feb968e88cce170dbb9@git.apache.org> In-Reply-To: <0806417cf36345658593943fc13e8a5d@git.apache.org> References: <0806417cf36345658593943fc13e8a5d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [77/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java ---------------------------------------------------------------------- diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java index 0000000,dd33b15..558ea37 mode 000000,100644..100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java @@@ -1,0 -1,2243 +1,2243 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package com.gemstone.gemfire.internal.cache; + + import java.io.IOException; + import java.util.Arrays; + + import org.apache.logging.log4j.Logger; + + import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE; + import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE; + + import com.gemstone.gemfire.CancelException; + import com.gemstone.gemfire.InvalidDeltaException; + import com.gemstone.gemfire.SystemFailure; + import com.gemstone.gemfire.cache.CacheWriterException; + import com.gemstone.gemfire.cache.EntryEvent; + import com.gemstone.gemfire.cache.EntryNotFoundException; + import com.gemstone.gemfire.cache.Operation; + import com.gemstone.gemfire.cache.TimeoutException; + import com.gemstone.gemfire.cache.query.IndexMaintenanceException; + import com.gemstone.gemfire.cache.query.QueryException; + import com.gemstone.gemfire.cache.query.internal.index.IndexManager; + import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol; + import com.gemstone.gemfire.cache.util.GatewayConflictHelper; + import com.gemstone.gemfire.cache.util.GatewayConflictResolver; + import com.gemstone.gemfire.distributed.internal.DM; + import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; + import com.gemstone.gemfire.internal.Assert; + import com.gemstone.gemfire.internal.ByteArrayDataInput; + import com.gemstone.gemfire.internal.HeapDataOutputStream; + import com.gemstone.gemfire.internal.InternalDataSerializer; + import com.gemstone.gemfire.internal.InternalStatisticsDisabledException; + import com.gemstone.gemfire.internal.Version; + import com.gemstone.gemfire.internal.cache.lru.LRUClockNode; + import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand; + import com.gemstone.gemfire.internal.cache.persistence.DiskStoreID; + import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException; + import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector; + import com.gemstone.gemfire.internal.cache.versions.VersionSource; + import com.gemstone.gemfire.internal.cache.versions.VersionStamp; + import com.gemstone.gemfire.internal.cache.versions.VersionTag; + import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; + import com.gemstone.gemfire.internal.i18n.LocalizedStrings; + import com.gemstone.gemfire.internal.lang.StringUtils; + import com.gemstone.gemfire.internal.logging.LogService; + import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + import com.gemstone.gemfire.internal.logging.log4j.LogMarker; -import com.gemstone.gemfire.internal.offheap.Chunk; -import com.gemstone.gemfire.internal.offheap.ChunkWithHeapForm; -import com.gemstone.gemfire.internal.offheap.GemFireChunk; ++import com.gemstone.gemfire.internal.offheap.ObjectChunk; ++import com.gemstone.gemfire.internal.offheap.ObjectChunkWithHeapForm; ++import com.gemstone.gemfire.internal.offheap.ObjectChunk; + import com.gemstone.gemfire.internal.offheap.MemoryAllocator; + import com.gemstone.gemfire.internal.offheap.OffHeapCachedDeserializable; + import com.gemstone.gemfire.internal.offheap.OffHeapHelper; + import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper; + import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl; + import com.gemstone.gemfire.internal.offheap.StoredObject; + import com.gemstone.gemfire.internal.offheap.annotations.Released; + import com.gemstone.gemfire.internal.offheap.annotations.Retained; + import com.gemstone.gemfire.internal.offheap.annotations.Unretained; + import com.gemstone.gemfire.internal.util.BlobHelper; + import com.gemstone.gemfire.internal.util.Versionable; + import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap; + import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry; + import com.gemstone.gemfire.pdx.PdxInstance; + import com.gemstone.gemfire.pdx.PdxSerializable; + import com.gemstone.gemfire.pdx.PdxSerializationException; + import com.gemstone.gemfire.pdx.PdxSerializer; + import com.gemstone.gemfire.pdx.internal.ConvertableToBytes; + import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl; + + /** + * Abstract implementation class of RegionEntry interface. + * This is the topmost implementation class so common behavior + * lives here. + * + * @since 3.5.1 + * + * @author Darrel Schneider + * @author bruce + * + */ + public abstract class AbstractRegionEntry implements RegionEntry, + HashEntry { + + private static final Logger logger = LogService.getLogger(); + + /** + * Whether to disable last access time update when a put occurs. The default + * is false (enable last access time update on put). To disable it, set the + * 'gemfire.disableAccessTimeUpdateOnPut' system property. + */ + protected static final boolean DISABLE_ACCESS_TIME_UPDATE_ON_PUT = Boolean + .getBoolean("gemfire.disableAccessTimeUpdateOnPut"); + + /* + * Flags for a Region Entry. + * These flags are stored in the msb of the long used to also store the lastModicationTime. + */ + private static final long VALUE_RESULT_OF_SEARCH = 0x01L<<56; + private static final long UPDATE_IN_PROGRESS = 0x02L<<56; + private static final long TOMBSTONE_SCHEDULED = 0x04L<<56; + private static final long LISTENER_INVOCATION_IN_PROGRESS = 0x08L<<56; + /** used for LRUEntry instances. */ + protected static final long RECENTLY_USED = 0x10L<<56; + /** used for LRUEntry instances. */ + protected static final long EVICTED = 0x20L<<56; + /** + * Set if the entry is being used by a transactions. + * Some features (eviction and expiration) will not modify an entry when a tx is using it + * to prevent the tx to fail do to conflict. + */ + protected static final long IN_USE_BY_TX = 0x40L<<56; + + + protected static final long MARKED_FOR_EVICTION = 0x80L<<56; + // public Exception removeTrace; // debugging hot loop in AbstractRegionMap.basicPut() + + protected AbstractRegionEntry(RegionEntryContext context, + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object value) { + + setValue(context,this.prepareValueForCache(context, value, false),false); + // setLastModified(System.currentTimeMillis()); [bruce] this must be set later so we can use ==0 to know this is a new entry in checkForConflicts + } + + ///////////////////////////////////////////////////////////////////// + ////////////////////////// instance methods ///////////////////////// + ///////////////////////////////////////////////////////////////////// + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IMSE_DONT_CATCH_IMSE") + public boolean dispatchListenerEvents(final EntryEventImpl event) throws InterruptedException { + final LocalRegion rgn = event.getRegion(); + + if (event.callbacksInvoked()) { + return true; + } + + // don't wait for certain events to reach the head of the queue before + // dispatching listeners. However, we must not notify the gateways for + // remote-origin ops out of order. Otherwise the other systems will have + // inconsistent content. + + event.setCallbacksInvokedByCurrentThread(); + + if (logger.isDebugEnabled()) { + logger.debug("{} dispatching event {}", this, event); + } + // All the following code that sets "thr" is to workaround + // spurious IllegalMonitorStateExceptions caused by JVM bugs. + try { + // call invokeCallbacks while synced on RegionEntry + event.invokeCallbacks(rgn, event.inhibitCacheListenerNotification(), false); + return true; + + } finally { + if (isRemoved() && !isTombstone() && !event.isEvicted()) { + // Phase 2 of region entry removal is done here. The first phase is done + // by the RegionMap. It is unclear why this code is needed. ARM destroy + // does this also and we are now doing it as phase3 of the ARM destroy. + removePhase2(); + rgn.getRegionMap().removeEntry(event.getKey(), this, true, event, rgn, rgn.getIndexUpdater()); + } + } + } + + public long getLastAccessed() throws InternalStatisticsDisabledException { + throw new InternalStatisticsDisabledException(); + } + + public long getHitCount() throws InternalStatisticsDisabledException { + throw new InternalStatisticsDisabledException(); + } + + public long getMissCount() throws InternalStatisticsDisabledException { + throw new InternalStatisticsDisabledException(); + } + + protected void setLastModified(long lastModified) { + _setLastModified(lastModified); + } + + public void txDidDestroy(long currTime) { + setLastModified(currTime); + } + + public final void updateStatsForPut(long lastModifiedTime) { + setLastModified(lastModifiedTime); + } + + public void setRecentlyUsed() { + // do nothing by default; only needed for LRU + } + + public void updateStatsForGet(boolean hit, long time) { + // nothing needed + } + + public void resetCounts() throws InternalStatisticsDisabledException { + throw new InternalStatisticsDisabledException(); + } + + public void _removePhase1() { + _setValue(Token.REMOVED_PHASE1); + // debugging for 38467 (hot thread in ARM.basicUpdate) + // this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread()); + } + public void removePhase1(LocalRegion r, boolean isClear) throws RegionClearedException { + _removePhase1(); + } + + public void removePhase2() { + _setValue(Token.REMOVED_PHASE2); + // this.removeTrace = new Exception("stack trace for thread " + Thread.currentThread()); + } + + public void makeTombstone(LocalRegion r, VersionTag version) throws RegionClearedException { + assert r.getVersionVector() != null; + assert version != null; + if (r.getServerProxy() == null && + r.getVersionVector().isTombstoneTooOld(version.getMemberID(), version.getRegionVersion())) { + // distributed gc with higher vector version preempts this operation + if (!isTombstone()) { + setValue(r, Token.TOMBSTONE); + r.incTombstoneCount(1); + } + r.getRegionMap().removeTombstone(this, version, false, true); + } else { + if (isTombstone()) { + // unschedule the old tombstone + r.unscheduleTombstone(this); + } + setRecentlyUsed(); + boolean newEntry = (getValueAsToken() == Token.REMOVED_PHASE1); + setValue(r, Token.TOMBSTONE); + r.scheduleTombstone(this, version); + if (newEntry) { + // bug #46631 - entry count is decremented by scheduleTombstone but this is a new entry + r.getCachePerfStats().incEntryCount(1); + } + } + } + + + @Override + public void setValueWithTombstoneCheck(@Unretained Object v, EntryEvent e) throws RegionClearedException { + if (v == Token.TOMBSTONE) { + makeTombstone((LocalRegion)e.getRegion(), ((EntryEventImpl)e).getVersionTag()); + } else { + setValue((LocalRegion)e.getRegion(), v, (EntryEventImpl)e); + } + } + + /** + * Return true if the object is removed. + * + * TODO this method does NOT return true if the object + * is Token.DESTROYED. dispatchListenerEvents relies on that + * fact to avoid removing destroyed tokens from the map. + * We should refactor so that this method calls Token.isRemoved, + * and places that don't want a destroyed Token can explicitly check + * for a DESTROY token. + */ + public final boolean isRemoved() { + Token o = getValueAsToken(); + return (o == Token.REMOVED_PHASE1) || (o == Token.REMOVED_PHASE2) || (o == Token.TOMBSTONE); + } + + public final boolean isDestroyedOrRemoved() { + return Token.isRemoved(getValueAsToken()); + } + + public final boolean isDestroyedOrRemovedButNotTombstone() { + Token o = getValueAsToken(); + return o == Token.DESTROYED || o == Token.REMOVED_PHASE1 || o == Token.REMOVED_PHASE2; + } + + public final boolean isTombstone() { + return getValueAsToken() == Token.TOMBSTONE; + } + + public final boolean isRemovedPhase2() { + return getValueAsToken() == Token.REMOVED_PHASE2; + } + + public boolean fillInValue(LocalRegion region, + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry dst, + ByteArrayDataInput in, + DM mgr) + { + dst.setSerialized(false); // starting default value + + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) final Object v; + if (isTombstone()) { + v = Token.TOMBSTONE; + } else { + v = getValue(region); // OFFHEAP: need to incrc, copy bytes, decrc + if (v == null) { + return false; + } + } + + final boolean isEagerDeserialize = dst.isEagerDeserialize(); + if (isEagerDeserialize) { + dst.clearEagerDeserialize(); + } + dst.setLastModified(mgr, getLastModified()); // fix for bug 31059 + if (v == Token.INVALID) { + dst.setInvalid(); + } + else if (v == Token.LOCAL_INVALID) { + dst.setLocalInvalid(); + } + else if (v == Token.TOMBSTONE) { + dst.setTombstone(); + } + else if (v instanceof CachedDeserializable) { + // don't serialize here if it is not already serialized + // if(v instanceof ByteSource && CachedDeserializableFactory.preferObject()) { + // // For SQLFire we prefer eager deserialized + // dst.setEagerDeserialize(); + // } + + if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) { + dst.value = ((StoredObject) v).getDeserializedForReading(); + } else { + /*if (v instanceof ByteSource && CachedDeserializableFactory.preferObject()) { + dst.value = v; + } else */ { + Object tmp = ((CachedDeserializable) v).getValue(); + if (tmp instanceof byte[]) { + byte[] bb = (byte[]) tmp; + dst.value = bb; + } else { + try { + HeapDataOutputStream hdos = new HeapDataOutputStream( + Version.CURRENT); + BlobHelper.serializeTo(tmp, hdos); + hdos.trim(); + dst.value = hdos; + } catch (IOException e) { + RuntimeException e2 = new IllegalArgumentException( + LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING + .toLocalizedString()); + e2.initCause(e); + throw e2; + } + } + dst.setSerialized(true); + } + } + } + else if (v instanceof byte[]) { + dst.value = v; + } + else { + Object preparedValue = v; + if (preparedValue != null) { + preparedValue = prepareValueForGII(preparedValue); + if (preparedValue == null) { + return false; + } + } + if (CachedDeserializableFactory.preferObject()) { + dst.value = preparedValue; + dst.setEagerDeserialize(); + } + else { + try { + HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + BlobHelper.serializeTo(preparedValue, hdos); + hdos.trim(); + dst.value = hdos; + dst.setSerialized(true); + } catch (IOException e) { + RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.AbstractRegionEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString()); + e2.initCause(e); + throw e2; + } + } + } + return true; + } + + /** + * To fix bug 49901 if v is a GatewaySenderEventImpl then make + * a heap copy of it if it is offheap. + * @return the value to provide to the gii request; null if no value should be provided. + */ + public static Object prepareValueForGII(Object v) { + assert v != null; + if (v instanceof GatewaySenderEventImpl) { + return ((GatewaySenderEventImpl) v).makeHeapCopyIfOffHeap(); + } else { + return v; + } + } + + public boolean isOverflowedToDisk(LocalRegion r, DistributedRegion.DiskPosition dp) { + return false; + } + + @Override + public Object getValue(RegionEntryContext context) { + ReferenceCountHelper.createReferenceCountOwner(); + @Retained Object result = _getValueRetain(context, true); + //Asif: If the thread is an Index Creation Thread & the value obtained is + //Token.REMOVED , we can skip synchronization block. This is required to prevent + // the dead lock caused if an Index Update Thread has gone into a wait holding the + // lock of the Entry object. There should not be an issue if the Index creation thread + // gets the temporary value of token.REMOVED as the correct value will get indexed + // by the Index Update Thread , once the index creation thread has exited. + // Part of Bugfix # 33336 + // if ((result == Token.REMOVED_PHASE1 || result == Token.REMOVED_PHASE2) && !r.isIndexCreationThread()) { + // synchronized (this) { + // result = _getValue(); + // } + // } + + if (Token.isRemoved(result)) { + ReferenceCountHelper.setReferenceCountOwner(null); + return null; + } else { + result = OffHeapHelper.copyAndReleaseIfNeeded(result); // sqlf does not dec ref count in this call + ReferenceCountHelper.setReferenceCountOwner(null); + setRecentlyUsed(); + return result; + } + } + + @Override + @Retained + public Object getValueRetain(RegionEntryContext context) { + @Retained Object result = _getValueRetain(context, true); + if (Token.isRemoved(result)) { + return null; + } else { + setRecentlyUsed(); + return result; + } + } + + @Override + @Released + public void setValue(RegionEntryContext context, @Unretained Object value) throws RegionClearedException { + // @todo darrel: This will mark new entries as being recently used + // It might be better to only mark them when they are modified. + // Or should we only mark them on reads? + setValue(context,value,true); + } + + @Override + public void setValue(RegionEntryContext context, Object value, EntryEventImpl event) throws RegionClearedException { + setValue(context,value); + } + + @Released + protected void setValue(RegionEntryContext context, @Unretained Object value, boolean recentlyUsed) { + _setValue(value); + if (value != null && context != null && (this instanceof OffHeapRegionEntry) + && context instanceof LocalRegion && ((LocalRegion)context).isThisRegionBeingClosedOrDestroyed()) { + ((OffHeapRegionEntry)this).release(); + ((LocalRegion)context).checkReadiness(); + } + if (recentlyUsed) { + setRecentlyUsed(); + } + } + + /** + * This method determines if the value is in a compressed representation and decompresses it if it is. + * + * @param context the values context. + * @param value a region entry value. + * + * @return the decompressed form of the value parameter. + */ + static Object decompress(RegionEntryContext context,Object value) { + if(isCompressible(context, value)) { + long time = context.getCachePerfStats().startDecompression(); + value = EntryEventImpl.deserialize(context.getCompressor().decompress((byte[]) value)); + context.getCachePerfStats().endDecompression(time); + } + + return value; + } + + static protected Object compress(RegionEntryContext context,Object value) { + return compress(context, value, null); + } + + /** + * This method determines if the value is compressible and compresses it if it is. + * + * @param context the values context. + * @param value a region entry value. + * + * @return the compressed form of the value parameter. + */ + static protected Object compress(RegionEntryContext context,Object value, EntryEventImpl event) { + if(isCompressible(context, value)) { + long time = context.getCachePerfStats().startCompression(); + byte[] serializedValue; + if (event != null && event.getCachedSerializedNewValue() != null) { + serializedValue = event.getCachedSerializedNewValue(); + if (value instanceof CachedDeserializable) { + CachedDeserializable cd = (CachedDeserializable) value; + if (!(cd.getValue() instanceof byte[])) { + // The cd now has the object form so use the cached serialized form in a new cd. + // This serialization is much cheaper than reserializing the object form. + serializedValue = EntryEventImpl.serialize(CachedDeserializableFactory.create(serializedValue)); + } else { + serializedValue = EntryEventImpl.serialize(cd); + } + } + } else { + serializedValue = EntryEventImpl.serialize(value); + if (event != null && !(value instanceof byte[])) { + // See if we can cache the serialized new value in the event. + // If value is a byte[] then we don't cache it since it is not serialized. + if (value instanceof CachedDeserializable) { + // For a CacheDeserializable we want to only cache the wrapped value; + // not the serialized CacheDeserializable. + CachedDeserializable cd = (CachedDeserializable) value; + Object cdVal = cd.getValue(); + if (cdVal instanceof byte[]) { + event.setCachedSerializedNewValue((byte[])cdVal); + } + } else { + event.setCachedSerializedNewValue(serializedValue); + } + } + } + value = context.getCompressor().compress(serializedValue); + context.getCachePerfStats().endCompression(time, serializedValue.length, ((byte []) value).length); + } + + return value; + } + + private static byte[] compressBytes(RegionEntryContext context, byte[] uncompressedBytes) { + byte[] result = uncompressedBytes; + if (isCompressible(context, uncompressedBytes)) { + long time = context.getCachePerfStats().startCompression(); + result = context.getCompressor().compress(uncompressedBytes); + context.getCachePerfStats().endCompression(time, uncompressedBytes.length, result.length); + } + return result; + } + + + @Retained + public final Object getValueInVM(RegionEntryContext context) { + ReferenceCountHelper.createReferenceCountOwner(); + @Retained Object v = _getValueRetain(context, true); + + if (v == null) { // should only be possible if disk entry + v = Token.NOT_AVAILABLE; + } + @Retained Object result = OffHeapHelper.copyAndReleaseIfNeeded(v); // TODO OFFHEAP keep it offheap? + ReferenceCountHelper.setReferenceCountOwner(null); + return result; + } + + @Retained + public Object getValueInVMOrDiskWithoutFaultIn(LocalRegion owner) { + return getValueInVM(owner); + } + + @Override + @Retained + public Object getValueOffHeapOrDiskWithoutFaultIn(LocalRegion owner) { + @Retained Object result = _getValueRetain(owner, true); + // if (result instanceof ByteSource) { + // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it + // Object deserVal = ((CachedDeserializable)result).getDeserializedForReading(); + // if (deserVal != result) { + // OffHeapHelper.release(result); + // result = deserVal; + // } + // } + return result; + } + + public Object getValueOnDisk(LocalRegion r) + throws EntryNotFoundException + { + throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString()); + } + + public Object getSerializedValueOnDisk(final LocalRegion r) + throws EntryNotFoundException + { + throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString()); + } + + public Object getValueOnDiskOrBuffer(LocalRegion r) + throws EntryNotFoundException + { + throw new IllegalStateException(LocalizedStrings.AbstractRegionEntry_CANNOT_GET_VALUE_ON_DISK_FOR_A_REGION_THAT_DOES_NOT_ACCESS_THE_DISK.toLocalizedString()); + // @todo darrel if value is Token.REMOVED || Token.DESTROYED throw EntryNotFoundException + } + + public final boolean initialImagePut(final LocalRegion region, + final long lastModifiedTime, + Object newValue, + boolean wasRecovered, + boolean versionTagAccepted) throws RegionClearedException + { + // note that the caller has already write synced this RegionEntry + return initialImageInit(region, lastModifiedTime, newValue, this.isTombstone(), wasRecovered, versionTagAccepted); + } + + public boolean initialImageInit(final LocalRegion region, + final long lastModifiedTime, + final Object newValue, + final boolean create, + final boolean wasRecovered, + final boolean versionTagAccepted) throws RegionClearedException + { + // note that the caller has already write synced this RegionEntry + boolean result = false; + // if it has been destroyed then don't do anything + Token vTok = getValueAsToken(); + if (versionTagAccepted || create || (vTok != Token.DESTROYED || vTok != Token.TOMBSTONE)) { // OFFHEAP noop + Object newValueToWrite = newValue; + boolean putValue = versionTagAccepted || create + || (newValueToWrite != Token.LOCAL_INVALID + && (wasRecovered || (vTok == Token.LOCAL_INVALID))); // OFFHEAP noop + + if (region.isUsedForPartitionedRegionAdmin() && newValueToWrite instanceof CachedDeserializable) { + // Special case for partitioned region meta data + // We do not need the RegionEntry on this case. + // Because the pr meta data region will not have an LRU. + newValueToWrite = ((CachedDeserializable) newValueToWrite).getDeserializedValue(region, null); + if (!create && newValueToWrite instanceof Versionable) { + @Retained @Released final Object oldValue = getValueInVM(region); // Heap value should always be deserialized at this point // OFFHEAP will not be deserialized + try { + // BUGFIX for 35029. If oldValue is null the newValue should be put. + if(oldValue == null) { + putValue = true; + } + else if (oldValue instanceof Versionable) { + Versionable nv = (Versionable) newValueToWrite; + Versionable ov = (Versionable) oldValue; + putValue = nv.isNewerThan(ov); + } + } finally { + OffHeapHelper.release(oldValue); + } + } + } + + if (putValue) { + // change to INVALID if region itself has been invalidated, + // and current value is recovered + if (create || versionTagAccepted) { + // At this point, since we now always recover from disk first, + // we only care about "isCreate" since "isRecovered" is impossible + // if we had a regionInvalidate or regionClear + ImageState imageState = region.getImageState(); + // this method is called during loadSnapshot as well as getInitialImage + if (imageState.getRegionInvalidated()) { + if (newValueToWrite != Token.TOMBSTONE) { + newValueToWrite = Token.INVALID; + } + } + else if (imageState.getClearRegionFlag()) { + boolean entryOK = false; + RegionVersionVector rvv = imageState.getClearRegionVersionVector(); + if (rvv != null) { // a filtered clear + VersionSource id = getVersionStamp().getMemberID(); + if (id == null) { + id = region.getVersionMember(); + } + if (!rvv.contains(id, getVersionStamp().getRegionVersion())) { + entryOK = true; + } + } + if (!entryOK) { + //Asif: If the region has been issued cleared during + // the GII , then those entries loaded before this one would have + // been cleared from the Map due to clear operation & for the + // currententry whose key may have escaped the clearance , will be + // cleansed by the destroy token. + newValueToWrite = Token.DESTROYED; + imageState.addDestroyedEntry(this.getKey()); + throw new RegionClearedException(LocalizedStrings.AbstractRegionEntry_DURING_THE_GII_PUT_OF_ENTRY_THE_REGION_GOT_CLEARED_SO_ABORTING_THE_OPERATION.toLocalizedString()); + } + } + } + setValue(region, this.prepareValueForCache(region, newValueToWrite, false)); + result = true; + + if (newValueToWrite != Token.TOMBSTONE){ + if (create) { + region.getCachePerfStats().incCreates(); + } + region.updateStatsForPut(this, lastModifiedTime, false); + } + + if (logger.isTraceEnabled()) { + if (newValueToWrite instanceof CachedDeserializable) { + logger.trace("ProcessChunk: region={}; put a CachedDeserializable ({},{})", + region.getFullPath(), getKey(),((CachedDeserializable)newValueToWrite).getStringForm()); + } + else { + logger.trace("ProcessChunk: region={}; put({},{})", region.getFullPath(), getKey(), StringUtils.forceToString(newValueToWrite)); + } + } + } + } + return result; + } + + /** + * @throws EntryNotFoundException if expectedOldValue is + * not null and is not equal to current value + */ + @Released + public final boolean destroy(LocalRegion region, + EntryEventImpl event, + boolean inTokenMode, + boolean cacheWrite, + @Unretained Object expectedOldValue, + boolean forceDestroy, + boolean removeRecoveredEntry) + throws CacheWriterException, + EntryNotFoundException, + TimeoutException, + RegionClearedException { + boolean proceed = false; + { + // A design decision was made to not retrieve the old value from the disk + // if the entry has been evicted to only have the CacheListener afterDestroy + // method ignore it. We don't want to pay the performance penalty. The + // getValueInVM method does not retrieve the value from disk if it has been + // evicted. Instead, it uses the NotAvailable token. + // + // If the region is a WAN queue region, the old value is actually used by the + // afterDestroy callback on a secondary. It is not needed on a primary. + // Since the destroy that sets WAN_QUEUE_TOKEN always originates on the primary + // we only pay attention to WAN_QUEUE_TOKEN if the event is originRemote. + // + // :ezoerner:20080814 We also read old value from disk or buffer + // in the case where there is a non-null expectedOldValue + // see PartitionedRegion#remove(Object key, Object value) + ReferenceCountHelper.skipRefCountTracking(); + @Retained @Released Object curValue = _getValueRetain(region, true); + ReferenceCountHelper.unskipRefCountTracking(); + try { + if (curValue == null) curValue = Token.NOT_AVAILABLE; + + if (curValue == Token.NOT_AVAILABLE) { + // In some cases we need to get the current value off of disk. + + // if the event is transmitted during GII and has an old value, it was + // the state of the transmitting cache's entry & should be used here + if (event.getCallbackArgument() != null + && event.getCallbackArgument().equals(RegionQueue.WAN_QUEUE_TOKEN) + && event.isOriginRemote()) { // check originRemote for bug 40508 + //curValue = getValue(region); can cause deadlock if GII is occurring + curValue = getValueOnDiskOrBuffer(region); + } + else { + FilterProfile fp = region.getFilterProfile(); + // rdubey: Old value also required for SqlfIndexManager. + if (fp != null && ((fp.getCqCount() > 0) || expectedOldValue != null + || event.getRegion().getIndexUpdater() != null)) { + //curValue = getValue(region); can cause deadlock will fault in the value + // and will confuse LRU. rdubey. + curValue = getValueOnDiskOrBuffer(region); + } + } + } + + if (expectedOldValue != null) { + if (!checkExpectedOldValue(expectedOldValue, curValue, region)) { + throw new EntryNotFoundException( + LocalizedStrings.AbstractRegionEntry_THE_CURRENT_VALUE_WAS_NOT_EQUAL_TO_EXPECTED_VALUE.toLocalizedString()); + } + } + + if (inTokenMode && event.hasOldValue()) { + proceed = true; + } + else { + proceed = event.setOldValue(curValue, curValue instanceof GatewaySenderEventImpl) || removeRecoveredEntry + || forceDestroy || region.getConcurrencyChecksEnabled() // fix for bug #47868 - create a tombstone + || (event.getOperation() == Operation.REMOVE // fix for bug #42242 + && (curValue == null || curValue == Token.LOCAL_INVALID + || curValue == Token.INVALID)); + } + } finally { + OffHeapHelper.releaseWithNoTracking(curValue); + } + } // end curValue block + + if (proceed) { + //Generate the version tag if needed. This method should only be + //called if we are in fact going to destroy the entry, so it must be + //after the entry not found exception above. + if(!removeRecoveredEntry) { + region.generateAndSetVersionTag(event, this); + } + if (cacheWrite) { + region.cacheWriteBeforeDestroy(event, expectedOldValue); + if (event.getRegion().getServerProxy() != null) { // server will return a version tag + // update version information (may throw ConcurrentCacheModificationException) + VersionStamp stamp = getVersionStamp(); + if (stamp != null) { + stamp.processVersionTag(event); + } + } + } + region.recordEvent(event); + // don't do index maintenance on a destroy if the value in the + // RegionEntry (the old value) is invalid + if (!region.isProxy() && !isInvalid()) { + IndexManager indexManager = region.getIndexManager(); + if (indexManager != null) { + try { + if(isValueNull()) { + @Released Object value = getValueOffHeapOrDiskWithoutFaultIn(region); + try { + _setValue(prepareValueForCache(region, value, false)); + if (value != null && region != null && (this instanceof OffHeapRegionEntry) && region.isThisRegionBeingClosedOrDestroyed()) { + ((OffHeapRegionEntry)this).release(); + region.checkReadiness(); + } + } finally { + OffHeapHelper.release(value); + } + } + indexManager.updateIndexes(this, + IndexManager.REMOVE_ENTRY, + IndexProtocol.OTHER_OP); + } + catch (QueryException e) { + throw new IndexMaintenanceException(e); + } + } + } + + boolean removeEntry = false; + VersionTag v = event.getVersionTag(); + if (region.concurrencyChecksEnabled && !removeRecoveredEntry + && !event.isFromRILocalDestroy()) { // bug #46780, don't retain tombstones for entries destroyed for register-interest + // Destroy will write a tombstone instead + if (v == null || !v.hasValidVersion()) { + // localDestroy and eviction and ops received with no version tag + // should create a tombstone using the existing version stamp, as should + // (bug #45245) responses from servers that do not have valid version information + VersionStamp stamp = this.getVersionStamp(); + if (stamp != null) { // proxy has no stamps + v = stamp.asVersionTag(); + event.setVersionTag(v); + } + } + removeEntry = (v == null) || !v.hasValidVersion(); + } else { + removeEntry = true; + } + + // See #47887, we do not insert a tombstone for evicted HDFS + // entries since the value is still present in HDFS + // Check if we have to evict or just do destroy. + boolean forceRemoveEntry = + (event.isEviction() || event.isExpiration()) + && event.getRegion().isUsedForPartitionedRegionBucket() + && event.getRegion().getPartitionedRegion().isHDFSRegion(); + + if (removeEntry || forceRemoveEntry) { + boolean isThisTombstone = isTombstone(); + if(inTokenMode && !event.getOperation().isEviction()) { + setValue(region, Token.DESTROYED); + } else { + removePhase1(region, false); + } + if (isThisTombstone) { + region.unscheduleTombstone(this); + } + } else { + makeTombstone(region, v); + } + + return true; + } + else { + return false; + } + } + + + + static boolean checkExpectedOldValue(@Unretained Object expectedOldValue, @Unretained Object actualValue, LocalRegion lr) { + if (Token.isInvalid(expectedOldValue)) { + return (actualValue == null) || Token.isInvalid(actualValue); + } else { + boolean isCompressedOffHeap = lr.getAttributes().getOffHeap() && lr.getAttributes().getCompressor() != null; + return checkEquals(expectedOldValue, actualValue, isCompressedOffHeap); + } + } + + private static boolean basicEquals(Object v1, Object v2) { + if (v2 != null) { + if (v2.getClass().isArray()) { + // fix for 52093 + if (v2 instanceof byte[]) { + if (v1 instanceof byte[]) { + return Arrays.equals((byte[])v2, (byte[])v1); + } else { + return false; + } + } else if (v2 instanceof Object[]) { + if (v1 instanceof Object[]) { + return Arrays.deepEquals((Object[])v2, (Object[])v1); + } else { + return false; + } + } else if (v2 instanceof int[]) { + if (v1 instanceof int[]) { + return Arrays.equals((int[])v2, (int[])v1); + } else { + return false; + } + } else if (v2 instanceof long[]) { + if (v1 instanceof long[]) { + return Arrays.equals((long[])v2, (long[])v1); + } else { + return false; + } + } else if (v2 instanceof boolean[]) { + if (v1 instanceof boolean[]) { + return Arrays.equals((boolean[])v2, (boolean[])v1); + } else { + return false; + } + } else if (v2 instanceof short[]) { + if (v1 instanceof short[]) { + return Arrays.equals((short[])v2, (short[])v1); + } else { + return false; + } + } else if (v2 instanceof char[]) { + if (v1 instanceof char[]) { + return Arrays.equals((char[])v2, (char[])v1); + } else { + return false; + } + } else if (v2 instanceof float[]) { + if (v1 instanceof float[]) { + return Arrays.equals((float[])v2, (float[])v1); + } else { + return false; + } + } else if (v2 instanceof double[]) { + if (v1 instanceof double[]) { + return Arrays.equals((double[])v2, (double[])v1); + } else { + return false; + } + } + // fall through and call equals method + } + return v2.equals(v1); + } else { + return v1 == null; + } + } + + static boolean checkEquals(@Unretained Object v1, @Unretained Object v2, boolean isCompressedOffHeap) { + // need to give PdxInstance#equals priority + if (v1 instanceof PdxInstance) { + return checkPdxEquals((PdxInstance)v1, v2); + } else if (v2 instanceof PdxInstance) { + return checkPdxEquals((PdxInstance)v2, v1); + } else if (v1 instanceof OffHeapCachedDeserializable) { + return checkOffHeapEquals((OffHeapCachedDeserializable)v1, v2); + } else if (v2 instanceof OffHeapCachedDeserializable) { + return checkOffHeapEquals((OffHeapCachedDeserializable)v2, v1); + } else if (v1 instanceof CachedDeserializable) { + return checkCDEquals((CachedDeserializable)v1, v2, isCompressedOffHeap); + } else if (v2 instanceof CachedDeserializable) { + return checkCDEquals((CachedDeserializable)v2, v1, isCompressedOffHeap); + } else { + return basicEquals(v1, v2); + } + } + private static boolean checkOffHeapEquals(@Unretained OffHeapCachedDeserializable cd, @Unretained Object obj) { + if (cd.isSerializedPdxInstance()) { + PdxInstance pi = InternalDataSerializer.readPdxInstance(cd.getSerializedValue(), GemFireCacheImpl.getForPdx("Could not check value equality")); + return checkPdxEquals(pi, obj); + } + if (obj instanceof OffHeapCachedDeserializable) { + return cd.checkDataEquals((OffHeapCachedDeserializable)obj); + } else { + byte[] serializedObj; + if (obj instanceof CachedDeserializable) { + if (!cd.isSerialized()) { + if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) { + // both are byte[] + // obj must be DataAsAddress since it was not OffHeapCachedDeserializable + // so its byte[] will be small. + byte[] objBytes = (byte[]) ((StoredObject) obj).getDeserializedForReading(); + return cd.checkDataEquals(objBytes); + } else { + return false; + } + } + serializedObj = ((CachedDeserializable) obj).getSerializedValue(); + } else if (obj instanceof byte[]) { + if (cd.isSerialized()) { + return false; + } + serializedObj = (byte[]) obj; + } else { + if (!cd.isSerialized()) { + return false; + } + if (obj == null || obj == Token.NOT_AVAILABLE + || Token.isInvalidOrRemoved(obj)) { + return false; + } + serializedObj = EntryEventImpl.serialize(obj); + } + return cd.checkDataEquals(serializedObj); + } + } + + private static boolean checkCDEquals(CachedDeserializable cd, Object obj, boolean isCompressedOffHeap) { + if (cd instanceof StoredObject && !((StoredObject) cd).isSerialized()) { + // cd is an actual byte[]. + byte[] ba2; + if (obj instanceof StoredObject) { + if (!((StoredObject) obj).isSerialized()) { + return false; + } + ba2 = (byte[]) ((StoredObject) obj).getDeserializedForReading(); + } else if (obj instanceof byte[]) { + ba2 = (byte[]) obj; + } else { + return false; + } + byte[] ba1 = (byte[]) cd.getDeserializedForReading(); + return Arrays.equals(ba1, ba2); + } + Object cdVal = cd.getValue(); + if (cdVal instanceof byte[]) { + byte[] cdValBytes = (byte[])cdVal; + PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality")); + if (pi != null) { + return checkPdxEquals(pi, obj); + } + if (isCompressedOffHeap) { // fix for bug 52248 + byte[] serializedObj; + if (obj instanceof CachedDeserializable) { + serializedObj = ((CachedDeserializable) obj).getSerializedValue(); + } else { + serializedObj = EntryEventImpl.serialize(obj); + } + return Arrays.equals(cdValBytes, serializedObj); + } else { + /** + * To be more compatible with previous releases do not compare the serialized forms here. + * Instead deserialize and call the equals method. + */ + Object deserializedObj; + if (obj instanceof CachedDeserializable) { + deserializedObj =((CachedDeserializable) obj).getDeserializedForReading(); + } else { + if (obj == null || obj == Token.NOT_AVAILABLE + || Token.isInvalidOrRemoved(obj)) { + return false; + } + // TODO OPTIMIZE: Before serializing all of obj we could get the top + // level class name of cdVal and compare it to the top level class name of obj. + deserializedObj = obj; + } + return basicEquals(deserializedObj, cd.getDeserializedForReading()); + } + // boolean result = Arrays.equals((byte[])cdVal, serializedObj); + // if (!result) { + // try { + // Object o1 = BlobHelper.deserializeBlob((byte[])cdVal); + // Object o2 = BlobHelper.deserializeBlob(serializedObj); + // SimpleMemoryAllocatorImpl.debugLog("checkCDEquals o1=<" + o1 + "> o2=<" + o2 + ">", false); + // if (o1.equals(o2)) { + // SimpleMemoryAllocatorImpl.debugLog("they are equal! a1=<" + Arrays.toString((byte[])cdVal) + "> a2=<" + Arrays.toString(serializedObj) + ">", false); + // } + // } catch (IOException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } catch (ClassNotFoundException e) { + // // TODO Auto-generated catch block + // e.printStackTrace(); + // } + // } + // return result; + } else { + // prefer object form + if (obj instanceof CachedDeserializable) { + // TODO OPTIMIZE: Before deserializing all of obj we could get the top + // class name of cdVal and the top level class name of obj and compare. + obj = ((CachedDeserializable) obj).getDeserializedForReading(); + } + return basicEquals(cdVal, obj); + } + } + /** + * This method fixes bug 43643 + */ + private static boolean checkPdxEquals(PdxInstance pdx, Object obj) { + if (!(obj instanceof PdxInstance)) { + // obj may be a CachedDeserializable in which case we want to convert it to a PdxInstance even if we are not readSerialized. + if (obj instanceof CachedDeserializable) { + if (obj instanceof StoredObject && !((StoredObject) obj).isSerialized()) { + // obj is actually a byte[] which will never be equal to a PdxInstance + return false; + } + Object cdVal = ((CachedDeserializable) obj).getValue(); + if (cdVal instanceof byte[]) { + byte[] cdValBytes = (byte[]) cdVal; + PdxInstance pi = InternalDataSerializer.readPdxInstance(cdValBytes, GemFireCacheImpl.getForPdx("Could not check value equality")); + if (pi != null) { + return pi.equals(pdx); + } else { + // since obj is serialized as something other than pdx it must not equal our pdx + return false; + } + } else { + // remove the cd wrapper so that obj is the actual value we want to compare. + obj = cdVal; + } + } + if (obj.getClass().getName().equals(pdx.getClassName())) { + GemFireCacheImpl gfc = GemFireCacheImpl.getForPdx("Could not access Pdx registry"); + if (gfc != null) { + PdxSerializer pdxSerializer; + if (obj instanceof PdxSerializable) { + pdxSerializer = null; + } else { + pdxSerializer = gfc.getPdxSerializer(); + } + if (pdxSerializer != null || obj instanceof PdxSerializable) { + // try to convert obj to a PdxInstance + HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + try { + if (InternalDataSerializer.autoSerialized(obj, hdos) || + InternalDataSerializer.writePdx(hdos, gfc, obj, pdxSerializer)) { + PdxInstance pi = InternalDataSerializer.readPdxInstance(hdos.toByteArray(), gfc); + if (pi != null) { + obj = pi; + } + } + } catch (IOException ignore) { + // we are not able to convert it so just fall through + } catch (PdxSerializationException ignore) { + // we are not able to convert it so just fall through + } + } + } + } + } + return basicEquals(obj, pdx); + } + + + ///////////////////////////////////////////////////////////// + /////////////////////////// fields ////////////////////////// + ///////////////////////////////////////////////////////////// + // Do not add any instance fields to this class. + // Instead add them to LeafRegionEntry.cpp + + public static class HashRegionEntryCreator implements + CustomEntryConcurrentHashMap.HashEntryCreator { + + public HashEntry newEntry(final Object key, final int hash, + final HashEntry next, final Object value) { + final AbstractRegionEntry entry = (AbstractRegionEntry)value; + // if hash is already set then assert that the two should be same + final int entryHash = entry.getEntryHash(); + if (hash == 0 || entryHash != 0) { + if (entryHash != hash) { + Assert.fail("unexpected mismatch of hash, expected=" + hash + + ", actual=" + entryHash + " for " + entry); + } + } + entry.setEntryHash(hash); + entry.setNextEntry(next); + return entry; + } + + public int keyHashCode(final Object key, final boolean compareValues) { + return CustomEntryConcurrentHashMap.keyHash(key, compareValues); + } + }; + + public abstract Object getKey(); + + protected static boolean okToStoreOffHeap(Object v, AbstractRegionEntry e) { + if (v == null) return false; + if (Token.isInvalidOrRemoved(v)) return false; + if (v == Token.NOT_AVAILABLE) return false; + if (v instanceof DiskEntry.RecoveredEntry) return false; // The disk layer has special logic that ends up storing the nested value in the RecoveredEntry off heap + if (!(e instanceof OffHeapRegionEntry)) return false; + // TODO should we check for deltas here or is that a user error? + return true; + } + + /** + * Default implementation. Override in subclasses with primitive keys + * to prevent creating an Object form of the key for each equality check. + */ + @Override + public boolean isKeyEqual(Object k) { + return k.equals(getKey()); + } + + private static final long LAST_MODIFIED_MASK = 0x00FFFFFFFFFFFFFFL; + + protected final void _setLastModified(long lastModifiedTime) { + if (lastModifiedTime < 0 || lastModifiedTime > LAST_MODIFIED_MASK) { + throw new IllegalStateException("Expected lastModifiedTime " + lastModifiedTime + " to be >= 0 and <= " + LAST_MODIFIED_MASK); + } + long storedValue; + long newValue; + do { + storedValue = getlastModifiedField(); + newValue = storedValue & ~LAST_MODIFIED_MASK; + newValue |= lastModifiedTime; + } while (!compareAndSetLastModifiedField(storedValue, newValue)); + } + protected abstract long getlastModifiedField(); + protected abstract boolean compareAndSetLastModifiedField(long expectedValue, long newValue); + public final long getLastModified() { + return getlastModifiedField() & LAST_MODIFIED_MASK; + } + protected final boolean areAnyBitsSet(long bitMask) { + return ( getlastModifiedField() & bitMask ) != 0L; + } + /** + * Any bits in "bitMask" that are 1 will be set. + */ + protected final void setBits(long bitMask) { + boolean done = false; + do { + long bits = getlastModifiedField(); + long newBits = bits | bitMask; + if (bits == newBits) return; + done = compareAndSetLastModifiedField(bits, newBits); + } while(!done); + } + /** + * Any bits in "bitMask" that are 0 will be cleared. + */ + protected final void clearBits(long bitMask) { + boolean done = false; + do { + long bits = getlastModifiedField(); + long newBits = bits & bitMask; + if (bits == newBits) return; + done = compareAndSetLastModifiedField(bits, newBits); + } while(!done); + } + + @Override + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) + public Object prepareValueForCache(RegionEntryContext r, + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val, + boolean isEntryUpdate) { + return prepareValueForCache(r, val, null, isEntryUpdate); + } + + @Override + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) + public Object prepareValueForCache(RegionEntryContext r, + @Retained(ABSTRACT_REGION_ENTRY_PREPARE_VALUE_FOR_CACHE) Object val, + EntryEventImpl event, boolean isEntryUpdate) { + if (r != null && r.getOffHeap() && okToStoreOffHeap(val, this)) { + if (val instanceof StoredObject) { + // Check to see if val has the same compression settings as this region. + // The recursive calls in this section are safe because + // we only do it after copy the off-heap value to the heap. + // This is needed to fix bug 52057. + StoredObject soVal = (StoredObject) val; + assert !soVal.isCompressed(); + if (r.getCompressor() != null) { + // val is uncompressed and we need a compressed value. + // So copy the off-heap value to the heap in a form that can be compressed. + byte[] valAsBytes = soVal.getValueAsHeapByteArray(); + Object heapValue; + if (soVal.isSerialized()) { + heapValue = CachedDeserializableFactory.create(valAsBytes); + } else { + heapValue = valAsBytes; + } + return prepareValueForCache(r, heapValue, event, isEntryUpdate); + } - if (val instanceof Chunk) { ++ if (val instanceof ObjectChunk) { + // if the reused guy has a refcount then need to inc it - if (!((Chunk)val).retain()) { ++ if (!((ObjectChunk)val).retain()) { + throw new IllegalStateException("Could not use an off heap value because it was freed"); + } + } + // else it is DataAsAddress. This code just returns it as prepared. + // TODO OFFHEAP: Review the callers to see if they will handle DataAsAddress correctly. + } else { + byte[] data; + boolean isSerialized = !(val instanceof byte[]); + if (isSerialized) { + if (event != null && event.getCachedSerializedNewValue() != null) { + data = event.getCachedSerializedNewValue(); + } else if (val instanceof CachedDeserializable) { + data = ((CachedDeserializable)val).getSerializedValue(); + // TODO OFFHEAP: cache data in event? + } else if (val instanceof PdxInstance) { + try { + data = ((ConvertableToBytes)val).toBytes(); + // TODO OFFHEAP: cache data in event? + } catch (IOException e) { + throw new PdxSerializationException("Could not convert " + val + " to bytes", e); + } + } else { + data = EntryEventImpl.serialize(val); + // TODO OFFHEAP: cache data in event? + } + } else { + data = (byte[]) val; + } + byte[] compressedData = compressBytes(r, data); + boolean isCompressed = compressedData != data; + ReferenceCountHelper.setReferenceCountOwner(this); + MemoryAllocator ma = SimpleMemoryAllocatorImpl.getAllocator(); // fix for bug 47875 - val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed, GemFireChunk.TYPE); // TODO:KIRK:48068 race happens right after this line ++ val = ma.allocateAndInitialize(compressedData, isSerialized, isCompressed); // TODO:KIRK:48068 race happens right after this line + ReferenceCountHelper.setReferenceCountOwner(null); - if (val instanceof GemFireChunk) { - val = new com.gemstone.gemfire.internal.offheap.ChunkWithHeapForm((GemFireChunk)val, data); ++ if (val instanceof ObjectChunk) { ++ val = new ObjectChunkWithHeapForm((ObjectChunk)val, data); + } + // if (val instanceof Chunk && r instanceof LocalRegion) { + // Chunk c = (Chunk) val; + // LocalRegion lr = (LocalRegion) r; + // SimpleMemoryAllocatorImpl.debugLog("allocated @" + Long.toHexString(c.getMemoryAddress()) + " reg=" + lr.getFullPath(), false); + // } + } + return val; + } + @Unretained Object nv = val; + if (nv instanceof StoredObject) { + // This off heap value is being put into a on heap region. + byte[] data = ((StoredObject) nv).getSerializedValue(); + nv = CachedDeserializableFactory.create(data); + } + // don't bother checking for SQLFire + if (!GemFireCacheImpl.sqlfSystem() && nv instanceof PdxInstanceImpl) { + // We do not want to put PDXs in the cache as values. + // So get the serialized bytes and use a CachedDeserializable. + try { + byte[] data = ((ConvertableToBytes)nv).toBytes(); + byte[] compressedData = compressBytes(r, data); + if (data == compressedData) { + nv = CachedDeserializableFactory.create(data); + } else { + nv = compressedData; + } + } catch (IOException e) { + throw new PdxSerializationException("Could not convert " + nv + " to bytes", e); + } + } else { + nv = compress(r, nv, event); + } + return nv; + } + + @Override + @Unretained + public final Object _getValue() { + return getValueField(); + } + + public final boolean isUpdateInProgress() { + return areAnyBitsSet(UPDATE_IN_PROGRESS); + } + + public final void setUpdateInProgress(final boolean underUpdate) { + if (underUpdate) { + setBits(UPDATE_IN_PROGRESS); + } else { + clearBits(~UPDATE_IN_PROGRESS); + } + } + + + public final boolean isCacheListenerInvocationInProgress() { + return areAnyBitsSet(LISTENER_INVOCATION_IN_PROGRESS); + } + + public final void setCacheListenerInvocationInProgress(final boolean listenerInvoked) { + if (listenerInvoked) { + setBits(LISTENER_INVOCATION_IN_PROGRESS); + } else { + clearBits(~LISTENER_INVOCATION_IN_PROGRESS); + } + } + + @Override + public final boolean isInUseByTransaction() { + return areAnyBitsSet(IN_USE_BY_TX); + } + + @Override + public final void setInUseByTransaction(final boolean v) { + if (v) { + setBits(IN_USE_BY_TX); + } else { + clearBits(~IN_USE_BY_TX); + } + } + + @Override + public final synchronized void incRefCount() { + TXManagerImpl.incRefCount(this); + setInUseByTransaction(true); + } + /** + * {@inheritDoc} + */ + @Override + public final boolean isMarkedForEviction() { + return areAnyBitsSet(MARKED_FOR_EVICTION); + } + + /** + * {@inheritDoc} + */ + @Override + public final void setMarkedForEviction() { + setBits(MARKED_FOR_EVICTION); + } + + /** + * {@inheritDoc} + */ + @Override + public final void clearMarkedForEviction() { + clearBits(~MARKED_FOR_EVICTION); + } + + @Override + public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) { + if (TXManagerImpl.decRefCount(this)) { + if (isInUseByTransaction()) { + setInUseByTransaction(false); + if (lruList != null) { + // No more transactions, place in lru list + lruList.appendEntry((LRUClockNode)this); + } + if (lr != null && lr.isEntryExpiryPossible()) { + lr.addExpiryTaskIfAbsent(this); + } + } + } + } + + @Override + public final synchronized void resetRefCount(NewLRUClockHand lruList) { + if (isInUseByTransaction()) { + setInUseByTransaction(false); + if (lruList != null) { + lruList.appendEntry((LRUClockNode)this); + } + } + } + /** + * soubhik: this method is overridden in sqlf flavor of entries. + * Instead of overriding this method; override areSetValue. + */ + protected final void _setValue(Object val) { + setValueField(val); + } + + @Override + public Token getValueAsToken() { + Object v = getValueField(); + if (v == null || v instanceof Token) { + return (Token)v; + } else { + return Token.NOT_A_TOKEN; + } + } + + /** + * Reads the value of this region entry. + * Provides low level access to the value field. + * @return possible OFF_HEAP_OBJECT (caller uses region entry reference) + */ + @Unretained + protected abstract Object getValueField(); + /** + * Set the value of this region entry. + * Provides low level access to the value field. + * @param v the new value to set + */ + protected abstract void setValueField(@Unretained Object v); + + @Retained + public Object getTransformedValue() { + return _getValueRetain(null, false); + } + + public final boolean getValueWasResultOfSearch() { + return areAnyBitsSet(VALUE_RESULT_OF_SEARCH); + } + + public final void setValueResultOfSearch(boolean v) { + if (v) { + setBits(VALUE_RESULT_OF_SEARCH); + } else { + clearBits(~VALUE_RESULT_OF_SEARCH); + } + } + + public boolean hasValidVersion() { + VersionStamp stamp = (VersionStamp)this; + boolean has = stamp.getRegionVersion() != 0 || stamp.getEntryVersion() != 0; + return has; + } + + public boolean hasStats() { + // override this in implementations that have stats + return false; + } + + /** + * @see HashEntry#getMapValue() + */ + public final Object getMapValue() { + return this; + } + + /** + * @see HashEntry#setMapValue(Object) + */ + public final void setMapValue(final Object newValue) { + if (this != newValue) { + Assert.fail("AbstractRegionEntry#setMapValue: unexpected setMapValue " + + "with newValue=" + newValue + ", this=" + this); + } + } + + protected abstract void setEntryHash(int v); + + @Override + public final String toString() { + final StringBuilder sb = new StringBuilder(this.getClass().getSimpleName()) + .append('@').append(Integer.toHexString(System.identityHashCode(this))) + .append(" ("); + return appendFieldsToString(sb).append(')').toString(); + } + + protected StringBuilder appendFieldsToString(final StringBuilder sb) { + sb.append("key=").append(getKey()).append("; rawValue=") + .append(_getValue()); // OFFHEAP _getValue ok: the current toString on OffHeapCachedDeserializable is safe to use without incing refcount. + VersionStamp stamp = getVersionStamp(); + if (stamp != null) { + sb.append("; version=").append(stamp.asVersionTag()+";member="+stamp.getMemberID()); + } + return sb; + } + + /* + * (non-Javadoc) + * This generates version tags for outgoing messages for all subclasses + * supporting concurrency versioning. It also sets the entry's version + * stamp to the tag's values. + * + * @see com.gemstone.gemfire.internal.cache.RegionEntry#generateVersionTag(com.gemstone.gemfire.distributed.DistributedMember, boolean) + */ + public VersionTag generateVersionTag(VersionSource mbr, boolean withDelta, LocalRegion region, EntryEventImpl event) { + VersionStamp stamp = this.getVersionStamp(); + if (stamp != null && region.getServerProxy() == null) { // clients do not generate versions + int v = stamp.getEntryVersion()+1; + if (v > 0xFFFFFF) { + v -= 0x1000000; // roll-over + } + VersionSource previous = stamp.getMemberID(); + + + //For non persistent regions, we allow the member to be null and + //when we send a message and the remote side can determine the member + //from the sender. For persistent regions, we need to send + //the persistent id to the remote side. + // + //TODO - RVV - optimize the way we send the persistent id to save + //space. + if(mbr == null) { + VersionSource regionMember = region.getVersionMember(); + if(regionMember instanceof DiskStoreID) { + mbr = regionMember; + } + } + + VersionTag tag = VersionTag.create(mbr); + tag.setEntryVersion(v); + if (region.getVersionVector() != null) { + // Use region version if already provided, else generate + long nextRegionVersion = event.getNextRegionVersion(); + if (nextRegionVersion != -1) { + // Set on the tag and record it locally + tag.setRegionVersion(nextRegionVersion); + RegionVersionVector rvv = region.getVersionVector(); + rvv.recordVersion(rvv.getOwnerId(),nextRegionVersion); + if (logger.isDebugEnabled()) { + logger.debug("recorded region version {}; region={}", nextRegionVersion, region.getFullPath()); + } + } else { + tag.setRegionVersion(region.getVersionVector().getNextVersion()); + } + } + if (withDelta) { + tag.setPreviousMemberID(previous); + } + VersionTag remoteTag = event.getVersionTag(); + if (remoteTag != null && remoteTag.isGatewayTag()) { + // if this event was received from a gateway we use the remote system's + // timestamp and dsid. + tag.setVersionTimeStamp(remoteTag.getVersionTimeStamp()); + tag.setDistributedSystemId(remoteTag.getDistributedSystemId()); + tag.setAllowedByResolver(remoteTag.isAllowedByResolver()); + } else { + long time = region.cacheTimeMillis(); + int dsid = region.getDistributionManager().getDistributedSystemId(); + // a locally generated change should always have a later timestamp than + // one received from a wan gateway, so fake a timestamp if necessary + if (time <= stamp.getVersionTimeStamp() && dsid != tag.getDistributedSystemId()) { + time = stamp.getVersionTimeStamp() + 1; + } + tag.setVersionTimeStamp(time); + tag.setDistributedSystemId(dsid); + } + stamp.setVersions(tag); + stamp.setMemberID(mbr); + event.setVersionTag(tag); + if (logger.isDebugEnabled()) { + logger.debug("generated tag {}; key={}; oldvalue={} newvalue={} client={} region={}; rvv={}", tag, + event.getKey(), event.getOldValueStringForm(), event.getNewValueStringForm(), + (event.getContext() == null? "none" : event.getContext().getDistributedMember().getName()), + region.getFullPath(), region.getVersionVector()); + } + return tag; + } + return null; + } + + /** set/unset the flag noting that a tombstone has been scheduled for this entry */ + public void setTombstoneScheduled(boolean scheduled) { + if (scheduled) { + setBits(TOMBSTONE_SCHEDULED); + } else { + clearBits(~TOMBSTONE_SCHEDULED); + } + } + + /** + * return the flag noting whether a tombstone has been scheduled for this entry. This should + * be called under synchronization on the region entry if you want an accurate result. + */ + public boolean isTombstoneScheduled() { + return areAnyBitsSet(TOMBSTONE_SCHEDULED); + } + + /* + * (non-Javadoc) + * This performs a concurrency check. + * + * This check compares the version number first, followed by the member ID. + * + * Wraparound of the version number is detected and handled by extending the + * range of versions by one bit. + * + * The normal membership ID comparison method is used.

+ * + * Note that a tag from a remote (WAN) system may be in the event. If this + * is the case this method will either invoke a user plugin that allows/disallows + * the event (and may modify the value) or it determines whether to allow + * or disallow the event based on timestamps and distributedSystemIDs. + * + * @throws ConcurrentCacheModificationException if the event conflicts with + * an event that has already been applied to the entry. + * + * @see com.gemstone.gemfire.internal.cache.RegionEntry#concurrencyCheck(com.gemstone.gemfire.cache.EntryEvent) + */ + public void processVersionTag(EntryEvent cacheEvent) { + processVersionTag(cacheEvent, true); + } + + + protected void processVersionTag(EntryEvent cacheEvent, boolean conflictCheck) { + EntryEventImpl event = (EntryEventImpl)cacheEvent; + VersionTag tag = event.getVersionTag(); + if (tag == null) { + return; + } + + try { + if (tag.isGatewayTag()) { + // this may throw ConcurrentCacheModificationException or modify the event + if (processGatewayTag(cacheEvent)) { + return; + } + assert false : "processGatewayTag failure - returned false"; + } + + if (!tag.isFromOtherMember()) { + if (!event.getOperation().isNetSearch()) { + // except for netsearch, all locally-generated tags can be ignored + return; + } + } + + final InternalDistributedMember originator = (InternalDistributedMember)event.getDistributedMember(); + final VersionSource dmId = event.getRegion().getVersionMember(); + LocalRegion r = event.getLocalRegion(); + boolean eventHasDelta = event.getDeltaBytes() != null && event.getRawNewValue() == null; + + VersionStamp stamp = getVersionStamp(); + // bug #46223, an event received from a peer or a server may be from a different + // distributed system than the last modification made to this entry so we must + // perform a gateway conflict check + if (stamp != null && !tag.isAllowedByResolver()) { + int stampDsId = stamp.getDistributedSystemId(); + int tagDsId = tag.getDistributedSystemId(); + + if (stampDsId != 0 && stampDsId != tagDsId && stampDsId != -1) { + StringBuilder verbose = null; + if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { + verbose = new StringBuilder(); + verbose.append("processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=").append(tag); + } + long stampTime = stamp.getVersionTimeStamp(); + long tagTime = tag.getVersionTimeStamp(); + if (stampTime > 0 && (tagTime > stampTime + || (tagTime == stampTime && tag.getDistributedSystemId() >= stamp.getDistributedSystemId()))) { + if (verbose != null) { + verbose.append(" - allowing event"); + logger.trace(LogMarker.TOMBSTONE, verbose); + } + // Update the stamp with event's version information. + applyVersionTag(r, stamp, tag, originator); + return; + } + + if (stampTime > 0) { + if (verbose != null) { + verbose.append(" - disallowing event"); + logger.trace(LogMarker.TOMBSTONE, verbose); + } + r.getCachePerfStats().incConflatedEventsCount(); + persistConflictingTag(r, tag); + throw new ConcurrentCacheModificationException("conflicting event detected"); + } + } + } + + if (r.getVersionVector() != null && + r.getServerProxy() == null && + (r.getDataPolicy().withPersistence() || + !r.getScope().isLocal())) { // bug #45258 - perf degradation for local regions and RVV + VersionSource who = tag.getMemberID(); + if (who == null) { + who = originator; + } + r.getVersionVector().recordVersion(who, tag); + } + + assert !tag.isFromOtherMember() || tag.getMemberID() != null : "remote tag is missing memberID"; + + + // [bruce] for a long time I had conflict checks turned off in clients when + // receiving a response from a server and applying it to the cache. This lowered + // the CPU cost of versioning but eventually had to be pulled for bug #45453 + // if (r.getServerProxy() != null && conflictCheck) { + // // events coming from servers while a local sync is held on the entry + // // do not require a conflict check. Conflict checks were already + // // performed on the server and here we just consume whatever was sent back. + // // Event.isFromServer() returns true for client-update messages and + // // for putAll/getAll, which do not hold syncs during the server operation. + // conflictCheck = event.isFromServer(); + // } + // else + + // [bruce] for a very long time we had conflict checks turned off for PR buckets. + // Bug 45669 showed a primary dying in the middle of distribution. This caused + // one backup bucket to have a v2. The other bucket was promoted to primary and + // generated a conflicting v2. We need to do the check so that if this second + // v2 loses to the original one in the delta-GII operation that the original v2 + // will be the winner in both buckets. + // if (r.isUsedForPartitionedRegionBucket()) { + // conflictCheck = false; // primary/secondary model + // } + + // The new value in event is not from GII, even it could be tombstone + basicProcessVersionTag(r, tag, false, eventHasDelta, dmId, originator, conflictCheck); + } catch (ConcurrentCacheModificationException ex) { + event.isConcurrencyConflict(true); + throw ex; + } + } + + protected final void basicProcessVersionTag(LocalRegion region, VersionTag tag, boolean isTombstoneFromGII, + boolean deltaCheck, VersionSource dmId, InternalDistributedMember sender, boolean checkForConflict) { + + StringBuilder verbose = null; + + if (tag != null) { + VersionStamp stamp = getVersionStamp(); + + if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) { + VersionTag stampTag = stamp.asVersionTag(); + if (stampTag.hasValidVersion() && checkForConflict) { // only be verbose here if there's a possibility we might reject the operation + verbose = new StringBuilder(); + verbose.append("processing tag for key " + getKey() + ", stamp=" + stamp.asVersionTag() + ", tag=").append(tag) + .append(", checkForConflict=").append(checkForConflict); //.append(", current value=").append(_getValue()); + } + } + + if (stamp == null) { + throw new IllegalStateException("message contained a version tag but this region has no version storage"); + } + + boolean apply = true; + + try { + if (checkForConflict) { + apply = checkForConflict(region, stamp, tag, isTombstoneFromGII, deltaCheck, dmId, sender, verbose); + } + } catch (ConcurrentCacheModificationException e) { + // Even if we don't apply the operation we should always retain the + // highest timestamp in order for WAN conflict checks to work correctly + // because the operation may have been sent to other systems and been + // applied there + if (!tag.isGatewayTag() + && stamp.getDistributedSystemId() == tag.getDistributedSystemId() + && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) { + stamp.setVersionTimeStamp(tag.getVersionTimeStamp()); + tag.setTimeStampApplied(true); + if (verbose != null) { + verbose.append("\nThough in conflict the tag timestamp was more recent and was recorded."); + } + } + throw e; + } finally { + if (verbose != null) { + logger.trace(LogMarker.TOMBSTONE, verbose); + } + } + + if (apply) { + applyVersionTag(region, stamp, tag, sender); + } + } + } + + + private void applyVersionTag(LocalRegion region, VersionStamp stamp, VersionTag tag, InternalDistributedMember sender) { + // stamp.setPreviousMemberID(stamp.getMemberID()); + VersionSource mbr = tag.getMemberID(); + if (mbr == null) { + mbr = sender; + } + mbr = region.getVersionVector().getCanonicalId(mbr); + tag.setMemberID(mbr); + stamp.setVersions(tag); + if (tag.hasPreviousMemberID()) { + if (tag.getPreviousMemberID() == null) { + tag.setPreviousMemberID(stamp.getMemberID()); + } else { + tag.setPreviousMemberID(region.getVersionVector().getCanonicalId( + tag.getPreviousMemberID())); + } + } + } + + /** perform conflict checking for a stamp/tag */ + protected boolean checkForConflict(LocalRegion region, + VersionStamp stamp, VersionTag tag, + boolean isTombstoneFromGII, + boolean deltaCheck, VersionSource dmId, + InternalDistributedMember sender, StringBuilder verbose) { + + int stampVersion = stamp.getEntryVersion(); + int tagVersion = tag.getEntryVersion(); + + boolean throwex = false; + boolean apply = false; + + if (stamp.getVersionTimeStamp() != 0) { // new entries have no timestamp + // check for wrap-around on the version number + long difference = tagVersion - stampVersion; + if (0x10000 < difference || difference < -0x10000) { + if (verbose != null) { + verbose.append("\nversion rollover detected: tag="+tagVersion + " stamp=" + stampVersion); + } + if (difference < 0) { + tagVersion += 0x1000000L; + } else { + stampVersion += 0x1000000L; + } + } + } + if (verbose != null) { + verbose.append("\nstamp=v").append(stampVersion) + .append(" tag=v").append(tagVersion); + } + + if (deltaCheck) { + checkForDeltaConflict(region, stampVersion, tagVersion, stamp, tag, dmId, sender, verbose); + } + + if (stampVersion == 0 || stampVersion < tagVersion) { + if (verbose != null) { verbose.append(" - applying change"); } + apply = true; + } else if (stampVersion > tagVersion) { + if (overwritingOldTombstone(region, stamp, tag, verbose) && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) { + apply = true; + } else { + // check for an incoming expired tombstone from an initial image chunk. + if (tagVersion > 0 + && isExpiredTombstone(region, tag.getVersionTimeStamp(), isTombstoneFromGII) + && tag.getVersionTimeStamp() > stamp.getVersionTimeStamp()) { + // A special case to apply: when remote entry is expired tombstone, then let local vs remote with newer timestamp to win + if (verbose != null) { verbose.append(" - applying change in Delta GII"); } + apply = true; + } else { + if (verbose != null) { verbose.append(" - disallowing"); } + throwex= true; + } + } + } else { + if (overwritingOldTombstone(region, stamp, tag, verbose)) { + apply = true; + } else { + // compare member IDs + VersionSource stampID = stamp.getMemberID(); + if (stampID == null) { + stampID = dmId; + } + VersionSource tagID = tag.getMemberID(); + if (tagID == null) { + tagID = sender; + } + if (verbose != null) { verbose.append("\ncomparing IDs"); } + int compare = stampID.compareTo(tagID); + if (compare < 0) { + if (verbose != null) { verbose.append(" - applying change"); } + apply = true; + } else if (compare > 0) { + if (verbose != null) { verbose.append(" - disallowing"); } + throwex = true; + } else if (tag.isPosDup()) { + if (verbose != null) { verbose.append(" - disallowing duplicate marked with posdup"); } + throwex = true; + } else /* if (isTombstoneFromGII && isTombstone()) { + if (verbose != null) { verbose.append(" - disallowing duplicate tombstone from GII"); } + return false; // bug #49601 don't schedule tombstones from GII if there's already one here + } else */ { + if (verbose != null) { verbose.append(" - allowing duplicate"); } + } + } + } + + if (!apply && throwex) { + region.getCachePerfStats().incConflatedEventsCount(); + persistConflictingTag(region, tag); + throw new ConcurrentCacheModificationException(); + } + + return apply; + } + + private boolean isExpiredTombstone(LocalRegion region, long timestamp, boolean isTombstone) { + return isTombstone && (timestamp + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT) <= region.cacheTimeMillis(); + } + + private boolean overwritingOldTombstone(LocalRegion region, VersionStamp stamp, VersionTag tag, StringBuilder verbose) { + // Tombstone GC does not use locking to stop operations when old tombstones + // are being removed. Because of this we might get an operation that was applied + // in another VM that has just reaped a tombstone and is now using a reset + // entry version number. Because of this we check the timestamp on the current + // local entry and see if it is old enough to have expired. If this is the case + // we accept the change and allow the tag to be recorded + long stampTime = stamp.getVersionTimeStamp(); + if (isExpiredTombstone(region, stampTime, this.isTombstone())) { + // no local change since the tombstone would have timed out - accept the change + if (verbose != null) { verbose.append(" - accepting because local timestamp is old"); } + return true; + } else { + return false; + } + } + + protected void persistConflictingTag(LocalRegion region, VersionTag tag) { + // only persist region needs to persist conflict tag + } + + /** + * for an event containing a delta we must check to see if the tag's + * previous member id is the stamp's member id and ensure that the + * version is only incremented by 1. Otherwise the delta is being + * applied to a value that does not match the source of the delta. + * + * @throws InvalidDeltaException + */ + private void checkForDeltaConflict(LocalRegion region, + long stampVersion, long tagVersion, + VersionStamp stamp, VersionTag tag, + VersionSource dmId, InternalDistributedMember sender, + StringBuilder verbose) { + + if (tagVersion != stampVersion+1) { + if (verbose != null) { + verbose.append("\ndelta requires full value due to version mismatch"); + } + region.getCachePerfStats().incDeltaFailedUpdates(); + throw new InvalidDeltaException("delta cannot be applied due to version mismatch"); + + } else { + // make sure the tag was based on the value in this entry by checking the + // tag's previous-changer ID against this stamp's current ID + VersionSource stampID = stamp.getMemberID(); + if (stampID == null) { + stampID = dmId; + } + VersionSource tagID = tag.getPreviousMemberID(); + if (tagID == null) { + tagID = sender; + } + if (!tagID.equals(stampID)) { + if (verbose != null) { + verbose.append("\ndelta requires full value. tag.previous=") + .append(tagID).append(" but stamp.current=").append(stampID); + } + region.getCachePerfStats().incDeltaFailedUpdates(); + throw new InvalidDeltaException("delta cannot be applied due to version ID mismatch"); + } + } + } + + private boolean processGatewayTag(EntryEvent cacheEvent) { + // Gateway tags are installed in the server-side LocalRegion cache + // modification methods. They do not have version numbers or distributed + // member IDs. Instead they only have timestamps and distributed system IDs. + + // If there is a resolver plug-in, invoke it. Otherwise we use the timestamps and + // distributed system IDs to determine whether to allow the event to proceed. + + final boolean isDebugEnabled = logger.isDebugEnabled(); + + if (this.isRemoved() && !this.isTombstone()) { + return true; // no conflict on a new entry + } + EntryEventImpl event = (EntryEventImpl)cacheEvent; + VersionTag tag = event.getVersionTag(); + long stampTime = getVersionStamp().getVersionTimeStamp(); + long tagTime = tag.getVersionTimeStamp(); + int stampDsid = getVersionStamp().getDistributedSystemId(); + int tagDsid = tag.getDistributedSystemId(); + if (isDebugEnabled) { + logger.debug("processing gateway version information for {}. Stamp dsid={} time={} Tag dsid={} time={}", + event.getKey(), stampDsid, stampTime, tagDsid, tagTime); + } + if (tagTime == VersionTag.ILLEGAL_VERSION_TIMESTAMP) { + return true; // no timestamp received from other system - just apply it + } + if (tagDsid == stampDsid || stampDsid == -1) { + return true; + } + GatewayConflictResolver resolver = event.getRegion().getCache().getGatewayConflictResolver(); + if (resolver != null) { + if (isDebugEnabled) { + logger.debug("invoking gateway conflict resolver"); + } + final boolean[] disallow = new boolean[1]; + final Object[] newValue = new Object[] { this }; + GatewayConflictHelper helper = new GatewayConflictHelper() { + @Override + public void disallowEvent() { + disallow[0] = true; + } + + @Override + public void changeEventValue(Object v) { + newValue[0] = v; + } + }; + TimestampedEntryEventImpl timestampedEvent = + (TimestampedEntryEventImpl)event.getTimestampedEvent(tagDsid, stampDsid, tagTime, stampTime); + + // gateway conflict resolvers will usually want to see the old value + if (!time