Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2A25E180A1 for ; Mon, 23 Nov 2015 14:02:13 +0000 (UTC) Received: (qmail 83220 invoked by uid 500); 23 Nov 2015 14:02:13 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 83177 invoked by uid 500); 23 Nov 2015 14:02:13 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 83168 invoked by uid 99); 23 Nov 2015 14:02:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Nov 2015 14:02:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D4F27E05D3; Mon, 23 Nov 2015 14:02:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Mon, 23 Nov 2015 14:02:19 -0000 Message-Id: <7768a2c2c7614edc8af5e50835967d40@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [08/17] ignite git commit: Merge branch ignite-1.5 into ignite-1.5-tx-futs-opts http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 7e17efe,b266ad2..bd6c2a7 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@@ -142,9 -143,9 +142,12 @@@ public final class GridDhtColocatedLock /** Skip store flag. */ private final boolean skipStore; + /** */ + private Deque mappings; + + /** Keep binary. */ + private final boolean keepBinary; + /** * @param cctx Registry. * @param keys Keys to lock. @@@ -727,213 -720,214 +732,242 @@@ */ private void map(Collection keys, boolean remap, boolean topLocked) { try { - AffinityTopologyVersion topVer = this.topVer.get(); + map0( + keys, + remap, + topLocked); + } + catch (IgniteCheckedException ex) { + onDone(false, ex); + } + } - assert topVer != null; + private synchronized void map0( + Collection keys, + boolean remap, + boolean topLocked + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = this.topVer; - assert topVer.topologyVersion() > 0; + assert topVer != null; - if (CU.affinityNodes(cctx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid): " + cctx.name())); + assert topVer.topologyVersion() > 0; - return; - } + if (CU.affinityNodes(cctx, topVer).isEmpty()) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid): " + cctx.name())); - boolean clientNode = cctx.kernalContext().clientNode(); + return; + } - assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + boolean clientNode = cctx.kernalContext().clientNode(); - // First assume this node is primary for all keys passed in. - if (!clientNode && mapAsPrimary(keys, topVer)) - return; + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); - Deque mappings = new ConcurrentLinkedDeque8<>(); + // First assume this node is primary for all keys passed in. + if (!clientNode && mapAsPrimary(keys, topVer)) + return; - // Assign keys to primary nodes. - GridNearLockMapping map = null; + mappings = new ArrayDeque<>(); - for (KeyCacheObject key : keys) { - GridNearLockMapping updated = map(key, map, topVer); + // Assign keys to primary nodes. + GridNearLockMapping map = null; - // If new mapping was created, add to collection. - if (updated != map) { - mappings.add(updated); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map(key, map, topVer); - if (tx != null && updated.node().isLocal()) - tx.colocatedLocallyMapped(true); - } + // If new mapping was created, add to collection. + if (updated != map) { + mappings.add(updated); - map = updated; + if (tx != null && updated.node().isLocal()) + tx.colocatedLocallyMapped(true); } - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done: " + this); - - return; - } + map = updated; + } + if (isDone()) { if (log.isDebugEnabled()) - log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } - boolean hasRmtNodes = false; + if (log.isDebugEnabled()) + log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - boolean first = true; + boolean hasRmtNodes = false; - // Create mini futures. - for (Iterator iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping mapping = iter.next(); + boolean first = true; - ClusterNode node = mapping.node(); - Collection mappedKeys = mapping.mappedKeys(); + // Create mini futures. + for (Iterator iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); - boolean loc = node.equals(cctx.localNode()); + ClusterNode node = mapping.node(); + Collection mappedKeys = mapping.mappedKeys(); - assert !mappedKeys.isEmpty(); + boolean loc = node.equals(cctx.localNode()); - GridNearLockRequest req = null; + assert !mappedKeys.isEmpty(); - Collection distributedKeys = new ArrayList<>(mappedKeys.size()); + GridNearLockRequest req = null; - for (KeyCacheObject key : mappedKeys) { - IgniteTxKey txKey = cctx.txKey(key); + Collection distributedKeys = new ArrayList<>(mappedKeys.size()); - GridDistributedCacheEntry entry = null; + for (KeyCacheObject key : mappedKeys) { + IgniteTxKey txKey = cctx.txKey(key); - if (tx != null) { - IgniteTxEntry txEntry = tx.entry(txKey); + GridDistributedCacheEntry entry = null; - if (txEntry != null) { - entry = (GridDistributedCacheEntry)txEntry.cached(); + if (tx != null) { + IgniteTxEntry txEntry = tx.entry(txKey); - if (entry != null && !(loc ^ entry.detached())) { - entry = cctx.colocated().entryExx(key, topVer, true); + if (txEntry != null) { + entry = (GridDistributedCacheEntry)txEntry.cached(); - txEntry.cached(entry); - } + if (entry != null && !(loc ^ entry.detached())) { + entry = cctx.colocated().entryExx(key, topVer, true); + + txEntry.cached(entry); } } + } - boolean explicit; + boolean explicit; - while (true) { - try { - if (entry == null) - entry = cctx.colocated().entryExx(key, topVer, true); + while (true) { + try { + if (entry == null) + entry = cctx.colocated().entryExx(key, topVer, true); - if (!cctx.isAll(entry, filter)) { - if (log.isDebugEnabled()) - log.debug("Entry being locked did not pass filter (will not lock): " + entry); + if (!cctx.isAll(entry, filter)) { + if (log.isDebugEnabled()) + log.debug("Entry being locked did not pass filter (will not lock): " + entry); - onComplete(false, false); + onComplete(false, false); - return; - } + return; + } - assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']'; + assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']'; - GridCacheMvccCandidate cand = addEntry(entry); + GridCacheMvccCandidate cand = addEntry(entry); - // Will either return value from dht cache or null if this is a miss. - IgniteBiTuple val = entry.detached() ? null : - ((GridDhtCacheEntry)entry).versionedValue(topVer); + // Will either return value from dht cache or null if this is a miss. + IgniteBiTuple val = entry.detached() ? null : + ((GridDhtCacheEntry)entry).versionedValue(topVer); - GridCacheVersion dhtVer = null; + GridCacheVersion dhtVer = null; - if (val != null) { - dhtVer = val.get1(); + if (val != null) { + dhtVer = val.get1(); - valMap.put(key, val); - } + valMap.put(key, val); + } - if (cand != null && !cand.reentry()) { - if (req == null) { - boolean clientFirst = false; + if (cand != null && !cand.reentry()) { + if (req == null) { + boolean clientFirst = false; - if (first) { - clientFirst = clientNode && - !topLocked && - (tx == null || !tx.hasRemoteLocks()); + if (first) { + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); - first = false; + first = false; + } + + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + implicitTx(), + implicitSingleTx(), + read, + retval, + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncCommit(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L, + skipStore, + keepBinary, + clientFirst, + cctx.deploymentEnabled()); + + mapping.request(req); } - distributedKeys.add(key); - - if (tx != null) - tx.addKeyMapping(txKey, mapping.node()); - - req.addKeyBytes( - key, + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + implicitTx(), + implicitSingleTx(), + read, retval, - dhtVer, // Include DHT version to match remote DHT entry. - cctx); + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncCommit(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L, + skipStore, + clientFirst, + cctx.deploymentEnabled()); + + mapping.request(req); } - explicit = inTx() && cand == null; + distributedKeys.add(key); - if (explicit) + if (tx != null) tx.addKeyMapping(txKey, mapping.node()); - break; + req.addKeyBytes( + key, + retval, + dhtVer, // Include DHT version to match remote DHT entry. + cctx); } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); - entry = null; - } - } + explicit = inTx() && cand == null; - // Mark mapping explicit lock flag. - if (explicit) { - boolean marked = tx != null && tx.markExplicit(node.id()); + if (explicit) + tx.addKeyMapping(txKey, mapping.node()); - assert tx == null || marked; + break; } - } - - if (inTx() && req != null) - req.hasTransforms(tx.hasTransforms()); + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); - if (!distributedKeys.isEmpty()) { - mapping.distributedKeys(distributedKeys); - - hasRmtNodes |= !mapping.node().isLocal(); + entry = null; + } } - else { - assert mapping.request() == null; - iter.remove(); + // Mark mapping explicit lock flag. + if (explicit) { + boolean marked = tx != null && tx.markExplicit(node.id()); + + assert tx == null || marked; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 6f92204,f16573d..832cc3d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@@ -146,10 -150,9 +146,13 @@@ public final class GridNearLockFuture e /** Skip store flag. */ private final boolean skipStore; + /** Mappings to proceed. */ + @GridToStringExclude + private Queue mappings; + + /** Keep binary context flag. */ + private final boolean keepBinary; + /** * @param cctx Registry. * @param keys Keys to lock. @@@ -848,221 -845,199 +853,222 @@@ assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); - ConcurrentLinkedDeque8 mappings = new ConcurrentLinkedDeque8<>(); + synchronized (this) { + mappings = new ArrayDeque<>(); - // Assign keys to primary nodes. - GridNearLockMapping map = null; + // Assign keys to primary nodes. + GridNearLockMapping map = null; - for (KeyCacheObject key : keys) { - GridNearLockMapping updated = map(key, map, topVer); + for (KeyCacheObject key : keys) { + GridNearLockMapping updated = map( + key, + map, + topVer); - // If new mapping was created, add to collection. - if (updated != map) { - mappings.add(updated); + // If new mapping was created, add to collection. + if (updated != map) { + mappings.add(updated); - if (tx != null && updated.node().isLocal()) - tx.nearLocallyMapped(true); + if (tx != null && updated.node().isLocal()) + tx.nearLocallyMapped(true); + } + + map = updated; } - map = updated; - } + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done: " + this); + return; + } - return; - } + if (log.isDebugEnabled()) + log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - if (log.isDebugEnabled()) - log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); + boolean first = true; - boolean first = true; + // Create mini futures. + for (Iterator iter = mappings.iterator(); iter.hasNext(); ) { + GridNearLockMapping mapping = iter.next(); - // Create mini futures. - for (Iterator iter = mappings.iterator(); iter.hasNext(); ) { - GridNearLockMapping mapping = iter.next(); + ClusterNode node = mapping.node(); + Collection mappedKeys = mapping.mappedKeys(); - ClusterNode node = mapping.node(); - Collection mappedKeys = mapping.mappedKeys(); + assert !mappedKeys.isEmpty(); - assert !mappedKeys.isEmpty(); + GridNearLockRequest req = null; - GridNearLockRequest req = null; + Collection distributedKeys = new ArrayList<>(mappedKeys.size()); - Collection distributedKeys = new ArrayList<>(mappedKeys.size()); + boolean explicit = false; - boolean explicit = false; + for (KeyCacheObject key : mappedKeys) { + IgniteTxKey txKey = cctx.txKey(key); - for (KeyCacheObject key : mappedKeys) { - IgniteTxKey txKey = cctx.txKey(key); + while (true) { + GridNearCacheEntry entry = null; - while (true) { - GridNearCacheEntry entry = null; + try { + entry = cctx.near().entryExx( + key, + topVer); - try { - entry = cctx.near().entryExx(key, topVer); + if (!cctx.isAll( + entry, + filter)) { + if (log.isDebugEnabled()) + log.debug("Entry being locked did not pass filter (will not lock): " + entry); - if (!cctx.isAll(entry, filter)) { - if (log.isDebugEnabled()) - log.debug("Entry being locked did not pass filter (will not lock): " + entry); + onComplete( + false, + false); - onComplete(false, false); + return; + } - return; - } + // Removed exception may be thrown here. + GridCacheMvccCandidate cand = addEntry( + topVer, + entry, + node.id()); - // Removed exception may be thrown here. - GridCacheMvccCandidate cand = addEntry(topVer, entry, node.id()); + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done after addEntry attempt " + + "[fut=" + this + ", entry=" + entry + ']'); - if (isDone()) { - if (log.isDebugEnabled()) - log.debug("Abandoning (re)map because future is done after addEntry attempt " + - "[fut=" + this + ", entry=" + entry + ']'); + return; + } - return; - } + if (cand != null) { + if (tx == null && !cand.reentry()) + cctx.mvcc().addExplicitLock( + threadId, + cand, + topVer); - if (cand != null) { - if (tx == null && !cand.reentry()) - cctx.mvcc().addExplicitLock(threadId, cand, topVer); + IgniteBiTuple val = entry.versionedValue(); - IgniteBiTuple val = entry.versionedValue(); + if (val == null) { + GridDhtCacheEntry dhtEntry = dht().peekExx(key); - if (val == null) { - GridDhtCacheEntry dhtEntry = dht().peekExx(key); + try { + if (dhtEntry != null) + val = dhtEntry.versionedValue(topVer); + } + catch (GridCacheEntryRemovedException ignored) { + assert dhtEntry.obsolete() : dhtEntry; - try { - if (dhtEntry != null) - val = dhtEntry.versionedValue(topVer); + if (log.isDebugEnabled()) + log.debug("Got removed exception for DHT entry in map (will ignore): " + + dhtEntry); + } } - catch (GridCacheEntryRemovedException ignored) { - assert dhtEntry.obsolete() : " Got removed exception for non-obsolete entry: " - + dhtEntry; - if (log.isDebugEnabled()) - log.debug("Got removed exception for DHT entry in map (will ignore): " - + dhtEntry); - } - } + GridCacheVersion dhtVer = null; - GridCacheVersion dhtVer = null; + if (val != null) { + dhtVer = val.get1(); - if (val != null) { - dhtVer = val.get1(); + valMap.put( + key, + val); + } - valMap.put(key, val); - } + if (!cand.reentry()) { + if (req == null) { + boolean clientFirst = false; - if (!cand.reentry()) { - if (req == null) { - boolean clientFirst = false; + if (first) { + clientFirst = clientNode && + !topLocked && + (tx == null || !tx.hasRemoteLocks()); - if (first) { - clientFirst = clientNode && - !topLocked && - (tx == null || !tx.hasRemoteLocks()); + first = false; + } - first = false; + req = new GridNearLockRequest( + cctx.cacheId(), + topVer, + cctx.nodeId(), + threadId, + futId, + lockVer, + inTx(), + implicitTx(), + implicitSingleTx(), + read, + retval, + isolation(), + isInvalidate(), + timeout, + mappedKeys.size(), + inTx() ? tx.size() : mappedKeys.size(), + inTx() && tx.syncCommit(), + inTx() ? tx.subjectId() : null, + inTx() ? tx.taskNameHash() : 0, + read ? accessTtl : -1L, + skipStore, ++ keepBinary, + clientFirst, + cctx.deploymentEnabled()); + + mapping.request(req); } - req = new GridNearLockRequest( - cctx.cacheId(), - topVer, - cctx.nodeId(), - threadId, - futId, - lockVer, - inTx(), - implicitTx(), - implicitSingleTx(), - read, - retval, - isolation(), - isInvalidate(), - timeout, - mappedKeys.size(), - inTx() ? tx.size() : mappedKeys.size(), - inTx() && tx.syncCommit(), - inTx() ? tx.subjectId() : null, - inTx() ? tx.taskNameHash() : 0, - read ? accessTtl : -1L, - skipStore, - keepBinary, - clientFirst, - cctx.deploymentEnabled()); - - mapping.request(req); - } + distributedKeys.add(key); - distributedKeys.add(key); + if (tx != null) + tx.addKeyMapping( + txKey, + mapping.node()); - if (tx != null) - tx.addKeyMapping(txKey, mapping.node()); + req.addKeyBytes( + key, + retval && dhtVer == null, + dhtVer, + // Include DHT version to match remote DHT entry. + cctx); + } - req.addKeyBytes( - key, - retval && dhtVer == null, - dhtVer, // Include DHT version to match remote DHT entry. - cctx); + if (cand.reentry()) + explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion()); } - - if (cand.reentry()) + else + // Ignore reentries within transactions. explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion()); - } - else - // Ignore reentries within transactions. - explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion()); - if (explicit) - tx.addKeyMapping(txKey, mapping.node()); + if (explicit) + tx.addKeyMapping( + txKey, + mapping.node()); - break; - } - catch (GridCacheEntryRemovedException ignored) { - assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry; + break; + } + catch (GridCacheEntryRemovedException ignored) { + assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry; - if (log.isDebugEnabled()) - log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); + if (log.isDebugEnabled()) + log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry); + } } - } - // Mark mapping explicit lock flag. - if (explicit) { - boolean marked = tx != null && tx.markExplicit(node.id()); + // Mark mapping explicit lock flag. + if (explicit) { + boolean marked = tx != null && tx.markExplicit(node.id()); - assert tx == null || marked; + assert tx == null || marked; + } } - } - if (!distributedKeys.isEmpty()) - mapping.distributedKeys(distributedKeys); - else { - assert mapping.request() == null; + if (!distributedKeys.isEmpty()) + mapping.distributedKeys(distributedKeys); + else { + assert mapping.request() == null; - iter.remove(); + iter.remove(); + } } } @@@ -1454,165 -1440,158 +1461,166 @@@ * @param res Result callback. */ void onResult(GridNearLockResponse res) { - if (rcvRes.compareAndSet(false, true)) { - if (res.error() != null) { - if (log.isDebugEnabled()) - log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + - ", res=" + res + ']'); + synchronized (this) { + if (!rcvRes) + rcvRes = true; + else + return; + } - // Fail. - if (res.error() instanceof GridCacheLockTimeoutException) - onDone(false); - else - onDone(res.error()); + if (res.error() != null) { + if (log.isDebugEnabled()) + log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + + ", res=" + res + ']'); - return; - } + // Fail. + if (res.error() instanceof GridCacheLockTimeoutException) + onDone(false); + else + onDone(res.error()); - if (res.clientRemapVersion() != null) { - assert cctx.kernalContext().clientNode(); + return; + } - IgniteInternalFuture affFut = - cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); - if (affFut != null && !affFut.isDone()) { - affFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + IgniteInternalFuture affFut = + cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion()); - remap(); - } - catch (IgniteCheckedException e) { - onDone(e); - } - finally { - cctx.shared().txContextReset(); - } + if (affFut != null && !affFut.isDone()) { + affFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); + + remap(); } - }); - } - else - remap(); + catch (IgniteCheckedException e) { + onDone(e); + } + finally { + cctx.shared().txContextReset(); + } + } + }); } - else { - int i = 0; + else + remap(); + } + else { + int i = 0; - AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get(); + AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer; - for (KeyCacheObject k : keys) { - while (true) { - GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); + for (KeyCacheObject k : keys) { + while (true) { + GridNearCacheEntry entry = cctx.near().entryExx(k, topVer); - try { - if (res.dhtVersion(i) == null) { - onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + - "(will fail the lock): " + res)); + try { + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); - return; - } + return; + } - IgniteBiTuple oldValTup = valMap.get(entry.key()); + IgniteBiTuple oldValTup = valMap.get(entry.key()); - CacheObject oldVal = entry.rawGet(); - boolean hasOldVal = false; - CacheObject newVal = res.value(i); + CacheObject oldVal = entry.rawGet(); + boolean hasOldVal = false; + CacheObject newVal = res.value(i); - boolean readRecordable = false; + boolean readRecordable = false; - if (retval) { - readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + if (retval) { + readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); - if (readRecordable) - hasOldVal = entry.hasValue(); - } + if (readRecordable) + hasOldVal = entry.hasValue(); + } - GridCacheVersion dhtVer = res.dhtVersion(i); - GridCacheVersion mappedVer = res.mappedVersion(i); + GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion mappedVer = res.mappedVersion(i); - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); - oldVal = oldValTup.get2(); - } + oldVal = oldValTup.get2(); } + } - // Lock is held at this point, so we can set the - // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); + // Lock is held at this point, so we can set the + // returned value if any. + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); - if (inTx()) { - tx.hasRemoteLocks(true); + if (inTx()) { + tx.hasRemoteLocks(true); - if (implicitTx() && tx.onePhaseCommit()) { - boolean pass = res.filterResult(i); + if (implicitTx() && tx.onePhaseCommit()) { + boolean pass = res.filterResult(i); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); - } + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); } + } - entry.readyNearLock(lockVer, - mappedVer, - res.committedVersions(), - res.rolledbackVersions(), - res.pending()); - - if (retval) { - if (readRecordable) - cctx.events().addEvent( - entry.partition(), - entry.key(), - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - oldVal, - hasOldVal, - CU.subjectId(tx, cctx.shared()), - null, - inTx() ? tx.resolveTaskName() : null, - keepBinary); - - if (cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(false); - } + entry.readyNearLock(lockVer, + mappedVer, + res.committedVersions(), + res.rolledbackVersions(), + res.pending()); + + if (retval) { + if (readRecordable) + cctx.events().addEvent( + entry.partition(), + entry.key(), + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + oldVal, + hasOldVal, + CU.subjectId(tx, cctx.shared()), + null, - inTx() ? tx.resolveTaskName() : null); ++ inTx() ? tx.resolveTaskName() : null, ++ keepBinary); + + if (cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(false); + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - break; // Inner while loop. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to add candidates because entry was removed (will renew)."); + break; // Inner while loop. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to add candidates because entry was removed (will renew)."); + synchronized (GridNearLockFuture.this) { // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + entries.set(i, + (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); } } - - i++; } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); - } + i++; + } - onDone(true); + try { + proceedMapping(); + } + catch (IgniteCheckedException e) { + onDone(e); } + + onDone(true); } }