ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [02/10] ignite git commit: Update code to use StripedCompositeReadWriteLock Merge remote-tracking branch 'apache-ignite/master' into IGNITE-5960
Date Tue, 14 Aug 2018 12:02:10 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/2021942c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index ab1fff6,767c314..2e6dddd
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@@ -24,6 -24,7 +24,8 @@@ import java.util.List
  import java.util.Map;
  import java.util.UUID;
  import java.util.concurrent.atomic.AtomicReference;
++import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
  import javax.cache.Cache;
  import javax.cache.expiry.ExpiryPolicy;
  import javax.cache.processor.EntryProcessor;
@@@ -63,6 -66,6 +67,7 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheFilter;
  import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
  import org.apache.ignite.internal.util.IgniteTree;
++import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
  import org.apache.ignite.internal.util.lang.GridClosureException;
  import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
  import org.apache.ignite.internal.util.lang.GridTuple;
@@@ -154,6 -157,10 +159,13 @@@ public abstract class GridCacheMapEntr
      @GridToStringInclude
      private GridCacheEntryExtras extras;
  
+     /** */
+     @GridToStringExclude
 -    private final ReentrantLock lock = new ReentrantLock();
++    private final ReentrantLock lock = new ReentrantLock();   
++    
++    /** Read lock for continuous query listener. */
++    private final Lock listenerLock;
+ 
      /**
       * Flags:
       * <ul>
@@@ -182,6 -189,6 +194,7 @@@
          this.key = key;
          this.hash = key.hashCode();
          this.cctx = cctx;
++        this.listenerLock = cctx.continuousQueries().getListenerReadLock();
  
          ver = cctx.versions().next();
  
@@@ -910,137 -960,142 +966,144 @@@
  
          ensureFreeSpace();
  
-         cctx.continuousQueries().getListenerReadLock().lock();
++        lockListenerReadLock();
+         lockEntry();
+ 
          try {
-             synchronized (this) {
-                 checkObsolete();
+             checkObsolete();
  
-                 if (isNear()) {
-                     assert dhtVer != null;
+             if (isNear()) {
+                 assert dhtVer != null;
  
-                     // It is possible that 'get' could load more recent value.
-                     if (!((GridNearCacheEntry) this).recordDhtVersion(dhtVer))
-                         return new GridCacheUpdateTxResult(false, null);
-                 }
+                 // It is possible that 'get' could load more recent value.
+                 if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer))
+                     return new GridCacheUpdateTxResult(false, null, logPtr);
+             }
  
-                 assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
-                         "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']';
+             assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
+                 "Transaction does not own lock for update [entry=" + this + ", tx=" + tx + ']';
  
-                 // Load and remove from swap if it is new.
-                 boolean startVer = isStartVersion();
+             // Load and remove from swap if it is new.
+             boolean startVer = isStartVersion();
  
-                 boolean internal = isInternal() || !context().userCache();
+             boolean internal = isInternal() || !context().userCache();
  
-                 Map<UUID, CacheContinuousQueryListener> lsnrCol =
-                         notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+             Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                 notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
  
-                 if (startVer && (retval || intercept || lsnrCol != null))
-                     unswap(retval);
+             if (startVer && (retval || intercept || lsnrCol != null))
+                 unswap(retval);
  
-                 newVer = explicitVer != null ? explicitVer : tx == null ?
-                         nextVersion() : tx.writeVersion();
+             newVer = explicitVer != null ? explicitVer : tx == null ?
+                 nextVersion() : tx.writeVersion();
  
-                 assert newVer != null : "Failed to get write version for tx: " + tx;
+             assert newVer != null : "Failed to get write version for tx: " + tx;
  
-                 old = oldValPresent ? oldVal : this.val;
+             old = oldValPresent ? oldVal : this.val;
  
-                 if (intercept) {
-                     val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false);
+             if (intercept) {
+                 val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false);
  
-                     CacheLazyEntry e = new CacheLazyEntry(cctx, key, old, keepBinary);
+                 CacheLazyEntry e = new CacheLazyEntry(cctx, key, old, keepBinary);
  
-                     Object interceptorVal = cctx.config().getInterceptor().onBeforePut(
-                             new CacheLazyEntry(cctx, key, old, keepBinary),
-                             val0);
+                 Object interceptorVal = cctx.config().getInterceptor().onBeforePut(
+                     new CacheLazyEntry(cctx, key, old, keepBinary),
+                     val0);
  
-                     key0 = e.key();
+                 key0 = e.key();
  
-                     if (interceptorVal == null)
-                         return new GridCacheUpdateTxResult(false, (CacheObject) cctx.unwrapTemporary(old));
-                     else if (interceptorVal != val0)
-                         val0 = cctx.unwrapTemporary(interceptorVal);
+                 if (interceptorVal == null)
+                     return new GridCacheUpdateTxResult(false, (CacheObject)cctx.unwrapTemporary(old), logPtr);
+                 else if (interceptorVal != val0)
+                     val0 = cctx.unwrapTemporary(interceptorVal);
  
-                     val = cctx.toCacheObject(val0);
-                 }
+                 val = cctx.toCacheObject(val0);
+             }
  
-                 // Determine new ttl and expire time.
-                 long expireTime;
+             // Determine new ttl and expire time.
+             long expireTime;
  
-                 if (drExpireTime >= 0) {
-                     assert ttl >= 0 : ttl;
+             if (drExpireTime >= 0) {
+                 assert ttl >= 0 : ttl;
  
-                     expireTime = drExpireTime;
-                 } else {
-                     if (ttl == -1L) {
-                         ttl = ttlExtras();
-                         expireTime = expireTimeExtras();
-                     } else
-                         expireTime = CU.toExpireTime(ttl);
+                 expireTime = drExpireTime;
+             }
+             else {
+                 if (ttl == -1L) {
+                     ttl = ttlExtras();
+                     expireTime = expireTimeExtras();
                  }
+                 else
+                     expireTime = CU.toExpireTime(ttl);
+             }
  
-                 assert ttl >= 0 : ttl;
-                 assert expireTime >= 0 : expireTime;
+             assert ttl >= 0 : ttl;
+             assert expireTime >= 0 : expireTime;
  
-                 // Detach value before index update.
-                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+             // Detach value before index update.
+             val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
  
-                 assert val != null;
+             assert val != null;
  
-                 storeValue(val, expireTime, newVer, null);
+             storeValue(val, expireTime, newVer, null);
  
-                 if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
-                     deletedUnlocked(false);
+             if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached())
+                 deletedUnlocked(false);
  
-                 updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr);
+             updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr);
  
-                 if (updateCntr != null && updateCntr != 0)
-                     updateCntr0 = updateCntr;
+             if (updateCntr != null && updateCntr != 0)
+                 updateCntr0 = updateCntr;
  
-                 update(val, expireTime, ttl, newVer, true);
+             if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
+                 logPtr = logTxUpdate(tx, val, expireTime, updateCntr0);
  
-                 drReplicate(drType, val, newVer, topVer);
+             update(val, expireTime, ttl, newVer, true);
  
-                 recordNodeId(affNodeId, topVer);
+             drReplicate(drType, val, newVer, topVer);
  
-                 if (metrics && cctx.cache().configuration().isStatisticsEnabled())
-                     cctx.cache().metrics0().onWrite();
+             recordNodeId(affNodeId, topVer);
  
-                 if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
-                     CacheObject evtOld = cctx.unwrapTemporary(old);
+             if (metrics && cctx.statisticsEnabled())
+                 cctx.cache().metrics0().onWrite();
  
-                     cctx.events().addEvent(partition(),
-                             key,
-                             evtNodeId,
-                             tx == null ? null : tx.xid(),
-                             newVer,
-                             EVT_CACHE_OBJECT_PUT,
-                             val,
-                             val != null,
-                             evtOld,
-                             evtOld != null || hasValueUnlocked(),
-                             subjId, null, taskName,
-                             keepBinary);
-                 }
+             if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+                 CacheObject evtOld = cctx.unwrapTemporary(old);
  
-                 if (lsnrCol != null) {
-                     cctx.continuousQueries().onEntryUpdated(
-                             lsnrCol,
-                             key,
-                             val,
-                             old,
-                             internal,
-                             partition(),
-                             tx.local(),
-                             false,
-                             updateCntr0,
-                             null,
-                             topVer);
-                 }
+                 cctx.events().addEvent(partition(),
+                     key,
+                     evtNodeId,
+                     tx == null ? null : tx.xid(),
+                     newVer,
+                     EVT_CACHE_OBJECT_PUT,
+                     val,
+                     val != null,
+                     evtOld,
+                     evtOld != null || hasValueUnlocked(),
+                     subjId, null, taskName,
+                     keepBinary);
+             }
  
-                 cctx.dataStructures().onEntryUpdated(key, false, keepBinary);
+             if (lsnrCol != null) {
+                 cctx.continuousQueries().onEntryUpdated(
+                     lsnrCol,
+                     key,
+                     val,
+                     old,
+                     internal,
+                     partition(),
+                     tx.local(),
+                     false,
+                     updateCntr0,
+                     null,
+                     topVer);
              }
-         } finally {
-             cctx.continuousQueries().getListenerReadLock().unlock();
+ 
+             cctx.dataStructures().onEntryUpdated(key, false, keepBinary);
+         }
+         finally {
+             unlockEntry();
++            unlockListenerReadLock();
          }
  
          onUpdateFinished(updateCntr0);
@@@ -1114,144 -1171,148 +1179,150 @@@
  
          boolean marked = false;
  
-         cctx.continuousQueries().getListenerReadLock().lock();
++        lockListenerReadLock();
+         lockEntry();
+ 
          try {
-             synchronized (this) {
-                 checkObsolete();
+             checkObsolete();
  
-                 if (isNear()) {
-                     assert dhtVer != null;
+             if (isNear()) {
+                 assert dhtVer != null;
  
-                     // It is possible that 'get' could load more recent value.
-                     if (!((GridNearCacheEntry) this).recordDhtVersion(dhtVer))
-                         return new GridCacheUpdateTxResult(false, null);
-                 }
+                 // It is possible that 'get' could load more recent value.
+                 if (!((GridNearCacheEntry)this).recordDhtVersion(dhtVer))
+                     return new GridCacheUpdateTxResult(false, null, logPtr);
+             }
  
-                 assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
-                         "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
+             assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) :
+                 "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']';
  
-                 boolean startVer = isStartVersion();
+             boolean startVer = isStartVersion();
  
-                 newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
+             newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion();
  
-                 boolean internal = isInternal() || !context().userCache();
+             boolean internal = isInternal() || !context().userCache();
  
-                 Map<UUID, CacheContinuousQueryListener> lsnrCol =
-                         notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
+             Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                 notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null;
  
-                 if (startVer && (retval || intercept || lsnrCol != null))
-                     unswap();
+             if (startVer && (retval || intercept || lsnrCol != null))
+                 unswap();
  
-                 old = oldValPresent ? oldVal : val;
+             old = oldValPresent ? oldVal : val;
  
-                 if (intercept) {
-                     entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
+             if (intercept) {
+                 entry0 = new CacheLazyEntry(cctx, key, old, keepBinary);
  
-                     interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0);
+                 interceptRes = cctx.config().getInterceptor().onBeforeRemove(entry0);
  
-                     if (cctx.cancelRemove(interceptRes)) {
-                         CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
+                 if (cctx.cancelRemove(interceptRes)) {
+                     CacheObject ret = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
  
-                         return new GridCacheUpdateTxResult(false, ret);
-                     }
+                     return new GridCacheUpdateTxResult(false, ret, logPtr);
                  }
+             }
  
-                 removeValue();
+             removeValue();
  
-                 update(null, 0, 0, newVer, true);
+             update(null, 0, 0, newVer, true);
  
-                 if (cctx.deferredDelete() && !detached() && !isInternal()) {
-                     if (!deletedUnlocked()) {
-                         deletedUnlocked(true);
+             if (cctx.deferredDelete() && !detached() && !isInternal()) {
+                 if (!deletedUnlocked()) {
+                     deletedUnlocked(true);
  
-                         if (tx != null) {
-                             GridCacheMvcc mvcc = mvccExtras();
+                     if (tx != null) {
+                         GridCacheMvcc mvcc = mvccExtras();
  
-                             if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
-                                 clearReaders();
-                             else
-                                 clearReader(tx.originatingNodeId());
-                         }
+                         if (mvcc == null || mvcc.isEmpty(tx.xidVersion()))
+                             clearReaders();
+                         else
+                             clearReader(tx.originatingNodeId());
                      }
                  }
+             }
  
-                 updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr);
+             updateCntr0 = nextPartitionCounter(topVer, tx == null || tx.local(), updateCntr);
  
-                 if (updateCntr != null && updateCntr != 0)
-                     updateCntr0 = updateCntr;
+             if (updateCntr != null && updateCntr != 0)
+                 updateCntr0 = updateCntr;
  
-                 drReplicate(drType, null, newVer, topVer);
+             if (tx != null && cctx.group().persistenceEnabled() && cctx.group().walEnabled())
+                 logPtr = logTxUpdate(tx, null, 0, updateCntr0);
  
-                 if (metrics && cctx.cache().configuration().isStatisticsEnabled())
-                     cctx.cache().metrics0().onRemove();
+             drReplicate(drType, null, newVer, topVer);
  
-                 if (tx == null)
-                     obsoleteVer = newVer;
-                 else {
-                     // Only delete entry if the lock is not explicit.
-                     if (lockedBy(tx.xidVersion()))
-                         obsoleteVer = tx.xidVersion();
-                     else if (log.isDebugEnabled())
-                         log.debug("Obsolete version was not set because lock was explicit: " + this);
-                 }
+             if (metrics && cctx.statisticsEnabled())
+                 cctx.cache().metrics0().onRemove();
  
-                 if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
-                     CacheObject evtOld = cctx.unwrapTemporary(old);
+             if (tx == null)
+                 obsoleteVer = newVer;
+             else {
+                 // Only delete entry if the lock is not explicit.
+                 if (lockedBy(tx.xidVersion()))
+                     obsoleteVer = tx.xidVersion();
+                 else if (log.isDebugEnabled())
+                     log.debug("Obsolete version was not set because lock was explicit: " + this);
+             }
  
-                     cctx.events().addEvent(partition(),
-                             key,
-                             evtNodeId,
-                             tx == null ? null : tx.xid(), newVer,
-                             EVT_CACHE_OBJECT_REMOVED,
-                             null,
-                             false,
-                             evtOld,
-                             evtOld != null || hasValueUnlocked(),
-                             subjId,
-                             null,
-                             taskName,
-                             keepBinary);
-                 }
+             if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                 CacheObject evtOld = cctx.unwrapTemporary(old);
  
-                 if (lsnrCol != null) {
-                     cctx.continuousQueries().onEntryUpdated(
-                             lsnrCol,
-                             key,
-                             null,
-                             old,
-                             internal,
-                             partition(),
-                             tx.local(),
-                             false,
-                             updateCntr0,
-                             null,
-                             topVer);
-                 }
+                 cctx.events().addEvent(partition(),
+                     key,
+                     evtNodeId,
+                     tx == null ? null : tx.xid(), newVer,
+                     EVT_CACHE_OBJECT_REMOVED,
+                     null,
+                     false,
+                     evtOld,
+                     evtOld != null || hasValueUnlocked(),
+                     subjId,
+                     null,
+                     taskName,
+                     keepBinary);
+             }
  
-                 cctx.dataStructures().onEntryUpdated(key, true, keepBinary);
+             if (lsnrCol != null) {
+                 cctx.continuousQueries().onEntryUpdated(
+                     lsnrCol,
+                     key,
+                     null,
+                     old,
+                     internal,
+                     partition(),
+                     tx.local(),
+                     false,
+                     updateCntr0,
+                     null,
+                     topVer);
+             }
  
-                 deferred = cctx.deferredDelete() && !detached() && !isInternal();
+             cctx.dataStructures().onEntryUpdated(key, true, keepBinary);
  
-                 if (intercept)
-                     entry0.updateCounter(updateCntr0);
+             deferred = cctx.deferredDelete() && !detached() && !isInternal();
  
-                 if (!deferred) {
-                     // If entry is still removed.
-                     assert newVer == ver;
+             if (intercept)
+                 entry0.updateCounter(updateCntr0);
  
-                     if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
-                         if (log.isDebugEnabled())
-                             log.debug("Entry could not be marked obsolete (it is still used): " + this);
-                     } else {
-                         recordNodeId(affNodeId, topVer);
+             if (!deferred) {
+                 // If entry is still removed.
+                 assert newVer == ver;
  
-                         if (log.isDebugEnabled())
-                             log.debug("Entry was marked obsolete: " + this);
-                     }
+                 if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) {
+                     if (log.isDebugEnabled())
+                         log.debug("Entry could not be marked obsolete (it is still used): " + this);
+                 }
+                 else {
+                     recordNodeId(affNodeId, topVer);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Entry was marked obsolete: " + this);
                  }
              }
-         } finally {
-             cctx.continuousQueries().getListenerReadLock().unlock();
+         }
+         finally {
+             unlockEntry();
++            unlockListenerReadLock();
          }
  
          if (deferred)
@@@ -1320,659 -1381,655 +1391,659 @@@
          IgniteBiTuple<Boolean, ?> interceptorRes = null;
  
          EntryProcessorResult<Object> invokeRes = null;
-         
-         cctx.continuousQueries().getListenerReadLock().lock();
+ 
++        lockListenerReadLock();
+         lockEntry();
+ 
          try {
+             boolean internal = isInternal() || !context().userCache();
  
- 	        synchronized (this) {
- 	            boolean internal = isInternal() || !context().userCache();
- 	
- 	            Map<UUID, CacheContinuousQueryListener> lsnrCol =
- 	                cctx.continuousQueries().updateListeners(internal, false);
- 	
- 	            boolean needVal = retval ||
- 	                intercept ||
- 	                op == GridCacheOperation.TRANSFORM ||
- 	                !F.isEmpty(filter) ||
- 	                lsnrCol != null;
- 	
- 	            checkObsolete();
- 	
- 	            CacheDataRow oldRow = null;
- 	
- 	            // Load and remove from swap if it is new.
- 	            if (isNew())
- 	                oldRow = unswap(null, false);
- 	
- 	            old = val;
- 	
- 	            boolean readFromStore = false;
- 	
- 	            Object old0 = null;
- 	
- 	            if (readThrough && needVal && old == null &&
- 	                (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
- 	                    old0 = readThrough(null, key, false, subjId, taskName);
- 	
- 	                old = cctx.toCacheObject(old0);
- 	
- 	                long ttl = CU.TTL_ETERNAL;
- 	                long expireTime = CU.EXPIRE_TIME_ETERNAL;
- 	
- 	                if (expiryPlc != null && old != null) {
- 	                    ttl = CU.toTtl(expiryPlc.getExpiryForCreation());
- 	
- 	                    if (ttl == CU.TTL_ZERO) {
- 	                        ttl = CU.TTL_MINIMUM;
- 	                        expireTime = CU.expireTimeInPast();
- 	                    }
- 	                    else if (ttl == CU.TTL_NOT_CHANGED)
- 	                        ttl = CU.TTL_ETERNAL;
- 	                    else
- 	                        expireTime = CU.toExpireTime(ttl);
- 	                }
- 	
- 	                // Detach value before index update.
- 	                old = cctx.kernalContext().cacheObjects().prepareForCache(old, cctx);
- 	
- 	                if (old != null)
- 	                    storeValue(old, expireTime, ver, oldRow);
- 	                else
- 	                    removeValue();
- 	
- 	                update(old, expireTime, ttl, ver, true);
- 	            }
- 	
- 	            // Apply metrics.
- 	            if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
- 	                // PutIfAbsent methods mustn't update hit/miss statistics
- 	                if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
- 	                    cctx.cache().metrics0().onRead(old != null);
- 	            }
- 	
- 	            // Check filter inside of synchronization.
- 	            if (!F.isEmpty(filter)) {
- 	                boolean pass = cctx.isAllLocked(this, filter);
- 	
- 	                if (!pass) {
- 	                    if (expiryPlc != null && !readFromStore && !cctx.putIfAbsentFilter(filter) && hasValueUnlocked())
- 	                        updateTtl(expiryPlc);
- 	
- 	                    Object val = retval ?
- 	                        cctx.cacheObjectContext().unwrapBinaryIfNeeded(CU.value(old, cctx, false), keepBinary, false)
- 	                        : null;
- 	
- 	                    return new T3<>(false, val, null);
- 	                }
- 	            }
- 	
- 	            String transformCloClsName = null;
- 	
- 	            CacheObject updated;
- 	
- 	            Object key0 = null;
- 	            Object updated0 = null;
- 	
- 	            // Calculate new value.
- 	            if (op == GridCacheOperation.TRANSFORM) {
- 	                transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName();
- 	
- 	                EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
- 	
- 	                assert entryProcessor != null;
- 	
- 	                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this);
- 	
- 	                try {
- 	                    Object computed = entryProcessor.process(entry, invokeArgs);
- 	
- 	                    if (entry.modified()) {
- 	                        updated0 = cctx.unwrapTemporary(entry.getValue());
- 	
- 	                        updated = cctx.toCacheObject(updated0);
- 	
- 	                        if (updated != null) // no validation for remove case
- 	                            cctx.validateKeyAndValue(key, updated);
- 	                    }
- 	                    else
- 	                        updated = old;
- 	
- 	                    key0 = entry.key();
- 	
- 	                    invokeRes = computed != null ? CacheInvokeResult.fromResult(cctx.unwrapTemporary(computed)) : null;
- 	                }
- 	                catch (Exception e) {
- 	                    updated = old;
- 	
- 	                    invokeRes = CacheInvokeResult.fromError(e);
- 	                }
- 	
- 	                if (!entry.modified()) {
- 	                    if (expiryPlc != null && !readFromStore && hasValueUnlocked())
- 	                        updateTtl(expiryPlc);
- 	
- 	                    return new GridTuple3<>(false, null, invokeRes);
- 	                }
- 	            }
- 	            else
- 	                updated = (CacheObject)writeObj;
- 	
- 	            op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
- 	
- 	            if (intercept) {
- 	                CacheLazyEntry e;
- 	
- 	                if (op == GridCacheOperation.UPDATE) {
- 	                    updated0 = value(updated0, updated, keepBinary, false);
- 	
- 	                    e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary);
- 	
- 	                    Object interceptorVal = cctx.config().getInterceptor().onBeforePut(e, updated0);
- 	
- 	                    if (interceptorVal == null)
- 	                        return new GridTuple3<>(false, cctx.unwrapTemporary(value(old0, old, keepBinary, false)), invokeRes);
- 	                    else {
- 	                        updated0 = cctx.unwrapTemporary(interceptorVal);
- 	
- 	                        updated = cctx.toCacheObject(updated0);
- 	                    }
- 	                }
- 	                else {
- 	                    e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary);
- 	
- 	                    interceptorRes = cctx.config().getInterceptor().onBeforeRemove(e);
- 	
- 	                    if (cctx.cancelRemove(interceptorRes))
- 	                        return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes);
- 	                }
- 	
- 	                key0 = e.key();
- 	                old0 = e.value();
- 	            }
- 	
- 	            boolean hadVal = hasValueUnlocked();
- 	
- 	            long ttl = CU.TTL_ETERNAL;
- 	            long expireTime = CU.EXPIRE_TIME_ETERNAL;
- 	
- 	            if (op == GridCacheOperation.UPDATE) {
- 	                if (expiryPlc != null) {
- 	                    ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation());
- 	
- 	                    if (ttl == CU.TTL_NOT_CHANGED) {
- 	                        ttl = ttlExtras();
- 	                        expireTime = expireTimeExtras();
- 	                    }
- 	                    else if (ttl != CU.TTL_ZERO)
- 	                        expireTime = CU.toExpireTime(ttl);
- 	                }
- 	                else {
- 	                    ttl = ttlExtras();
- 	                    expireTime = expireTimeExtras();
- 	                }
- 	            }
- 	
- 	            if (ttl == CU.TTL_ZERO)
- 	                op = GridCacheOperation.DELETE;
- 	
- 	            // Try write-through.
- 	            if (op == GridCacheOperation.UPDATE) {
- 	                // Detach value before index update.
- 	                updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
- 	
- 	                if (writeThrough)
- 	                    // Must persist inside synchronization in non-tx mode.
- 	                    cctx.store().put(null, key, updated, ver);
- 	
- 	                storeValue(updated, expireTime, ver, oldRow);
- 	
- 	                assert ttl != CU.TTL_ZERO;
- 	
- 	                update(updated, expireTime, ttl, ver, true);
- 	
- 	                if (evt) {
- 	                    CacheObject evtOld = null;
- 	
- 	                    if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- 	                        evtOld = cctx.unwrapTemporary(old);
- 	
- 	                        cctx.events().addEvent(partition(), key, cctx.localNodeId(), null,
- 	                            (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
- 	                            evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary);
- 	                    }
- 	
- 	                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
- 	                        if (evtOld == null)
- 	                            evtOld = cctx.unwrapTemporary(old);
- 	
- 	                        cctx.events().addEvent(partition(), key, cctx.localNodeId(), null,
- 	                            (GridCacheVersion)null, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
- 	                            evtOld != null || hadVal, subjId, null, taskName, keepBinary);
- 	                    }
- 	                }
- 	            }
- 	            else {
- 	                if (writeThrough)
- 	                    // Must persist inside synchronization in non-tx mode.
- 	                    cctx.store().remove(null, key);
- 	
- 	                removeValue();
- 	
- 	                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
- 	
- 	                if (evt) {
- 	                    CacheObject evtOld = null;
- 	
- 	                    if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
- 	                        cctx.events().addEvent(partition(), key, cctx.localNodeId(), null,
- 	                            (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
- 	                            evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary);
- 	
- 	                    if (cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
- 	                        if (evtOld == null)
- 	                            evtOld = cctx.unwrapTemporary(old);
- 	
- 	                        cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, (GridCacheVersion)null,
- 	                            EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, subjId, null,
- 	                            taskName, keepBinary);
- 	                    }
- 	                }
- 	
- 	                res = hadVal;
- 	            }
- 	
- 	            if (res)
- 	                updateMetrics(op, metrics);
- 	
- 	            if (lsnrCol != null) {
- 	                long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null);
- 	
- 	                cctx.continuousQueries().onEntryUpdated(
- 	                    lsnrCol,
- 	                    key,
- 	                    val,
- 	                    old,
- 	                    internal,
- 	                    partition(),
- 	                    true,
- 	                    false,
- 	                    updateCntr,
- 	                    null,
- 	                    AffinityTopologyVersion.NONE);
- 	
- 	                onUpdateFinished(updateCntr);
- 	            }
- 	
- 	            cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
- 	
- 	            if (intercept) {
- 	                if (op == GridCacheOperation.UPDATE)
- 	                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L));
- 	                else
- 	                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L));
- 	            }
- 	        } 
-         } finally {
-         	cctx.continuousQueries().getListenerReadLock().unlock();
-         }
+             Map<UUID, CacheContinuousQueryListener> lsnrCol =
+                 cctx.continuousQueries().updateListeners(internal, false);
  
-         return new GridTuple3<>(res,
-             cctx.unwrapTemporary(interceptorRes != null ?
-                 interceptorRes.get2() :
-                 cctx.cacheObjectContext().unwrapBinaryIfNeeded(old, keepBinary, false)),
-             invokeRes);
-     }
+             boolean needVal = retval ||
+                 intercept ||
+                 op == GridCacheOperation.TRANSFORM ||
+                 !F.isEmpty(filter) ||
+                 lsnrCol != null;
  
-     /** {@inheritDoc} */
-     @SuppressWarnings("unchecked")
-     @Override public GridCacheUpdateAtomicResult innerUpdate(
-         final GridCacheVersion newVer,
-         final UUID evtNodeId,
-         final UUID affNodeId,
-         final GridCacheOperation op,
-         @Nullable final Object writeObj,
-         @Nullable final Object[] invokeArgs,
-         final boolean writeThrough,
-         final boolean readThrough,
-         final boolean retval,
-         final boolean keepBinary,
-         @Nullable final IgniteCacheExpiryPolicy expiryPlc,
-         final boolean evt,
-         final boolean metrics,
-         final boolean primary,
-         final boolean verCheck,
-         final AffinityTopologyVersion topVer,
-         @Nullable final CacheEntryPredicate[] filter,
-         final GridDrType drType,
-         final long explicitTtl,
-         final long explicitExpireTime,
-         @Nullable final GridCacheVersion conflictVer,
-         final boolean conflictResolve,
-         final boolean intercept,
-         @Nullable final UUID subjId,
-         final String taskName,
-         @Nullable final CacheObject prevVal,
-         @Nullable final Long updateCntr,
-         @Nullable final GridDhtAtomicAbstractUpdateFuture fut
-     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
-         assert cctx.atomic() && !detached();
+             checkObsolete();
  
-         AtomicCacheUpdateClosure c;
+             CacheDataRow oldRow = null;
  
-         if (!primary && !isNear())
-             ensureFreeSpace();
+             // Load and remove from swap if it is new.
+             if (isNew())
+                 oldRow = unswap(null, false);
  
-         cctx.continuousQueries().getListenerReadLock().lock();
- 	    try {
- 	        synchronized (this) {
- 	            checkObsolete();
- 	
- 	            boolean internal = isInternal() || !context().userCache();
- 	
- 	            Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
- 	
- 	            boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
- 	                || !F.isEmptyOrNulls(filter);
- 	
- 	            // Possibly read value from store.
- 	            boolean readFromStore = readThrough && needVal && (cctx.readThrough() &&
- 	                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
- 	
- 	            c = new AtomicCacheUpdateClosure(this,
- 	                topVer,
- 	                newVer,
- 	                op,
- 	                writeObj,
- 	                invokeArgs,
- 	                readFromStore,
- 	                writeThrough,
- 	                keepBinary,
- 	                expiryPlc,
- 	                primary,
- 	                verCheck,
- 	                filter,
- 	                explicitTtl,
- 	                explicitExpireTime,
- 	                conflictVer,
- 	                conflictResolve,
- 	                intercept,
- 	                updateCntr);
- 	
- 	            key.valueBytes(cctx.cacheObjectContext());
- 	
- 	            if (isNear()) {
- 	                CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
- 	
- 	                c.call(dataRow);
- 	            }
- 	            else
- 	                cctx.offheap().invoke(cctx, key, localPartition(), c);
- 	
- 	            GridCacheUpdateAtomicResult updateRes = c.updateRes;
- 	
- 	            assert updateRes != null : c;
- 	
- 	            CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null;
- 	            CacheObject updateVal = null;
- 	            GridCacheVersion updateVer = c.newVer;
- 	
- 	            // Apply metrics.
- 	            if (metrics &&
- 	                updateRes.outcome().updateReadMetrics() &&
- 	                cctx.cache().configuration().isStatisticsEnabled() &&
- 	                needVal) {
- 	                // PutIfAbsent methods must not update hit/miss statistics.
- 	                if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
- 	                    cctx.cache().metrics0().onRead(oldVal != null);
- 	            }
- 	
- 	            switch (updateRes.outcome()) {
- 	                case VERSION_CHECK_FAILED: {
- 	                    if (!cctx.isNear()) {
- 	                        CacheObject evtVal;
- 	
- 	                        if (op == GridCacheOperation.TRANSFORM) {
- 	                            EntryProcessor<Object, Object, ?> entryProcessor =
- 	                                (EntryProcessor<Object, Object, ?>)writeObj;
- 	
- 	                            CacheInvokeEntry<Object, Object> entry =
- 	                                new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
- 	
- 	                            try {
- 	                                entryProcessor.process(entry, invokeArgs);
- 	
- 	                                evtVal = entry.modified() ?
- 	                                    cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
- 	                            }
- 	                            catch (Exception ignore) {
- 	                                evtVal = prevVal;
- 	                            }
- 	                        }
- 	                        else
- 	                            evtVal = (CacheObject)writeObj;
- 	
- 	                        long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr);
- 	
- 	                        if (updateCntr != null)
- 	                            updateCntr0 = updateCntr;
- 	
- 	                        onUpdateFinished(updateCntr0);
- 	
- 	                        cctx.continuousQueries().onEntryUpdated(
- 	                            key,
- 	                            evtVal,
- 	                            prevVal,
- 	                            isInternal() || !context().userCache(),
- 	                            partition(),
- 	                            primary,
- 	                            false,
- 	                            updateCntr0,
- 	                            null,
- 	                            topVer);
- 	                    }
- 	
- 	                    return updateRes;
- 	                }
- 	
- 	                case CONFLICT_USE_OLD:
- 	                case FILTER_FAILED:
- 	                case INVOKE_NO_OP:
- 	                case INTERCEPTOR_CANCEL:
- 	                    return updateRes;
- 	            }
- 	
- 	            assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL;
- 	
- 	            CacheObject evtOld = null;
- 	
- 	            if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
- 	                assert writeObj instanceof EntryProcessor : writeObj;
- 	
- 	                evtOld = cctx.unwrapTemporary(oldVal);
- 	
- 	                Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
- 	
- 	                cctx.events().addEvent(partition(),
- 	                    key,
- 	                    evtNodeId,
- 	                    null,
- 	                    newVer,
- 	                    EVT_CACHE_OBJECT_READ,
- 	                    evtOld, evtOld != null,
- 	                    evtOld, evtOld != null,
- 	                    subjId,
- 	                    transformClo.getClass().getName(),
- 	                    taskName,
- 	                    keepBinary);
- 	            }
- 	
- 	            if (c.op == GridCacheOperation.UPDATE) {
- 	                updateVal = val;
- 	
- 	                assert updateVal != null : c;
- 	
- 	                drReplicate(drType, updateVal, updateVer, topVer);
- 	
- 	                recordNodeId(affNodeId, topVer);
- 	
- 	                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
- 	                    if (evtOld == null)
- 	                        evtOld = cctx.unwrapTemporary(oldVal);
- 	
- 	                    cctx.events().addEvent(partition(),
- 	                        key,
- 	                        evtNodeId,
- 	                        null,
- 	                        newVer,
- 	                        EVT_CACHE_OBJECT_PUT,
- 	                        updateVal,
- 	                        true,
- 	                        evtOld,
- 	                        evtOld != null,
- 	                        subjId,
- 	                        null,
- 	                        taskName,
- 	                        keepBinary);
- 	                }
- 	            }
- 	            else {
- 	                assert c.op == GridCacheOperation.DELETE : c.op;
- 	
- 	                clearReaders();
- 	
- 	                drReplicate(drType, null, newVer, topVer);
- 	
- 	                recordNodeId(affNodeId, topVer);
- 	
- 	                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
- 	                    if (evtOld == null)
- 	                        evtOld = cctx.unwrapTemporary(oldVal);
- 	
- 	                    cctx.events().addEvent(partition(),
- 	                        key,
- 	                        evtNodeId,
- 	                        null, newVer,
- 	                        EVT_CACHE_OBJECT_REMOVED,
- 	                        null, false,
- 	                        evtOld, evtOld != null,
- 	                        subjId,
- 	                        null,
- 	                        taskName,
- 	                        keepBinary);
- 	                }
- 	            }
- 	
- 	            if (updateRes.success())
- 	                updateMetrics(c.op, metrics);
- 	
- 	            Map<UUID, CacheContinuousQueryListener> curLsnrs;
- 	
- 	            // Continuous query filter should be perform under lock.
- 	            if (lsnrs != null) {
- 	                CacheObject evtVal = cctx.unwrapTemporary(updateVal);
- 	                CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
- 	
- 	                cctx.continuousQueries().onEntryUpdated(lsnrs,
- 	                    key,
- 	                    evtVal,
- 	                    evtOldVal,
- 	                    internal,
- 	                    partition(),
- 	                    primary,
- 	                    false,
- 	                    c.updateRes.updateCounter(),
- 	                    fut,
- 	                    topVer);
- 	            }
- 	
- 	            cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
- 	
- 	            if (intercept) {
- 	                if (c.op == GridCacheOperation.UPDATE) {
- 	                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
- 	                        cctx,
- 	                        key,
- 	                        null,
- 	                        updateVal,
- 	                        null,
- 	                        keepBinary,
- 	                        c.updateRes.updateCounter()));
- 	                }
- 	                else {
- 	                    assert c.op == GridCacheOperation.DELETE : c.op;
- 	
- 	                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
- 	                        cctx,
- 	                        key,
- 	                        null,
- 	                        oldVal,
- 	                        null,
- 	                        keepBinary,
- 	                        c.updateRes.updateCounter()));
- 	                }
- 	            }
- 	        }
- 	    } finally {
-             cctx.continuousQueries().getListenerReadLock().unlock();
-         }
+             old = val;
  
-         onUpdateFinished(c.updateRes.updateCounter());
+             boolean readFromStore = false;
  
-         return c.updateRes;
-     }
+             Object old0 = null;
  
-     /**
-      * @param val Value.
-      * @param cacheObj Cache object.
-      * @param keepBinary Keep binary flag.
-      * @param cpy Copy flag.
-      * @return Cache object value.
-      */
-     @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
-         if (val != null)
-             return val;
+             if (readThrough && needVal && old == null &&
+                 (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
+                 old0 = readThrough(null, key, false, subjId, taskName);
  
-         return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
-     }
+                 old = cctx.toCacheObject(old0);
  
-     /**
-      * @param expiry Expiration policy.
-      * @return Tuple holding initial TTL and expire time with the given expiry.
-      */
-     private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
-         assert expiry != null;
+                 long ttl = CU.TTL_ETERNAL;
+                 long expireTime = CU.EXPIRE_TIME_ETERNAL;
  
-         long initTtl = expiry.forCreate();
-         long initExpireTime;
+                 if (expiryPlc != null && old != null) {
+                     ttl = CU.toTtl(expiryPlc.getExpiryForCreation());
  
-         if (initTtl == CU.TTL_ZERO) {
-             initTtl = CU.TTL_MINIMUM;
-             initExpireTime = CU.expireTimeInPast();
-         }
-         else if (initTtl == CU.TTL_NOT_CHANGED) {
-             initTtl = CU.TTL_ETERNAL;
-             initExpireTime = CU.EXPIRE_TIME_ETERNAL;
-         }
-         else
-             initExpireTime = CU.toExpireTime(initTtl);
+                     if (ttl == CU.TTL_ZERO) {
+                         ttl = CU.TTL_MINIMUM;
+                         expireTime = CU.expireTimeInPast();
+                     }
+                     else if (ttl == CU.TTL_NOT_CHANGED)
+                         ttl = CU.TTL_ETERNAL;
+                     else
+                         expireTime = CU.toExpireTime(ttl);
+                 }
  
-         return F.t(initTtl, initExpireTime);
-     }
+                 // Detach value before index update.
+                 old = cctx.kernalContext().cacheObjects().prepareForCache(old, cctx);
  
-     /**
-      * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time.
-      *
-      * @param expiry Expiration policy.
-      * @param ttl Explicit TTL.
-      * @param expireTime Explicit expire time.
-      * @return Result.
-      */
-     private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
-         assert !obsolete();
+                 if (old != null)
+                     storeValue(old, expireTime, ver, oldRow);
+                 else
+                     removeValue();
  
-         boolean rmv = false;
+                 update(old, expireTime, ttl, ver, true);
+             }
  
-         // 1. If TTL is not changed, then calculate it based on expiry.
+             // Apply metrics.
+             if (metrics && cctx.statisticsEnabled() && needVal) {
+                 // PutIfAbsent methods mustn't update hit/miss statistics
+                 if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
+                     cctx.cache().metrics0().onRead(old != null);
+             }
+ 
+             // Check filter inside of synchronization.
+             if (!F.isEmpty(filter)) {
+                 boolean pass = cctx.isAllLocked(this, filter);
+ 
+                 if (!pass) {
+                     if (expiryPlc != null && !readFromStore && !cctx.putIfAbsentFilter(filter) && hasValueUnlocked())
+                         updateTtl(expiryPlc);
+ 
+                     Object val = retval ?
+                         cctx.cacheObjectContext().unwrapBinaryIfNeeded(CU.value(old, cctx, false), keepBinary, false)
+                         : null;
+ 
+                     return new T3<>(false, val, null);
+                 }
+             }
+ 
+             String transformCloClsName = null;
+ 
+             CacheObject updated;
+ 
+             Object key0 = null;
+             Object updated0 = null;
+ 
+             // Calculate new value.
+             if (op == GridCacheOperation.TRANSFORM) {
+                 transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName();
+ 
+                 EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+ 
+                 assert entryProcessor != null;
+ 
+                 CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this);
+ 
+                 try {
+                     Object computed = entryProcessor.process(entry, invokeArgs);
+ 
+                     if (entry.modified()) {
+                         updated0 = cctx.unwrapTemporary(entry.getValue());
+ 
+                         updated = cctx.toCacheObject(updated0);
+ 
+                         cctx.validateKeyAndValue(key, updated);
+                     }
+                     else
+                         updated = old;
+ 
+                     key0 = entry.key();
+ 
+                     invokeRes = computed != null ? CacheInvokeResult.fromResult(cctx.unwrapTemporary(computed)) : null;
+                 }
+                 catch (Exception e) {
+                     updated = old;
+ 
+                     invokeRes = CacheInvokeResult.fromError(e);
+                 }
+ 
+                 if (!entry.modified()) {
+                     if (expiryPlc != null && !readFromStore && hasValueUnlocked())
+                         updateTtl(expiryPlc);
+ 
+                     return new GridTuple3<>(false, null, invokeRes);
+                 }
+             }
+             else
+                 updated = (CacheObject)writeObj;
+ 
+             op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
+ 
+             if (intercept) {
+                 CacheLazyEntry e;
+ 
+                 if (op == GridCacheOperation.UPDATE) {
+                     updated0 = value(updated0, updated, keepBinary, false);
+ 
+                     e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary);
+ 
+                     Object interceptorVal = cctx.config().getInterceptor().onBeforePut(e, updated0);
+ 
+                     if (interceptorVal == null)
+                         return new GridTuple3<>(false, cctx.unwrapTemporary(value(old0, old, keepBinary, false)), invokeRes);
+                     else {
+                         updated0 = cctx.unwrapTemporary(interceptorVal);
+ 
+                         updated = cctx.toCacheObject(updated0);
+                     }
+                 }
+                 else {
+                     e = new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary);
+ 
+                     interceptorRes = cctx.config().getInterceptor().onBeforeRemove(e);
+ 
+                     if (cctx.cancelRemove(interceptorRes))
+                         return new GridTuple3<>(false, cctx.unwrapTemporary(interceptorRes.get2()), invokeRes);
+                 }
+ 
+                 key0 = e.key();
+                 old0 = e.value();
+             }
+ 
+             boolean hadVal = hasValueUnlocked();
+ 
+             long ttl = CU.TTL_ETERNAL;
+             long expireTime = CU.EXPIRE_TIME_ETERNAL;
+ 
+             if (op == GridCacheOperation.UPDATE) {
+                 if (expiryPlc != null) {
+                     ttl = CU.toTtl(hadVal ? expiryPlc.getExpiryForUpdate() : expiryPlc.getExpiryForCreation());
+ 
+                     if (ttl == CU.TTL_NOT_CHANGED) {
+                         ttl = ttlExtras();
+                         expireTime = expireTimeExtras();
+                     }
+                     else if (ttl != CU.TTL_ZERO)
+                         expireTime = CU.toExpireTime(ttl);
+                 }
+                 else {
+                     ttl = ttlExtras();
+                     expireTime = expireTimeExtras();
+                 }
+             }
+ 
+             if (ttl == CU.TTL_ZERO)
+                 op = GridCacheOperation.DELETE;
+ 
+             // Try write-through.
+             if (op == GridCacheOperation.UPDATE) {
+                 // Detach value before index update.
+                 updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
+ 
+                 if (writeThrough)
+                     // Must persist inside synchronization in non-tx mode.
+                     cctx.store().put(null, key, updated, ver);
+ 
+                 storeValue(updated, expireTime, ver, oldRow);
+ 
+                 assert ttl != CU.TTL_ZERO;
+ 
+                 update(updated, expireTime, ttl, ver, true);
+ 
+                 if (evt) {
+                     CacheObject evtOld = null;
+ 
+                     if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                         evtOld = cctx.unwrapTemporary(old);
+ 
+                         cctx.events().addEvent(partition(), key, cctx.localNodeId(), null,
+                             (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
+                             evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary);
+                     }
+ 
+                     if (cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+                         if (evtOld == null)
+                             evtOld = cctx.unwrapTemporary(old);
+ 
+                         cctx.events().addEvent(partition(), key, cctx.localNodeId(), null,
+                             (GridCacheVersion)null, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
+                             evtOld != null || hadVal, subjId, null, taskName, keepBinary);
+                     }
+                 }
+             }
+             else {
+                 if (writeThrough)
+                     // Must persist inside synchronization in non-tx mode.
+                     cctx.store().remove(null, key);
+ 
+                 removeValue();
+ 
+                 update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver, true);
+ 
+                 if (evt) {
+                     CacheObject evtOld = null;
+ 
+                     if (transformCloClsName != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ))
+                         cctx.events().addEvent(partition(), key, cctx.localNodeId(), null,
+                             (GridCacheVersion)null, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
+                             evtOld != null || hadVal, subjId, transformCloClsName, taskName, keepBinary);
+ 
+                     if (cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                         if (evtOld == null)
+                             evtOld = cctx.unwrapTemporary(old);
+ 
+                         cctx.events().addEvent(partition(), key, cctx.localNodeId(), null, (GridCacheVersion)null,
+                             EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal, subjId, null,
+                             taskName, keepBinary);
+                     }
+                 }
+ 
+                 res = hadVal;
+             }
+ 
+             if (res)
+                 updateMetrics(op, metrics);
+ 
+             if (lsnrCol != null) {
+                 long updateCntr = nextPartitionCounter(AffinityTopologyVersion.NONE, true, null);
+ 
+                 cctx.continuousQueries().onEntryUpdated(
+                     lsnrCol,
+                     key,
+                     val,
+                     old,
+                     internal,
+                     partition(),
+                     true,
+                     false,
+                     updateCntr,
+                     null,
+                     AffinityTopologyVersion.NONE);
+ 
+                 onUpdateFinished(updateCntr);
+             }
+ 
+             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
+ 
+             if (intercept) {
+                 if (op == GridCacheOperation.UPDATE)
+                     cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L));
+                 else
+                     cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(cctx, key, key0, old, old0, keepBinary, 0L));
+             }
+         }
+         finally {
+             unlockEntry();
++            unlockListenerReadLock();
+         }
+ 
+         return new GridTuple3<>(res,
+             cctx.unwrapTemporary(interceptorRes != null ?
+                 interceptorRes.get2() :
+                 cctx.cacheObjectContext().unwrapBinaryIfNeeded(old, keepBinary, false)),
+             invokeRes);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public GridCacheUpdateAtomicResult innerUpdate(
+         final GridCacheVersion newVer,
+         final UUID evtNodeId,
+         final UUID affNodeId,
+         final GridCacheOperation op,
+         @Nullable final Object writeObj,
+         @Nullable final Object[] invokeArgs,
+         final boolean writeThrough,
+         final boolean readThrough,
+         final boolean retval,
+         final boolean keepBinary,
+         @Nullable final IgniteCacheExpiryPolicy expiryPlc,
+         final boolean evt,
+         final boolean metrics,
+         final boolean primary,
+         final boolean verCheck,
+         final AffinityTopologyVersion topVer,
+         @Nullable final CacheEntryPredicate[] filter,
+         final GridDrType drType,
+         final long explicitTtl,
+         final long explicitExpireTime,
+         @Nullable final GridCacheVersion conflictVer,
+         final boolean conflictResolve,
+         final boolean intercept,
+         @Nullable final UUID subjId,
+         final String taskName,
+         @Nullable final CacheObject prevVal,
+         @Nullable final Long updateCntr,
+         @Nullable final GridDhtAtomicAbstractUpdateFuture fut
+     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
+         assert cctx.atomic() && !detached();
+ 
+         AtomicCacheUpdateClosure c;
+ 
+         if (!primary && !isNear())
+             ensureFreeSpace();
+ 
++        lockListenerReadLock(); // Taken before the entry lock; released after it in the finally block below.
+         lockEntry();
+ 
+         try {
+             checkObsolete();
+ 
+             boolean internal = isInternal() || !context().userCache();
+ 
+             Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
+ 
+             boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
+                 || !F.isEmptyOrNulls(filter);
+ 
+             // Possibly read value from store: only when the old value is needed and read-through applies.
+             boolean readFromStore = readThrough && needVal && (cctx.readThrough() &&
+                 (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
+ 
+             c = new AtomicCacheUpdateClosure(this,
+                 topVer,
+                 newVer,
+                 op,
+                 writeObj,
+                 invokeArgs,
+                 readFromStore,
+                 writeThrough,
+                 keepBinary,
+                 expiryPlc,
+                 primary,
+                 verCheck,
+                 filter,
+                 explicitTtl,
+                 explicitExpireTime,
+                 conflictVer,
+                 conflictResolve,
+                 intercept,
+                 updateCntr);
+ 
+             key.valueBytes(cctx.cacheObjectContext());
+ 
+             if (isNear()) { // Near entry: run the closure on the in-memory value instead of going through offheap.
+                 CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
+ 
+                 c.call(dataRow);
+             }
+             else
+                 cctx.offheap().invoke(cctx, key, localPartition(), c);
+ 
+             GridCacheUpdateAtomicResult updateRes = c.updateRes;
+ 
+             assert updateRes != null : c;
+ 
+             CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null;
+             CacheObject updateVal = null;
+             GridCacheVersion updateVer = c.newVer;
+ 
+             // Apply read (hit/miss) metrics.
+             if (metrics &&
+                 updateRes.outcome().updateReadMetrics() &&
+                 cctx.statisticsEnabled() &&
+                 needVal) {
+                 // PutIfAbsent methods must not update hit/miss statistics.
+                 if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
+                     cctx.cache().metrics0().onRead(oldVal != null);
+             }
+ 
+             switch (updateRes.outcome()) {
+                 case VERSION_CHECK_FAILED: {
+                     if (!cctx.isNear()) {
+                         CacheObject evtVal;
+ 
+                         if (op == GridCacheOperation.TRANSFORM) {
+                             EntryProcessor<Object, Object, ?> entryProcessor =
+                                 (EntryProcessor<Object, Object, ?>)writeObj;
+ 
+                             CacheInvokeEntry<Object, Object> entry =
+                                 new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
+ 
+                             try {
+                                 entryProcessor.process(entry, invokeArgs);
+ 
+                                 evtVal = entry.modified() ?
+                                     cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+                             }
+                             catch (Exception ignore) { // A processor failure only affects the event value; fall back to prevVal.
+                                 evtVal = prevVal;
+                             }
+                         }
+                         else
+                             evtVal = (CacheObject)writeObj;
+ 
+                         long updateCntr0 = nextPartitionCounter(topVer, primary, updateCntr);
+ 
+                         if (updateCntr != null)
+                             updateCntr0 = updateCntr;
+ 
+                         onUpdateFinished(updateCntr0);
+ 
+                         cctx.continuousQueries().onEntryUpdated(
+                             key,
+                             evtVal,
+                             prevVal,
+                             isInternal() || !context().userCache(),
+                             partition(),
+                             primary,
+                             false,
+                             updateCntr0,
+                             null,
+                             topVer);
+                     }
+ 
+                     return updateRes;
+                 }
+ 
+                 case CONFLICT_USE_OLD:
+                 case FILTER_FAILED:
+                 case INVOKE_NO_OP:
+                 case INTERCEPTOR_CANCEL:
+                     return updateRes; // Nothing further to do for these outcomes.
+             }
+ 
+             assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL;
+ 
+             CacheObject evtOld = null;
+ 
+             if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                 assert writeObj instanceof EntryProcessor : writeObj;
+ 
+                 evtOld = cctx.unwrapTemporary(oldVal);
+ 
+                 Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
+ 
+                 cctx.events().addEvent(partition(),
+                     key,
+                     evtNodeId,
+                     null,
+                     newVer,
+                     EVT_CACHE_OBJECT_READ,
+                     evtOld, evtOld != null,
+                     evtOld, evtOld != null,
+                     subjId,
+                     transformClo.getClass().getName(),
+                     taskName,
+                     keepBinary);
+             }
+ 
+             if (c.op == GridCacheOperation.UPDATE) {
+                 updateVal = val;
+ 
+                 assert updateVal != null : c;
+ 
+                 drReplicate(drType, updateVal, updateVer, topVer);
+ 
+                 recordNodeId(affNodeId, topVer);
+ 
+                 if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+                     if (evtOld == null)
+                         evtOld = cctx.unwrapTemporary(oldVal);
+ 
+                     cctx.events().addEvent(partition(),
+                         key,
+                         evtNodeId,
+                         null,
+                         newVer,
+                         EVT_CACHE_OBJECT_PUT,
+                         updateVal,
+                         true,
+                         evtOld,
+                         evtOld != null,
+                         subjId,
+                         null,
+                         taskName,
+                         keepBinary);
+                 }
+             }
+             else {
+                 assert c.op == GridCacheOperation.DELETE : c.op;
+ 
+                 clearReaders();
+ 
+                 drReplicate(drType, null, newVer, topVer);
+ 
+                 recordNodeId(affNodeId, topVer);
+ 
+                 if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                     if (evtOld == null)
+                         evtOld = cctx.unwrapTemporary(oldVal);
+ 
+                     cctx.events().addEvent(partition(),
+                         key,
+                         evtNodeId,
+                         null, newVer,
+                         EVT_CACHE_OBJECT_REMOVED,
+                         null, false,
+                         evtOld, evtOld != null,
+                         subjId,
+                         null,
+                         taskName,
+                         keepBinary);
+                 }
+             }
+ 
+             if (updateRes.success())
+                 updateMetrics(c.op, metrics);
+ 
+             // Continuous query filter should be performed under lock.
+             if (lsnrs != null) {
+                 CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+                 CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
+ 
+                 cctx.continuousQueries().onEntryUpdated(lsnrs,
+                     key,
+                     evtVal,
+                     evtOldVal,
+                     internal,
+                     partition(),
+                     primary,
+                     false,
+                     c.updateRes.updateCounter(),
+                     fut,
+                     topVer);
+             }
+ 
+             cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
+ 
+             if (intercept) {
+                 if (c.op == GridCacheOperation.UPDATE) {
+                     cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+                         cctx,
+                         key,
+                         null,
+                         updateVal,
+                         null,
+                         keepBinary,
+                         c.updateRes.updateCounter()));
+                 }
+                 else {
+                     assert c.op == GridCacheOperation.DELETE : c.op;
+ 
+                     cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+                         cctx,
+                         key,
+                         null,
+                         oldVal,
+                         null,
+                         keepBinary,
+                         c.updateRes.updateCounter()));
+                 }
+             }
+         }
+         finally {
+             unlockEntry(); // Release in reverse order of acquisition: entry lock first, listener lock second.
++            unlockListenerReadLock();
+         }
+ 
+         onUpdateFinished(c.updateRes.updateCounter()); // Runs after both locks have been released.
+ 
+         return c.updateRes;
+     }
+ 
+     /**
+      * Returns {@code val} when it is already available, otherwise unwraps {@code cacheObj}.
+      *
+      * @param val Ready-to-use value; returned as is when not {@code null}.
+      * @param cacheObj Cache object to unwrap when {@code val} is {@code null}.
+      * @param keepBinary Keep binary flag passed to the unwrap call.
+      * @param cpy Copy flag passed to the unwrap call.
+      * @return {@code val} if it is not {@code null}, otherwise the unwrapped {@code cacheObj}.
+      */
+     @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
+         return val == null ? cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy) : val;
+     }
+ 
+     /**
+      * @param expiry Expiration policy.
+      * @return Tuple holding initial TTL and expire time with the given expiry.
+      */
+     private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
+         assert expiry != null;
+ 
+         long initTtl = expiry.forCreate();
+         long initExpireTime;
+ 
+         if (initTtl == CU.TTL_ZERO) {
+             initTtl = CU.TTL_MINIMUM;
+             initExpireTime = CU.expireTimeInPast();
+         }
+         else if (initTtl == CU.TTL_NOT_CHANGED) {
+             initTtl = CU.TTL_ETERNAL;
+             initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+         }
+         else
+             initExpireTime = CU.toExpireTime(initTtl);
+ 
+         return F.t(initTtl, initExpireTime);
+     }
+ 
+     /**
+      * Get TTL, expire time and remove flag for the given entry, expiration policy and explicit TTL and expire time.
+      *
+      * @param expiry Expiration policy.
+      * @param ttl Explicit TTL.
+      * @param expireTime Explicit expire time.
+      * @return Result.
+      */
+     private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+         assert !obsolete();
+ 
+         boolean rmv = false;
+ 
+         // 1. If TTL is not changed, then calculate it based on expiry.
          if (ttl == CU.TTL_NOT_CHANGED) {
              if (expiry != null)
                  ttl = hasValueUnlocked() ? expiry.forUpdate() : expiry.forCreate();
@@@ -2559,38 -2708,80 +2722,81 @@@
      ) throws IgniteCheckedException, GridCacheEntryRemovedException {
          ensureFreeSpace();
  
-         synchronized (this) {
+         boolean deferred = false;
+         boolean obsolete = false;
+ 
+         GridCacheVersion oldVer = null;
+ 
++        lockListenerReadLock();
+         lockEntry();
+ 
+         try {
              checkObsolete();
  
+             boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled() && cctx.group().walEnabled();
+ 
+             long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+ 
+             val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+ 
+             final boolean unswapped = ((flags & IS_UNSWAPPED_MASK) != 0);
+ 
              boolean update;
  
-             boolean walEnabled = !cctx.isNear() && cctx.group().persistenceEnabled();
+             IgnitePredicate<CacheDataRow> p = new IgnitePredicate<CacheDataRow>() {
+                 @Override public boolean apply(@Nullable CacheDataRow row) {
+                     boolean update0;
+ 
+                     GridCacheVersion currentVer = row != null ? row.version() : GridCacheMapEntry.this.ver;
  
-             if (cctx.group().persistenceEnabled()) {
-                 unswap(false);
+                     boolean isStartVer = currentVer.nodeOrder() == cctx.localNode().order()
+                         && currentVer.order() == startVer;
  
-                 if (!isNew()) {
-                     if (cctx.atomic())
-                         update = ATOMIC_VER_COMPARATOR.compare(this.ver, ver) < 0;
+                     if (cctx.group().persistenceEnabled()) {
+                         if (!isStartVer) {
+                             if (cctx.atomic())
+                                 update0 = ATOMIC_VER_COMPARATOR.compare(currentVer, ver) < 0;
+                             else
+                                 update0 = currentVer.compareTo(ver) < 0;
+                         }
+                         else
+                             update0 = true;
+                     }
                      else
-                         update = this.ver.compareTo(ver) < 0;
+                         update0 = isStartVer;
+ 
+                     update0 |= (!preload && deletedUnlocked());
+ 
+                     return update0;
                  }
-                 else
-                     update = true;
-             }
-             else
-                 update = isNew() && !cctx.offheap().containsKey(this);
+             };
  
-             update |= !preload && deletedUnlocked();
+             if (unswapped) {
+                 update = p.apply(null);
  
-             if (update) {
-                 long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;
+                 if (update) {
+                     // If entry is already unswapped and we are modifying it, we must run deletion callbacks for old value.
+                     long oldExpTime = expireTimeUnlocked();
+                     long delta = (oldExpTime == 0 ? 0 : oldExpTime - U.currentTimeMillis());
  
-                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+                     if (delta < 0) {
+                         if (onExpired(this.val, null)) {
+                             if (cctx.deferredDelete()) {
+                                 deferred = true;
+                                 oldVer = this.ver;
+                             }
+                             else if (val == null)
+                                 obsolete = true;
+                         }
+                     }
  
-                 if (val != null)
                      storeValue(val, expTime, ver, null);
+                 }
+             }
+             else // Optimization to access storage only once.
+                 update = storeValue(val, expTime, ver, null, p);
  
+             if (update) {
                  update(val, expTime, ttl, ver, true);
  
                  boolean skipQryNtf = false;
@@@ -2653,6 -2844,23 +2859,24 @@@
  
              return false;
          }
+         finally {
+             unlockEntry();
++            unlockListenerReadLock();
+ 
+             // It is necessary to execute these callbacks outside of lock to avoid deadlocks.
+ 
+             if (obsolete) {
+                 onMarkedObsolete();
+ 
+                 cctx.cache().removeEntry(this);
+             }
+ 
+             if (deferred) {
+                 assert oldVer != null;
+ 
+                 cctx.onDeferredDelete(this, oldVer);
+             }
+         }
      }
  
      /**
@@@ -3775,6 -4289,21 +4305,29 @@@
      }
  
      /** {@inheritDoc} */
+     @Override public void lockEntry() {
+         lock.lock(); // Acquires this entry's lock; callers in this class release it via unlockEntry() in a finally block.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void unlockEntry() {
+         lock.unlock(); // Releases the lock taken by lockEntry().
+     }
 -
++    /** Acquires {@code listenerLock}; callers in this class take it before {@code lockEntry()}. NOTE(review): presumably the read side of the continuous-query listener lock - field declaration not visible here, confirm. */
++    private void lockListenerReadLock() {
++        listenerLock.lock();
++    }
++    /** Releases {@code listenerLock}; callers in this class do so after {@code unlockEntry()}. */
++    private void unlockListenerReadLock() {
++        listenerLock.unlock();
++    }
++        
+     /** {@inheritDoc} */
+     @Override public boolean lockedByCurrentThread() {
+         return lock.isHeldByCurrentThread(); // Reports on the same lock that lockEntry()/unlockEntry() operate on.
+     }
+ 
+     /** {@inheritDoc} */
      @Override public boolean equals(Object o) {
          // Identity comparison left on purpose.
          return o == this;

http://git-wip-us.apache.org/repos/asf/ignite/blob/2021942c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b3e2a11,55c44b4..94bd988
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@@ -61,9 -59,9 +63,10 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
  import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
  import org.apache.ignite.internal.processors.cache.KeyCacheObject;
- import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
  import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+ import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
  import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
++import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
  import org.apache.ignite.internal.util.tostring.GridToStringInclude;
  import org.apache.ignite.internal.util.typedef.CI2;
  import org.apache.ignite.internal.util.typedef.F;
@@@ -124,9 -123,6 +128,9 @@@ public class CacheContinuousQueryManage
  
      /** Ordered topic prefix. */
      private String topicPrefix;
 +    
 +    /** Read-write lock whose write lock is held while a user listener is registered or removed. */
-     private final ReadWriteLock listenerLock = new ReentrantReadWriteLock() ;
++    private final StripedCompositeReadWriteLock listenerLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors()) ;
  
      /** {@inheritDoc} */
      @Override protected void start0() throws IgniteCheckedException {
@@@ -833,9 -852,9 +870,8 @@@
       * @param internal Internal flag.
       * @return Whether listener was actually registered.
       */
--    GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId,
--        CacheContinuousQueryListener lsnr,
--        boolean internal) {
++    GridContinuousHandler.RegisterStatus registerListener(UUID lsnrId, CacheContinuousQueryListener lsnr,
++            boolean internal) {
          boolean added;
  
          if (internal) {
@@@ -843,10 -862,9 +879,9 @@@
  
              if (added)
                  intLsnrCnt.incrementAndGet();
--        }
--        else {
-         	listenerLock.writeLock().lock();
-         	try {        		
 -            synchronized (this) {
++        } else {
++            listenerLock.writeLock().lock();
++            try {
                  if (lsnrCnt.get() == 0) {
                      if (cctx.group().sharedGroup() && !cctx.isLocal())
                          cctx.group().addCacheWithContinuousQuery(cctx);
@@@ -856,16 -874,14 +891,16 @@@
  
                  if (added)
                      lsnrCnt.incrementAndGet();
 +            } finally {
-             	listenerLock.writeLock().unlock();
++                listenerLock.writeLock().unlock();
              }
  
              if (added)
                  lsnr.onExecution();
          }
  
--        return added ? GridContinuousHandler.RegisterStatus.REGISTERED :
--            GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
++        return added ? GridContinuousHandler.RegisterStatus.REGISTERED
++                : GridContinuousHandler.RegisterStatus.NOT_REGISTERED;
      }
  
      /**
@@@ -881,18 -897,15 +916,17 @@@
  
                  lsnr.onUnregister();
              }
--        }
--        else {
-         	listenerLock.writeLock().lock();
-         	try {
 -            synchronized (this) {
++        } else {
++            listenerLock.writeLock().lock();
++            try {
                  if ((lsnr = lsnrs.remove(id)) != null) {
                      int cnt = lsnrCnt.decrementAndGet();
  
                      if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal())
                          cctx.group().removeCacheWithContinuousQuery(cctx);
                  }
 +            } finally {
-             	listenerLock.writeLock().unlock();
++                listenerLock.writeLock().unlock();
              }
  
              if (lsnr != null)


Mime
View raw message