From commits-return-119633-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Tue Aug 14 14:02:13 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0C2EC18067A for ; Tue, 14 Aug 2018 14:02:10 +0200 (CEST) Received: (qmail 75223 invoked by uid 500); 14 Aug 2018 12:02:10 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 75213 invoked by uid 99); 14 Aug 2018 12:02:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Aug 2018 12:02:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8AF73DFF92; Tue, 14 Aug 2018 12:02:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Tue, 14 Aug 2018 12:02:10 -0000 Message-Id: In-Reply-To: <9089a6b068174a97b528b24d9f877436@git.apache.org> References: <9089a6b068174a97b528b24d9f877436@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/10] ignite git commit: Update code to use StripedCompositeReadWriteLock Merge remote-tracking branch 'apache-ignite/master' into IGNITE-5960 http://git-wip-us.apache.org/repos/asf/ignite/blob/2021942c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ab1fff6,767c314..2e6dddd --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -24,6 -24,7 +24,8 @@@ import java.util.List import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; ++import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; @@@ -63,6 -66,6 +67,7 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter; import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.util.IgniteTree; ++import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@@ -154,6 -157,10 +159,13 @@@ public abstract class GridCacheMapEntr @GridToStringInclude private GridCacheEntryExtras extras; + /** */ + @GridToStringExclude - private final ReentrantLock lock = new ReentrantLock(); ++ private final ReentrantLock lock = new ReentrantLock(); ++ ++ /** Read Lock for continuous query listener */ ++ private final Lock listenerLock; + /** * Flags: *
    @@@ -182,6 -189,6 +194,7 @@@ this.key = key; this.hash = key.hashCode(); this.cctx = cctx; ++ this.listenerLock = cctx.continuousQueries().getListenerReadLock(); ver = cctx.versions().next(); @@@ -910,137 -960,142 +966,144 @@@ ensureFreeSpace(); - cctx.continuousQueries().getListenerReadLock().lock(); ++ lockListenerReadLock(); + lockEntry(); + try { - synchronized (this) { - checkObsolete(); + checkObsolete(); - if (isNear()) { - assert dhtVer != null; + if (isNear()) { + assert dhtVer != null; - // It is possible that 'get' could load more recent value. - if (!((GridNearCacheEntry) this).recordDhtVersion(dhtVer)) - return new GridCacheUpdateTxResult(false, null); - } + // It is possible that 'get' could load more recent value. + if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer)) + return new GridCacheUpdateTxResult(false, null, logPtr); + } - assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : - "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']'; + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : + "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']'; - // Load and remove from swap if it is new. - boolean startVer = isStartVersion(); + // Load and remove from swap if it is new. + boolean startVer = isStartVersion(); - boolean internal = isInternal() || !context().userCache(); + boolean internal = isInternal() || !context().userCache(); - Map lsnrCol = - notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; + Map lsnrCol = + notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; - if (startVer && (retval || intercept || lsnrCol != null)) - unswap(retval); + if (startVer && (retval || intercept || lsnrCol != null)) + unswap(retval); - newVer = explicitVer != null ? explicitVer : tx == null ? - nextVersion() : tx.writeVersion(); + newVer = explicitVer != null ? explicitVer : tx == null ? + nextVersion() : tx.writeVersion(); - assert newVer != null : "Failed to get write version for tx: " + tx; + assert newVer != null : "Failed to get write version for tx: " + tx; - old = oldValPresent ? oldVal : this.val; + old = oldValPresent ? oldVal : this.val; - if (intercept) { - val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false); + if (intercept) { + val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false); - CacheLazyEntry e = new CacheLazyEntry(cctx, key, old, keepBinary); + CacheLazyEntry e = new CacheLazyEntry(cctx, key, old, keepBinary); - Object interceptorVal = cctx.config().getInterceptor().onBeforePut( - new CacheLazyEntry(cctx, key, old, keepBinary), - val0); + Object interceptorVal = cctx.config().getInterceptor().onBeforePut( + new CacheLazyEntry(cctx, key, old, keepBinary), + val0); - key0 = e.key(); + key0 = e.key(); - if (interceptorVal == null) - return new GridCacheUpdateTxResult(false, (CacheObject) cctx.unwrapTemporary(old)); - else if (interceptorVal != val0) - val0 = cctx.unwrapTemporary(interceptorVal); + if (interceptorVal == null) + return new GridCacheUpdateTxResult(false, (CacheObject)cctx.unwrapTemporary(old), logPtr); + else if (interceptorVal != val0) + val0 = cctx.unwrapTemporary(interceptorVal); - val = cctx.toCacheObject(val0); - } + val = cctx.toCacheObject(val0); + } - // Determine new ttl and expire time. - long expireTime; + // Determine new ttl and expire time. + long expireTime; - if (drExpireTime >= 0) { - assert ttl >= 0 : ttl; + if (drExpireTime >= 0) { + assert ttl >= 0 : ttl; - expireTime = drExpireTime; - } else { - if (ttl == -1L) { - ttl = ttlExtras(); - expireTime = expireTimeExtras(); - } else - expireTime = CU.toExpireTime(ttl); + expireTime = drExpireTime; + } + else { + if (ttl == -1L) { + ttl = ttlExtras(); + expireTime = expireTimeExtras(); } + else + expireTime = CU.toExpireTime(ttl); + } - assert ttl >= 0 : ttl; - assert expireTime >= 0 : expireTime; + assert ttl >= 0 : ttl; + assert expireTime >= 0 : expireTime; - // Detach value before index update. - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + // Detach value before index update. + val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - assert val != null; + assert val != null; - storeValue(val, expireTime, newVer, null); + storeValue(val, expireTime, newVer, null); - if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) - deletedUnlocked(false); + if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) + deletedUnlocked(false); - updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr); + updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr); - if (updateCntr != null && updateCntr != 0) - updateCntr0 = updateCntr; + if (updateCntr != null && updateCntr != 0) + updateCntr0 = updateCntr; - update(val, expireTime, ttl, newVer, true); + if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + logPtr = logTxUpdate(tx, val, expireTime, updateCntr0); - drReplicate(drType, val, newVer, topVer); + update(val, expireTime, ttl, newVer, true); - recordNodeId(affNodeId, topVer); + drReplicate(drType, val, newVer, topVer); - if (metrics && cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onWrite(); + recordNodeId(affNodeId, topVer); - if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { - CacheObject evtOld = cctx.unwrapTemporary(old); + if (metrics && cctx.statisticsEnabled()) + cctx.cache().metrics0().onWrite(); - cctx.events().addEvent(partition(), - key, - evtNodeId, - tx == null ? null : tx.xid(), - newVer, - EVT_CACHE_OBJECT_PUT, - val, - val != null, - evtOld, - evtOld != null || hasValueUnlocked(), - subjId, null, taskName, - keepBinary); - } + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + CacheObject evtOld = cctx.unwrapTemporary(old); - if (lsnrCol != null) { - cctx.continuousQueries().onEntryUpdated( - lsnrCol, - key, - val, - old, - internal, - partition(), - tx.local(), - false, - updateCntr0, - null, - topVer); - } + cctx.events().addEvent(partition(), + key, + evtNodeId, + tx == null ? null : tx.xid(), + newVer, + EVT_CACHE_OBJECT_PUT, + val, + val != null, + evtOld, + evtOld != null || hasValueUnlocked(), + subjId, null, taskName, + keepBinary); + } - cctx.dataStructures().onEntryUpdated(key, false, keepBinary); + if (lsnrCol != null) { + cctx.continuousQueries().onEntryUpdated( + lsnrCol, + key, + val, + old, + internal, + partition(), + tx.local(), + false, + updateCntr0, + null, + topVer); } - } finally { - cctx.continuousQueries().getListenerReadLock().unlock(); + + cctx.dataStructures().onEntryUpdated(key, false, keepBinary); + } + finally { + unlockEntry(); ++ unlockListenerReadLock(); } onUpdateFinished(updateCntr0); @@@ -1114,144 -1171,148 +1179,150 @@@ boolean marked = false; - cctx.continuousQueries().getListenerReadLock().lock(); ++ lockListenerReadLock(); + lockEntry(); + try { - synchronized (this) { - checkObsolete(); + checkObsolete(); - if (isNear()) { - assert dhtVer != null; + if (isNear()) { + assert dhtVer != null; - // It is possible that 'get' could load more recent value. - if (!((GridNearCacheEntry) this).recordDhtVersion(dhtVer)) - return new GridCacheUpdateTxResult(false, null); - } + // It is possible that 'get' could load more recent value. + if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer)) + return new GridCacheUpdateTxResult(false, null, logPtr); + } - assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : - "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; + assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : + "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; - boolean startVer = isStartVersion(); + boolean startVer = isStartVersion(); - newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); + newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); - boolean internal = isInternal() || !context().userCache(); + boolean internal = isInternal() || !context().userCache(); - Map lsnrCol = - notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; + Map lsnrCol = + notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; - if (startVer && (retval || intercept || lsnrCol != null)) - unswap(); + if (startVer && (retval || intercept || lsnrCol != null)) + unswap(); - old = oldValPresent ? oldVal : val; + old = oldValPresent ? oldVal : val; - if (intercept) { - entry0 = new CacheLazyEntry(cctx, key, old, keepBinary); + if (intercept) { + entry0 = new CacheLazyEntry(cctx, key, old, keepBinary); - interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0); + interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0); - if (cctx.cancelRemove(interceptRes)) { - CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); + if (cctx.cancelRemove(interceptRes)) { + CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())); - return new GridCacheUpdateTxResult(false, ret); - } + return new GridCacheUpdateTxResult(false, ret, logPtr); } + } - removeValue(); + removeValue(); - update(null, 0, 0, newVer, true); + update(null, 0, 0, newVer, true); - if (cctx.deferredDelete() && !detached() && !isInternal()) { - if (!deletedUnlocked()) { - deletedUnlocked(true); + if (cctx.deferredDelete() && !detached() && !isInternal()) { + if (!deletedUnlocked()) { + deletedUnlocked(true); - if (tx != null) { - GridCacheMvcc mvcc = mvccExtras(); + if (tx != null) { + GridCacheMvcc mvcc = mvccExtras(); - if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) - clearReaders(); - else - clearReader(tx.originatingNodeId()); - } + if (mvcc == null || mvcc.isEmpty(tx.xidVersion())) + clearReaders(); + else + clearReader(tx.originatingNodeId()); } } + } - updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr); + updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr); - if (updateCntr != null && updateCntr != 0) - updateCntr0 = updateCntr; + if (updateCntr != null && updateCntr != 0) + updateCntr0 = updateCntr; - drReplicate(drType, null, newVer, topVer); + if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled()) + logPtr = logTxUpdate(tx, null, 0, updateCntr0); - if (metrics && cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRemove(); + drReplicate(drType, null, newVer, topVer); - if (tx == null) - obsoleteVer = newVer; - else { - // Only delete entry if the lock is not explicit. - if (lockedBy(tx.xidVersion())) - obsoleteVer = tx.xidVersion(); - else if (log.isDebugEnabled()) - log.debug("Obsolete version was not set because lock was explicit: " + this); - } + if (metrics && cctx.statisticsEnabled()) + cctx.cache().metrics0().onRemove(); - if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - CacheObject evtOld = cctx.unwrapTemporary(old); + if (tx == null) + obsoleteVer = newVer; + else { + // Only delete entry if the lock is not explicit. + if (lockedBy(tx.xidVersion())) + obsoleteVer = tx.xidVersion(); + else if (log.isDebugEnabled()) + log.debug("Obsolete version was not set because lock was explicit: " + this); + } - cctx.events().addEvent(partition(), - key, - evtNodeId, - tx == null ? null : tx.xid(), newVer, - EVT_CACHE_OBJECT_REMOVED, - null, - false, - evtOld, - evtOld != null || hasValueUnlocked(), - subjId, - null, - taskName, - keepBinary); - } + if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + CacheObject evtOld = cctx.unwrapTemporary(old); - if (lsnrCol != null) { - cctx.continuousQueries().onEntryUpdated( - lsnrCol, - key, - null, - old, - internal, - partition(), - tx.local(), - false, - updateCntr0, - null, - topVer); - } + cctx.events().addEvent(partition(), + key, + evtNodeId, + tx == null ? null : tx.xid(), newVer, + EVT_CACHE_OBJECT_REMOVED, + null, + false, + evtOld, + evtOld != null || hasValueUnlocked(), + subjId, + null, + taskName, + keepBinary); + } - cctx.dataStructures().onEntryUpdated(key, true, keepBinary); + if (lsnrCol != null) { + cctx.continuousQueries().onEntryUpdated( + lsnrCol, + key, + null, + old, + internal, + partition(), + tx.local(), + false, + updateCntr0, + null, + topVer); + } - deferred = cctx.deferredDelete() && !detached() && !isInternal(); + cctx.dataStructures().onEntryUpdated(key, true, keepBinary); - if (intercept) - entry0.updateCounter(updateCntr0); + deferred = cctx.deferredDelete() && !detached() && !isInternal(); - if (!deferred) { - // If entry is still removed. - assert newVer == ver; + if (intercept) + entry0.updateCounter(updateCntr0); - if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) { - if (log.isDebugEnabled()) - log.debug("Entry could not be marked obsolete (it is still used): " + this); - } else { - recordNodeId(affNodeId, topVer); + if (!deferred) { + // If entry is still removed. + assert newVer == ver; - if (log.isDebugEnabled()) - log.debug("Entry was marked obsolete: " + this); - } + if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) { + if (log.isDebugEnabled()) + log.debug("Entry could not be marked obsolete (it is still used): " + this); + } + else { + recordNodeId(affNodeId, topVer); + + if (log.isDebugEnabled()) + log.debug("Entry was marked obsolete: " + this); } } - } finally { - cctx.continuousQueries().getListenerReadLock().unlock(); + } + finally { + unlockEntry(); ++ unlockListenerReadLock(); } if (deferred) @@@ -1320,659 -1381,655 +1391,659 @@@ IgniteBiTuple interceptorRes = null; EntryProcessorResult invokeRes = null; - - cctx.continuousQueries().getListenerReadLock().lock(); + ++ lockListenerReadLock(); + lockEntry(); + try { + boolean internal = isInternal() || !context().userCache(); - synchronized (this) { - boolean internal = isInternal() || !context().userCache(); - - Map lsnrCol = - cctx.continuousQueries().updateListeners(internal, false); - - boolean needVal = retval || - intercept || - op == GridCacheOperation.TRANSFORM || - !F.isEmpty(filter) || - lsnrCol != null; - - checkObsolete(); - - CacheDataRow oldRow = null; - - // Load and remove from swap if it is new. - if (isNew()) - oldRow = unswap(null, false); - - old = val; - - boolean readFromStore = false; - - Object old0 = null; - - if (readThrough && needVal && old == null && - (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old0 = readThrough(null, key, false, subjId, taskName); - - old = cctx.toCacheObject(old0); - - long ttl = CU.TTL_ETERNAL; - long expireTime = CU.EXPIRE_TIME_ETERNAL; - - if (expiryPlc != null && old != null) { - ttl = CU.toTtl(expiryPlc.getExpiryForCreation()); - - if (ttl == CU.TTL_ZERO) { - ttl = CU.TTL_MINIMUM; - expireTime = CU.expireTimeInPast(); - } - else if (ttl == CU.TTL_NOT_CHANGED) - ttl = CU.TTL_ETERNAL; - else - expireTime = CU.toExpireTime(ttl); - } - - // Detach value before index update. - old = cctx.kernalContext().cacheObjects().prepareForCache(old, cctx); - - if (old != null) - storeValue(old, expireTime, ver, oldRow); - else - removeValue(); - - update(old, expireTime, ttl, ver, true); - } - - // Apply metrics. - if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) { - // PutIfAbsent methods mustn't update hit/miss statistics - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) - cctx.cache().metrics0().onRead(old != null); - } - - // Check filter inside of synchronization. - if (!F.isEmpty(filter)) { - boolean pass = cctx.isAllLocked(this, filter); - - if (!pass) { - if (expiryPlc != null && !readFromStore && !cctx.putIfAbsentFilter(filter) && hasValueUnlocked()) - updateTtl(expiryPlc); - - Object val = retval ? - cctx.cacheObjectContext().unwrapBinaryIfNeeded(CU.value(old, cctx, false), keepBinary, false) - : null; - - return new T3<>(false, val, null); - } - } - - String transformCloClsName = null; - - CacheObject updated; - - Object key0 = null; - Object updated0 = null; - - // Calculate new value. - if (op == GridCacheOperation.TRANSFORM) { - transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName(); - - EntryProcessor entryProcessor = (EntryProcessor)writeObj; - - assert entryProcessor != null; - - CacheInvokeEntry entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this); - - try { - Object computed = entryProcessor.process(entry, invokeArgs); - - if (entry.modified()) { - updated0 = cctx.unwrapTemporary(entry.getValue()); - - updated = cctx.toCacheObject(updated0); - - if (updated != null) // no validation for remove case - cctx.validateKeyAndValue(key, updated); - } - else - updated = old; - - key0 = entry.key(); - - invokeRes = computed != null ? CacheInvokeResult.fromResult(cctx.unwrapTemporary(computed)) : null; - } - catch (Exception e) { - updated = old; - - invokeRes = CacheInvokeResult.fromError(e); - } - - if (!entry.modified()) { - if (expiryPlc != null && !readFromStore && hasValueUnlocked()) - updateTtl(expiryPlc); - - return new GridTuple3<>(false, null, invokeRes); - } - } - else - updated = (CacheObject)writeObj; - - op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE; - - if (intercept) { - CacheLazyEntry e; - - if (op == GridCacheOperation.UPDATE) { - updated0 = value(updated0, updated, keepBinary, false); - - e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary); - - Object interceptorVal = cctx.config().getInterceptor().onBeforePut(e, updated0); - - if (interceptorVal == null) - return new GridTuple3<>(false, cctx.unwrapTemporary(value(old0, old, keepBinary, false)), invokeRes); - else { - updated0 = cctx.unwrapTemporary(interceptorVal); - - updated = cctx.toCacheObject(updated0); - } - } - else { - e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary); - - interceptorRes = cctx.config().getInterceptor().onBeforeRemove(e); - - if (cctx.cancelRemove(interceptorRes)) - return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes); - } - - key0 = e.key(); - old0 = e.value(); - } - - boolean hadVal = hasValueUnlocked(); - - long ttl = CU.TTL_ETERNAL; - long expireTime = CU.EXPIRE_TIME_ETERNAL; - - if (op == GridCacheOperation.UPDATE) { - if (expiryPlc != null) { - ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); - - if (ttl == CU.TTL_NOT_CHANGED) { - ttl = ttlExtras(); - expireTime = expireTimeExtras(); - } - else if (ttl != CU.TTL_ZERO) - expireTime = CU.toExpireTime(ttl); - } - else { - ttl = ttlExtras(); - expireTime = expireTimeExtras(); - } - } - - if (ttl == CU.TTL_ZERO) - op = GridCacheOperation.DELETE; - - // Try write-through. - if (op == GridCacheOperation.UPDATE) { - // Detach value before index update. - updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx); - - if (writeThrough) - // Must persist inside synchronization in non-tx mode. - cctx.store().put(null, key, updated, ver); - - storeValue(updated, expireTime, ver, oldRow); - - assert ttl != CU.TTL_ZERO; - - update(updated, expireTime, ttl, ver, true); - - if (evt) { - CacheObject evtOld = null; - - if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - evtOld = cctx.unwrapTemporary(old); - - cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, - (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, - evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary); - } - - if (cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(old); - - cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, - (GridCacheVersion)null, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, - evtOld != null || hadVal, subjId, null, taskName, keepBinary); - } - } - } - else { - if (writeThrough) - // Must persist inside synchronization in non-tx mode. - cctx.store().remove(null, key); - - removeValue(); - - update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true); - - if (evt) { - CacheObject evtOld = null; - - if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) - cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, - (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, - evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary); - - if (cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(old); - - cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, (GridCacheVersion)null, - EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, subjId, null, - taskName, keepBinary); - } - } - - res = hadVal; - } - - if (res) - updateMetrics(op, metrics); - - if (lsnrCol != null) { - long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null); - - cctx.continuousQueries().onEntryUpdated( - lsnrCol, - key, - val, - old, - internal, - partition(), - true, - false, - updateCntr, - null, - AffinityTopologyVersion.NONE); - - onUpdateFinished(updateCntr); - } - - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); - - if (intercept) { - if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L)); - else - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L)); - } - } - } finally { - cctx.continuousQueries().getListenerReadLock().unlock(); - } + Map lsnrCol = + cctx.continuousQueries().updateListeners(internal, false); - return new GridTuple3<>(res, - cctx.unwrapTemporary(interceptorRes != null ? - interceptorRes.get2() : - cctx.cacheObjectContext().unwrapBinaryIfNeeded(old, keepBinary, false)), - invokeRes); - } + boolean needVal = retval || + intercept || + op == GridCacheOperation.TRANSFORM || + !F.isEmpty(filter) || + lsnrCol != null; - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public GridCacheUpdateAtomicResult innerUpdate( - final GridCacheVersion newVer, - final UUID evtNodeId, - final UUID affNodeId, - final GridCacheOperation op, - @Nullable final Object writeObj, - @Nullable final Object[] invokeArgs, - final boolean writeThrough, - final boolean readThrough, - final boolean retval, - final boolean keepBinary, - @Nullable final IgniteCacheExpiryPolicy expiryPlc, - final boolean evt, - final boolean metrics, - final boolean primary, - final boolean verCheck, - final AffinityTopologyVersion topVer, - @Nullable final CacheEntryPredicate[] filter, - final GridDrType drType, - final long explicitTtl, - final long explicitExpireTime, - @Nullable final GridCacheVersion conflictVer, - final boolean conflictResolve, - final boolean intercept, - @Nullable final UUID subjId, - final String taskName, - @Nullable final CacheObject prevVal, - @Nullable final Long updateCntr, - @Nullable final GridDhtAtomicAbstractUpdateFuture fut - ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { - assert cctx.atomic() && !detached(); + checkObsolete(); - AtomicCacheUpdateClosure c; + CacheDataRow oldRow = null; - if (!primary && !isNear()) - ensureFreeSpace(); + // Load and remove from swap if it is new. + if (isNew()) + oldRow = unswap(null, false); - cctx.continuousQueries().getListenerReadLock().lock(); - try { - synchronized (this) { - checkObsolete(); - - boolean internal = isInternal() || !context().userCache(); - - Map lsnrs = cctx.continuousQueries().updateListeners(internal, false); - - boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM - || !F.isEmptyOrNulls(filter); - - // Possibly read value from store. - boolean readFromStore = readThrough && needVal && (cctx.readThrough() && - (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); - - c = new AtomicCacheUpdateClosure(this, - topVer, - newVer, - op, - writeObj, - invokeArgs, - readFromStore, - writeThrough, - keepBinary, - expiryPlc, - primary, - verCheck, - filter, - explicitTtl, - explicitExpireTime, - conflictVer, - conflictResolve, - intercept, - updateCntr); - - key.valueBytes(cctx.cacheObjectContext()); - - if (isNear()) { - CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null; - - c.call(dataRow); - } - else - cctx.offheap().invoke(cctx, key, localPartition(), c); - - GridCacheUpdateAtomicResult updateRes = c.updateRes; - - assert updateRes != null : c; - - CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null; - CacheObject updateVal = null; - GridCacheVersion updateVer = c.newVer; - - // Apply metrics. - if (metrics && - updateRes.outcome().updateReadMetrics() && - cctx.cache().configuration().isStatisticsEnabled() && - needVal) { - // PutIfAbsent methods must not update hit/miss statistics. - if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) - cctx.cache().metrics0().onRead(oldVal != null); - } - - switch (updateRes.outcome()) { - case VERSION_CHECK_FAILED: { - if (!cctx.isNear()) { - CacheObject evtVal; - - if (op == GridCacheOperation.TRANSFORM) { - EntryProcessor entryProcessor = - (EntryProcessor)writeObj; - - CacheInvokeEntry entry = - new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this); - - try { - entryProcessor.process(entry, invokeArgs); - - evtVal = entry.modified() ? - cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; - } - catch (Exception ignore) { - evtVal = prevVal; - } - } - else - evtVal = (CacheObject)writeObj; - - long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr); - - if (updateCntr != null) - updateCntr0 = updateCntr; - - onUpdateFinished(updateCntr0); - - cctx.continuousQueries().onEntryUpdated( - key, - evtVal, - prevVal, - isInternal() || !context().userCache(), - partition(), - primary, - false, - updateCntr0, - null, - topVer); - } - - return updateRes; - } - - case CONFLICT_USE_OLD: - case FILTER_FAILED: - case INVOKE_NO_OP: - case INTERCEPTOR_CANCEL: - return updateRes; - } - - assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL; - - CacheObject evtOld = null; - - if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { - assert writeObj instanceof EntryProcessor : writeObj; - - evtOld = cctx.unwrapTemporary(oldVal); - - Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj); - - cctx.events().addEvent(partition(), - key, - evtNodeId, - null, - newVer, - EVT_CACHE_OBJECT_READ, - evtOld, evtOld != null, - evtOld, evtOld != null, - subjId, - transformClo.getClass().getName(), - taskName, - keepBinary); - } - - if (c.op == GridCacheOperation.UPDATE) { - updateVal = val; - - assert updateVal != null : c; - - drReplicate(drType, updateVal, updateVer, topVer); - - recordNodeId(affNodeId, topVer); - - if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(oldVal); - - cctx.events().addEvent(partition(), - key, - evtNodeId, - null, - newVer, - EVT_CACHE_OBJECT_PUT, - updateVal, - true, - evtOld, - evtOld != null, - subjId, - null, - taskName, - keepBinary); - } - } - else { - assert c.op == GridCacheOperation.DELETE : c.op; - - clearReaders(); - - drReplicate(drType, null, newVer, topVer); - - recordNodeId(affNodeId, topVer); - - if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { - if (evtOld == null) - evtOld = cctx.unwrapTemporary(oldVal); - - cctx.events().addEvent(partition(), - key, - evtNodeId, - null, newVer, - EVT_CACHE_OBJECT_REMOVED, - null, false, - evtOld, evtOld != null, - subjId, - null, - taskName, - keepBinary); - } - } - - if (updateRes.success()) - updateMetrics(c.op, metrics); - - Map curLsnrs; - - // Continuous query filter should be perform under lock. - if (lsnrs != null) { - CacheObject evtVal = cctx.unwrapTemporary(updateVal); - CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); - - cctx.continuousQueries().onEntryUpdated(lsnrs, - key, - evtVal, - evtOldVal, - internal, - partition(), - primary, - false, - c.updateRes.updateCounter(), - fut, - topVer); - } - - cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary); - - if (intercept) { - if (c.op == GridCacheOperation.UPDATE) { - cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( - cctx, - key, - null, - updateVal, - null, - keepBinary, - c.updateRes.updateCounter())); - } - else { - assert c.op == GridCacheOperation.DELETE : c.op; - - cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( - cctx, - key, - null, - oldVal, - null, - keepBinary, - c.updateRes.updateCounter())); - } - } - } - } finally { - cctx.continuousQueries().getListenerReadLock().unlock(); - } + old = val; - onUpdateFinished(c.updateRes.updateCounter()); + boolean readFromStore = false; - return c.updateRes; - } + Object old0 = null; - /** - * @param val Value. - * @param cacheObj Cache object. - * @param keepBinary Keep binary flag. - * @param cpy Copy flag. - * @return Cache object value. - */ - @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) { - if (val != null) - return val; + if (readThrough && needVal && old == null && + (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { + old0 = readThrough(null, key, false, subjId, taskName); - return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy); - } + old = cctx.toCacheObject(old0); - /** - * @param expiry Expiration policy. - * @return Tuple holding initial TTL and expire time with the given expiry. - */ - private static IgniteBiTuple initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) { - assert expiry != null; + long ttl = CU.TTL_ETERNAL; + long expireTime = CU.EXPIRE_TIME_ETERNAL; - long initTtl = expiry.forCreate(); - long initExpireTime; + if (expiryPlc != null && old != null) { + ttl = CU.toTtl(expiryPlc.getExpiryForCreation()); - if (initTtl == CU.TTL_ZERO) { - initTtl = CU.TTL_MINIMUM; - initExpireTime = CU.expireTimeInPast(); - } - else if (initTtl == CU.TTL_NOT_CHANGED) { - initTtl = CU.TTL_ETERNAL; - initExpireTime = CU.EXPIRE_TIME_ETERNAL; - } - else - initExpireTime = CU.toExpireTime(initTtl); + if (ttl == CU.TTL_ZERO) { + ttl = CU.TTL_MINIMUM; + expireTime = CU.expireTimeInPast(); + } + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = CU.TTL_ETERNAL; + else + expireTime = CU.toExpireTime(ttl); + } - return F.t(initTtl, initExpireTime); - } + // Detach value before index update. + old = cctx.kernalContext().cacheObjects().prepareForCache(old, cctx); - /** - * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time. - * - * @param expiry Expiration policy. - * @param ttl Explicit TTL. - * @param expireTime Explicit expire time. - * @return Result. - */ - private GridTuple3 ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) { - assert !obsolete(); + if (old != null) + storeValue(old, expireTime, ver, oldRow); + else + removeValue(); - boolean rmv = false; + update(old, expireTime, ttl, ver, true); + } - // 1. If TTL is not changed, then calculate it based on expiry. + // Apply metrics. + if (metrics && cctx.statisticsEnabled() && needVal) { + // PutIfAbsent methods mustn't update hit/miss statistics + if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) + cctx.cache().metrics0().onRead(old != null); + } + + // Check filter inside of synchronization. + if (!F.isEmpty(filter)) { + boolean pass = cctx.isAllLocked(this, filter); + + if (!pass) { + if (expiryPlc != null && !readFromStore && !cctx.putIfAbsentFilter(filter) && hasValueUnlocked()) + updateTtl(expiryPlc); + + Object val = retval ? + cctx.cacheObjectContext().unwrapBinaryIfNeeded(CU.value(old, cctx, false), keepBinary, false) + : null; + + return new T3<>(false, val, null); + } + } + + String transformCloClsName = null; + + CacheObject updated; + + Object key0 = null; + Object updated0 = null; + + // Calculate new value. + if (op == GridCacheOperation.TRANSFORM) { + transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName(); + + EntryProcessor entryProcessor = (EntryProcessor)writeObj; + + assert entryProcessor != null; + + CacheInvokeEntry entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this); + + try { + Object computed = entryProcessor.process(entry, invokeArgs); + + if (entry.modified()) { + updated0 = cctx.unwrapTemporary(entry.getValue()); + + updated = cctx.toCacheObject(updated0); + + cctx.validateKeyAndValue(key, updated); + } + else + updated = old; + + key0 = entry.key(); + + invokeRes = computed != null ? CacheInvokeResult.fromResult(cctx.unwrapTemporary(computed)) : null; + } + catch (Exception e) { + updated = old; + + invokeRes = CacheInvokeResult.fromError(e); + } + + if (!entry.modified()) { + if (expiryPlc != null && !readFromStore && hasValueUnlocked()) + updateTtl(expiryPlc); + + return new GridTuple3<>(false, null, invokeRes); + } + } + else + updated = (CacheObject)writeObj; + + op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE; + + if (intercept) { + CacheLazyEntry e; + + if (op == GridCacheOperation.UPDATE) { + updated0 = value(updated0, updated, keepBinary, false); + + e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary); + + Object interceptorVal = cctx.config().getInterceptor().onBeforePut(e, updated0); + + if (interceptorVal == null) + return new GridTuple3<>(false, cctx.unwrapTemporary(value(old0, old, keepBinary, false)), invokeRes); + else { + updated0 = cctx.unwrapTemporary(interceptorVal); + + updated = cctx.toCacheObject(updated0); + } + } + else { + e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary); + + interceptorRes = cctx.config().getInterceptor().onBeforeRemove(e); + + if (cctx.cancelRemove(interceptorRes)) + return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes); + } + + key0 = e.key(); + old0 = e.value(); + } + + boolean hadVal = hasValueUnlocked(); + + long ttl = CU.TTL_ETERNAL; + long expireTime = CU.EXPIRE_TIME_ETERNAL; + + if (op == GridCacheOperation.UPDATE) { + if (expiryPlc != null) { + ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation()); + + if (ttl == CU.TTL_NOT_CHANGED) { + ttl = ttlExtras(); + expireTime = expireTimeExtras(); + } + else if (ttl != CU.TTL_ZERO) + expireTime = CU.toExpireTime(ttl); + } + else { + ttl = ttlExtras(); + expireTime = expireTimeExtras(); + } + } + + if (ttl == CU.TTL_ZERO) + op = GridCacheOperation.DELETE; + + // Try write-through. + if (op == GridCacheOperation.UPDATE) { + // Detach value before index update. + updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx); + + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().put(null, key, updated, ver); + + storeValue(updated, expireTime, ver, oldRow); + + assert ttl != CU.TTL_ZERO; + + update(updated, expireTime, ttl, ver, true); + + if (evt) { + CacheObject evtOld = null; + + if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, + (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, + evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary); + } + + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, + (GridCacheVersion)null, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld, + evtOld != null || hadVal, subjId, null, taskName, keepBinary); + } + } + } + else { + if (writeThrough) + // Must persist inside synchronization in non-tx mode. + cctx.store().remove(null, key); + + removeValue(); + + update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true); + + if (evt) { + CacheObject evtOld = null; + + if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, + (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, + evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary); + + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(old); + + cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, (GridCacheVersion)null, + EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, subjId, null, + taskName, keepBinary); + } + } + + res = hadVal; + } + + if (res) + updateMetrics(op, metrics); + + if (lsnrCol != null) { + long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null); + + cctx.continuousQueries().onEntryUpdated( + lsnrCol, + key, + val, + old, + internal, + partition(), + true, + false, + updateCntr, + null, + AffinityTopologyVersion.NONE); + + onUpdateFinished(updateCntr); + } + + cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); + + if (intercept) { + if (op == GridCacheOperation.UPDATE) + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L)); + else + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L)); + } + } + finally { + unlockEntry(); ++ unlockListenerReadLock(); + } + + return new GridTuple3<>(res, + cctx.unwrapTemporary(interceptorRes != null ? + interceptorRes.get2() : + cctx.cacheObjectContext().unwrapBinaryIfNeeded(old, keepBinary, false)), + invokeRes); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public GridCacheUpdateAtomicResult innerUpdate( + final GridCacheVersion newVer, + final UUID evtNodeId, + final UUID affNodeId, + final GridCacheOperation op, + @Nullable final Object writeObj, + @Nullable final Object[] invokeArgs, + final boolean writeThrough, + final boolean readThrough, + final boolean retval, + final boolean keepBinary, + @Nullable final IgniteCacheExpiryPolicy expiryPlc, + final boolean evt, + final boolean metrics, + final boolean primary, + final boolean verCheck, + final AffinityTopologyVersion topVer, + @Nullable final CacheEntryPredicate[] filter, + final GridDrType drType, + final long explicitTtl, + final long explicitExpireTime, + @Nullable final GridCacheVersion conflictVer, + final boolean conflictResolve, + final boolean intercept, + @Nullable final UUID subjId, + final String taskName, + @Nullable final CacheObject prevVal, + @Nullable final Long updateCntr, + @Nullable final GridDhtAtomicAbstractUpdateFuture fut + ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { + assert cctx.atomic() && !detached(); + + AtomicCacheUpdateClosure c; + + if (!primary && !isNear()) + ensureFreeSpace(); + ++ lockListenerReadLock(); + lockEntry(); + + try { + checkObsolete(); + + boolean internal = isInternal() || !context().userCache(); + + Map lsnrs = cctx.continuousQueries().updateListeners(internal, false); + + boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM + || !F.isEmptyOrNulls(filter); + + // Possibly read value from store. + boolean readFromStore = readThrough && needVal && (cctx.readThrough() && + (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue())); + + c = new AtomicCacheUpdateClosure(this, + topVer, + newVer, + op, + writeObj, + invokeArgs, + readFromStore, + writeThrough, + keepBinary, + expiryPlc, + primary, + verCheck, + filter, + explicitTtl, + explicitExpireTime, + conflictVer, + conflictResolve, + intercept, + updateCntr); + + key.valueBytes(cctx.cacheObjectContext()); + + if (isNear()) { + CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null; + + c.call(dataRow); + } + else + cctx.offheap().invoke(cctx, key, localPartition(), c); + + GridCacheUpdateAtomicResult updateRes = c.updateRes; + + assert updateRes != null : c; + + CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null; + CacheObject updateVal = null; + GridCacheVersion updateVer = c.newVer; + + // Apply metrics. + if (metrics && + updateRes.outcome().updateReadMetrics() && + cctx.statisticsEnabled() && + needVal) { + // PutIfAbsent methods must not update hit/miss statistics. + if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter)) + cctx.cache().metrics0().onRead(oldVal != null); + } + + switch (updateRes.outcome()) { + case VERSION_CHECK_FAILED: { + if (!cctx.isNear()) { + CacheObject evtVal; + + if (op == GridCacheOperation.TRANSFORM) { + EntryProcessor entryProcessor = + (EntryProcessor)writeObj; + + CacheInvokeEntry entry = + new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this); + + try { + entryProcessor.process(entry, invokeArgs); + + evtVal = entry.modified() ? + cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; + } + catch (Exception ignore) { + evtVal = prevVal; + } + } + else + evtVal = (CacheObject)writeObj; + + long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr); + + if (updateCntr != null) + updateCntr0 = updateCntr; + + onUpdateFinished(updateCntr0); + + cctx.continuousQueries().onEntryUpdated( + key, + evtVal, + prevVal, + isInternal() || !context().userCache(), + partition(), + primary, + false, + updateCntr0, + null, + topVer); + } + + return updateRes; + } + + case CONFLICT_USE_OLD: + case FILTER_FAILED: + case INVOKE_NO_OP: + case INTERCEPTOR_CANCEL: + return updateRes; + } + + assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL; + + CacheObject evtOld = null; + + if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + assert writeObj instanceof EntryProcessor : writeObj; + + evtOld = cctx.unwrapTemporary(oldVal); + + Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj); + + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, + newVer, + EVT_CACHE_OBJECT_READ, + evtOld, evtOld != null, + evtOld, evtOld != null, + subjId, + transformClo.getClass().getName(), + taskName, + keepBinary); + } + + if (c.op == GridCacheOperation.UPDATE) { + updateVal = val; + + assert updateVal != null : c; + + drReplicate(drType, updateVal, updateVer, topVer); + + recordNodeId(affNodeId, topVer); + + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(oldVal); + + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, + newVer, + EVT_CACHE_OBJECT_PUT, + updateVal, + true, + evtOld, + evtOld != null, + subjId, + null, + taskName, + keepBinary); + } + } + else { + assert c.op == GridCacheOperation.DELETE : c.op; + + clearReaders(); + + drReplicate(drType, null, newVer, topVer); + + recordNodeId(affNodeId, topVer); + + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) { + if (evtOld == null) + evtOld = cctx.unwrapTemporary(oldVal); + + cctx.events().addEvent(partition(), + key, + evtNodeId, + null, newVer, + EVT_CACHE_OBJECT_REMOVED, + null, false, + evtOld, evtOld != null, + subjId, + null, + taskName, + keepBinary); + } + } + + if (updateRes.success()) + updateMetrics(c.op, metrics); + + // Continuous query filter should be perform under lock. + if (lsnrs != null) { + CacheObject evtVal = cctx.unwrapTemporary(updateVal); + CacheObject evtOldVal = cctx.unwrapTemporary(oldVal); + + cctx.continuousQueries().onEntryUpdated(lsnrs, + key, + evtVal, + evtOldVal, + internal, + partition(), + primary, + false, + c.updateRes.updateCounter(), + fut, + topVer); + } + + cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary); + + if (intercept) { + if (c.op == GridCacheOperation.UPDATE) { + cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( + cctx, + key, + null, + updateVal, + null, + keepBinary, + c.updateRes.updateCounter())); + } + else { + assert c.op == GridCacheOperation.DELETE : c.op; + + cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry( + cctx, + key, + null, + oldVal, + null, + keepBinary, + c.updateRes.updateCounter())); + } + } + } + finally { + unlockEntry(); ++ unlockListenerReadLock(); + } + + onUpdateFinished(c.updateRes.updateCounter()); + + return c.updateRes; + } + + /** + * @param val Value. + * @param cacheObj Cache object. + * @param keepBinary Keep binary flag. + * @param cpy Copy flag. + * @return Cache object value. + */ + @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) { + if (val != null) + return val; + + return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy); + } + + /** + * @param expiry Expiration policy. + * @return Tuple holding initial TTL and expire time with the given expiry. + */ + private static IgniteBiTuple initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) { + assert expiry != null; + + long initTtl = expiry.forCreate(); + long initExpireTime; + + if (initTtl == CU.TTL_ZERO) { + initTtl = CU.TTL_MINIMUM; + initExpireTime = CU.expireTimeInPast(); + } + else if (initTtl == CU.TTL_NOT_CHANGED) { + initTtl = CU.TTL_ETERNAL; + initExpireTime = CU.EXPIRE_TIME_ETERNAL; + } + else + initExpireTime = CU.toExpireTime(initTtl); + + return F.t(initTtl, initExpireTime); + } + + /** + * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time. + * + * @param expiry Expiration policy. + * @param ttl Explicit TTL. + * @param expireTime Explicit expire time. + * @return Result. + */ + private GridTuple3 ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) { + assert !obsolete(); + + boolean rmv = false; + + // 1. If TTL is not changed, then calculate it based on expiry. if (ttl == CU.TTL_NOT_CHANGED) { if (expiry != null) ttl = hasValueUnlocked() ? expiry.forUpdate() : expiry.forCreate(); @@@ -2559,38 -2708,80 +2722,81 @@@ ) throws IgniteCheckedException, GridCacheEntryRemovedException { ensureFreeSpace(); - synchronized (this) { + boolean deferred = false; + boolean obsolete = false; + + GridCacheVersion oldVer = null; + ++ lockListenerReadLock(); + lockEntry(); + + try { checkObsolete(); + boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled(); + + long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; + + val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + + final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0); + boolean update; - boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled(); + IgnitePredicate p = new IgnitePredicate() { + @Override public boolean apply(@Nullable CacheDataRow row) { + boolean update0; + + GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver; - if (cctx.group().persistenceEnabled()) { - unswap(false); + boolean isStartVer = currentVer.nodeOrder() == cctx.localNode().order() + && currentVer.order() == startVer; - if (!isNew()) { - if (cctx.atomic()) - update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0; + if (cctx.group().persistenceEnabled()) { + if (!isStartVer) { + if (cctx.atomic()) + update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0; + else + update0 = currentVer.compareTo(ver) < 0; + } + else + update0 = true; + } else - update = this.ver.compareTo(ver) < 0; + update0 = isStartVer; + + update0 |= (!preload && deletedUnlocked()); + + return update0; } - else - update = true; - } - else - update = isNew() && !cctx.offheap().containsKey(this); + }; - update |= !preload && deletedUnlocked(); + if (unswapped) { + update = p.apply(null); - if (update) { - long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; + if (update) { + // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value. + long oldExpTime = expireTimeUnlocked(); + long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis()); - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); + if (delta < 0) { + if (onExpired(this.val, null)) { + if (cctx.deferredDelete()) { + deferred = true; + oldVer = this.ver; + } + else if (val == null) + obsolete = true; + } + } - if (val != null) storeValue(val, expTime, ver, null); + } + } + else // Optimization to access storage only once. + update = storeValue(val, expTime, ver, null, p); + if (update) { update(val, expTime, ttl, ver, true); boolean skipQryNtf = false; @@@ -2653,6 -2844,23 +2859,24 @@@ return false; } + finally { + unlockEntry(); ++ unlockListenerReadLock(); + + // It is necessary to execute these callbacks outside of lock to avoid deadlocks. + + if (obsolete) { + onMarkedObsolete(); + + cctx.cache().removeEntry(this); + } + + if (deferred) { + assert oldVer != null; + + cctx.onDeferredDelete(this, oldVer); + } + } } /** @@@ -3775,6 -4289,21 +4305,29 @@@ } /** {@inheritDoc} */ + @Override public void lockEntry() { + lock.lock(); + } + + /** {@inheritDoc} */ + @Override public void unlockEntry() { + lock.unlock(); + } - ++ ++ private void lockListenerReadLock() { ++ listenerLock.lock(); ++ } ++ ++ private void unlockListenerReadLock() { ++ listenerLock.unlock(); ++ } ++ + /** {@inheritDoc} */ + @Override public boolean lockedByCurrentThread() { + return lock.isHeldByCurrentThread(); + } + + /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose. return o == this; http://git-wip-us.apache.org/repos/asf/ignite/blob/2021942c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index b3e2a11,55c44b4..94bd988 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@@ -61,9 -59,9 +63,10 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; - import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; + import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; ++import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@@ -124,9 -123,6 +128,9 @@@ public class CacheContinuousQueryManage /** Ordered topic prefix. */ private String topicPrefix; + + /** ReadWriteLock to control setup of listener */ - private final ReadWriteLock listenerLock = new ReentrantReadWriteLock() ; ++ private final StripedCompositeReadWriteLock listenerLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()) ; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@@ -833,9 -852,9 +870,8 @@@ * @param internal Internal flag. * @return Whether listener was actually registered. */ -- GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, -- CacheContinuousQueryListener lsnr, -- boolean internal) { ++ GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, CacheContinuousQueryListener lsnr, ++ boolean internal) { boolean added; if (internal) { @@@ -843,10 -862,9 +879,9 @@@ if (added) intLsnrCnt.incrementAndGet(); -- } -- else { - listenerLock.writeLock().lock(); - try { - synchronized (this) { ++ } else { ++ listenerLock.writeLock().lock(); ++ try { if (lsnrCnt.get() == 0) { if (cctx.group().sharedGroup() && !cctx.isLocal()) cctx.group().addCacheWithContinuousQuery(cctx); @@@ -856,16 -874,14 +891,16 @@@ if (added) lsnrCnt.incrementAndGet(); + } finally { - listenerLock.writeLock().unlock(); ++ listenerLock.writeLock().unlock(); } if (added) lsnr.onExecution(); } -- return added ? GridContinuousHandler.RegisterStatus.REGISTERED : -- GridContinuousHandler.RegisterStatus.NOT_REGISTERED; ++ return added ? GridContinuousHandler.RegisterStatus.REGISTERED ++ : GridContinuousHandler.RegisterStatus.NOT_REGISTERED; } /** @@@ -881,18 -897,15 +916,17 @@@ lsnr.onUnregister(); } -- } -- else { - listenerLock.writeLock().lock(); - try { - synchronized (this) { ++ } else { ++ listenerLock.writeLock().lock(); ++ try { if ((lsnr = lsnrs.remove(id)) != null) { int cnt = lsnrCnt.decrementAndGet(); if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal()) cctx.group().removeCacheWithContinuousQuery(cctx); } + } finally { - listenerLock.writeLock().unlock(); ++ listenerLock.writeLock().unlock(); } if (lsnr != null)