ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/50] [abbrv] ignite git commit: Merge branch ignite-1.5 into ignite-1.5-tx-futs-opts
Date Tue, 24 Nov 2015 09:22:19 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 7e17efe,b266ad2..bd6c2a7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@@ -142,9 -143,9 +142,12 @@@ public final class GridDhtColocatedLock
      /** Skip store flag. */
      private final boolean skipStore;
  
 +    /** */
 +    private Deque<GridNearLockMapping> mappings;
 +
+     /** Keep binary. */
+     private final boolean keepBinary;
+ 
      /**
       * @param cctx Registry.
       * @param keys Keys to lock.
@@@ -727,213 -720,214 +732,242 @@@
       */
      private void map(Collection<KeyCacheObject> keys, boolean remap, boolean topLocked) {
          try {
 -            AffinityTopologyVersion topVer = this.topVer.get();
 +            map0(
 +                keys,
 +                remap,
 +                topLocked);
 +        }
 +        catch (IgniteCheckedException ex) {
 +            onDone(false, ex);
 +        }
 +    }
  
 -            assert topVer != null;
 +    private synchronized void map0(
 +        Collection<KeyCacheObject> keys,
 +        boolean remap,
 +        boolean topLocked
 +    ) throws IgniteCheckedException {
 +        AffinityTopologyVersion topVer = this.topVer;
  
 -            assert topVer.topologyVersion() > 0;
 +        assert topVer != null;
  
 -            if (CU.affinityNodes(cctx, topVer).isEmpty()) {
 -                onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
 -                    "(all partition nodes left the grid): " + cctx.name()));
 +        assert topVer.topologyVersion() > 0;
  
 -                return;
 -            }
 +        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
 +            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
 +                "(all partition nodes left the grid): " + cctx.name()));
  
 -            boolean clientNode = cctx.kernalContext().clientNode();
 +            return;
 +        }
  
 -            assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
 +        boolean clientNode = cctx.kernalContext().clientNode();
  
 -            // First assume this node is primary for all keys passed in.
 -            if (!clientNode && mapAsPrimary(keys, topVer))
 -                return;
 +        assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
  
 -            Deque<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
 +        // First assume this node is primary for all keys passed in.
 +        if (!clientNode && mapAsPrimary(keys, topVer))
 +            return;
  
 -            // Assign keys to primary nodes.
 -            GridNearLockMapping map = null;
 +        mappings = new ArrayDeque<>();
  
 -            for (KeyCacheObject key : keys) {
 -                GridNearLockMapping updated = map(key, map, topVer);
 +        // Assign keys to primary nodes.
 +        GridNearLockMapping map = null;
  
 -                // If new mapping was created, add to collection.
 -                if (updated != map) {
 -                    mappings.add(updated);
 +        for (KeyCacheObject key : keys) {
 +            GridNearLockMapping updated = map(key, map, topVer);
  
 -                    if (tx != null && updated.node().isLocal())
 -                        tx.colocatedLocallyMapped(true);
 -                }
 +            // If new mapping was created, add to collection.
 +            if (updated != map) {
 +                mappings.add(updated);
  
 -                map = updated;
 +                if (tx != null && updated.node().isLocal())
 +                    tx.colocatedLocallyMapped(true);
              }
  
 -            if (isDone()) {
 -                if (log.isDebugEnabled())
 -                    log.debug("Abandoning (re)map because future is done: " + this);
 -
 -                return;
 -            }
 +            map = updated;
 +        }
  
 +        if (isDone()) {
              if (log.isDebugEnabled())
 -                log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
 +                log.debug("Abandoning (re)map because future is done: " + this);
 +
 +            return;
 +        }
  
 -            boolean hasRmtNodes = false;
 +        if (log.isDebugEnabled())
 +            log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
  
 -            boolean first = true;
 +        boolean hasRmtNodes = false;
  
 -            // Create mini futures.
 -            for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
 -                GridNearLockMapping mapping = iter.next();
 +        boolean first = true;
  
 -                ClusterNode node = mapping.node();
 -                Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
 +        // Create mini futures.
 +        for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
 +            GridNearLockMapping mapping = iter.next();
  
 -                boolean loc = node.equals(cctx.localNode());
 +            ClusterNode node = mapping.node();
 +            Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
  
 -                assert !mappedKeys.isEmpty();
 +            boolean loc = node.equals(cctx.localNode());
  
 -                GridNearLockRequest req = null;
 +            assert !mappedKeys.isEmpty();
  
 -                Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
 +            GridNearLockRequest req = null;
  
 -                for (KeyCacheObject key : mappedKeys) {
 -                    IgniteTxKey txKey = cctx.txKey(key);
 +            Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
  
 -                    GridDistributedCacheEntry entry = null;
 +            for (KeyCacheObject key : mappedKeys) {
 +                IgniteTxKey txKey = cctx.txKey(key);
  
 -                    if (tx != null) {
 -                        IgniteTxEntry txEntry = tx.entry(txKey);
 +                GridDistributedCacheEntry entry = null;
  
 -                        if (txEntry != null) {
 -                            entry = (GridDistributedCacheEntry)txEntry.cached();
 +                if (tx != null) {
 +                    IgniteTxEntry txEntry = tx.entry(txKey);
  
 -                            if (entry != null && !(loc ^ entry.detached())) {
 -                                entry = cctx.colocated().entryExx(key, topVer, true);
 +                    if (txEntry != null) {
 +                        entry = (GridDistributedCacheEntry)txEntry.cached();
  
 -                                txEntry.cached(entry);
 -                            }
 +                        if (entry != null && !(loc ^ entry.detached())) {
 +                            entry = cctx.colocated().entryExx(key, topVer, true);
 +
 +                            txEntry.cached(entry);
                          }
                      }
 +                }
  
 -                    boolean explicit;
 +                boolean explicit;
  
 -                    while (true) {
 -                        try {
 -                            if (entry == null)
 -                                entry = cctx.colocated().entryExx(key, topVer, true);
 +                while (true) {
 +                    try {
 +                        if (entry == null)
 +                            entry = cctx.colocated().entryExx(key, topVer, true);
  
 -                            if (!cctx.isAll(entry, filter)) {
 -                                if (log.isDebugEnabled())
 -                                    log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 +                        if (!cctx.isAll(entry, filter)) {
 +                            if (log.isDebugEnabled())
 +                                log.debug("Entry being locked did not pass filter (will not lock): " + entry);
  
 -                                onComplete(false, false);
 +                            onComplete(false, false);
  
 -                                return;
 -                            }
 +                            return;
 +                        }
  
 -                            assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
 +                        assert loc ^ entry.detached() : "Invalid entry [loc=" + loc + ", entry=" + entry + ']';
  
 -                            GridCacheMvccCandidate cand = addEntry(entry);
 +                        GridCacheMvccCandidate cand = addEntry(entry);
  
 -                            // Will either return value from dht cache or null if this is a miss.
 -                            IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
 -                                ((GridDhtCacheEntry)entry).versionedValue(topVer);
 +                        // Will either return value from dht cache or null if this is a miss.
 +                        IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.detached() ? null :
 +                            ((GridDhtCacheEntry)entry).versionedValue(topVer);
  
 -                            GridCacheVersion dhtVer = null;
 +                        GridCacheVersion dhtVer = null;
  
 -                            if (val != null) {
 -                                dhtVer = val.get1();
 +                        if (val != null) {
 +                            dhtVer = val.get1();
  
 -                                valMap.put(key, val);
 -                            }
 +                            valMap.put(key, val);
 +                        }
  
 -                            if (cand != null && !cand.reentry()) {
 -                                if (req == null) {
 -                                    boolean clientFirst = false;
 +                        if (cand != null && !cand.reentry()) {
 +                            if (req == null) {
 +                                boolean clientFirst = false;
  
 -                                    if (first) {
 -                                        clientFirst = clientNode &&
 -                                            !topLocked &&
 -                                            (tx == null || !tx.hasRemoteLocks());
 +                                if (first) {
 +                                    clientFirst = clientNode &&
 +                                        !topLocked &&
 +                                        (tx == null || !tx.hasRemoteLocks());
  
-                                     first = false;
+                                         first = false;
+                                     }
+ 
+                                     req = new GridNearLockRequest(
+                                         cctx.cacheId(),
+                                         topVer,
+                                         cctx.nodeId(),
+                                         threadId,
+                                         futId,
+                                         lockVer,
+                                         inTx(),
+                                         implicitTx(),
+                                         implicitSingleTx(),
+                                         read,
+                                         retval,
+                                         isolation(),
+                                         isInvalidate(),
+                                         timeout,
+                                         mappedKeys.size(),
+                                         inTx() ? tx.size() : mappedKeys.size(),
+                                         inTx() && tx.syncCommit(),
+                                         inTx() ? tx.subjectId() : null,
+                                         inTx() ? tx.taskNameHash() : 0,
+                                         read ? accessTtl : -1L,
+                                         skipStore,
+                                         keepBinary,
+                                         clientFirst,
+                                         cctx.deploymentEnabled());
+ 
+                                     mapping.request(req);
                                  }
  
 -                                distributedKeys.add(key);
 -
 -                                if (tx != null)
 -                                    tx.addKeyMapping(txKey, mapping.node());
 -
 -                                req.addKeyBytes(
 -                                    key,
 +                                req = new GridNearLockRequest(
 +                                    cctx.cacheId(),
 +                                    topVer,
 +                                    cctx.nodeId(),
 +                                    threadId,
 +                                    futId,
 +                                    lockVer,
 +                                    inTx(),
 +                                    implicitTx(),
 +                                    implicitSingleTx(),
 +                                    read,
                                      retval,
 -                                    dhtVer, // Include DHT version to match remote DHT entry.
 -                                    cctx);
 +                                    isolation(),
 +                                    isInvalidate(),
 +                                    timeout,
 +                                    mappedKeys.size(),
 +                                    inTx() ? tx.size() : mappedKeys.size(),
 +                                    inTx() && tx.syncCommit(),
 +                                    inTx() ? tx.subjectId() : null,
 +                                    inTx() ? tx.taskNameHash() : 0,
 +                                    read ? accessTtl : -1L,
 +                                    skipStore,
 +                                    clientFirst,
 +                                    cctx.deploymentEnabled());
 +
 +                                mapping.request(req);
                              }
  
 -                            explicit = inTx() && cand == null;
 +                            distributedKeys.add(key);
  
 -                            if (explicit)
 +                            if (tx != null)
                                  tx.addKeyMapping(txKey, mapping.node());
  
 -                            break;
 +                            req.addKeyBytes(
 +                                key,
 +                                retval,
 +                                dhtVer, // Include DHT version to match remote DHT entry.
 +                                cctx);
                          }
 -                        catch (GridCacheEntryRemovedException ignored) {
 -                            if (log.isDebugEnabled())
 -                                log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
  
 -                            entry = null;
 -                        }
 -                    }
 +                        explicit = inTx() && cand == null;
  
 -                    // Mark mapping explicit lock flag.
 -                    if (explicit) {
 -                        boolean marked = tx != null && tx.markExplicit(node.id());
 +                        if (explicit)
 +                            tx.addKeyMapping(txKey, mapping.node());
  
 -                        assert tx == null || marked;
 +                        break;
                      }
 -                }
 -
 -                if (inTx() && req != null)
 -                    req.hasTransforms(tx.hasTransforms());
 +                    catch (GridCacheEntryRemovedException ignored) {
 +                        if (log.isDebugEnabled())
 +                            log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
  
 -                if (!distributedKeys.isEmpty()) {
 -                    mapping.distributedKeys(distributedKeys);
 -
 -                    hasRmtNodes |= !mapping.node().isLocal();
 +                        entry = null;
 +                    }
                  }
 -                else {
 -                    assert mapping.request() == null;
  
 -                    iter.remove();
 +                // Mark mapping explicit lock flag.
 +                if (explicit) {
 +                    boolean marked = tx != null && tx.markExplicit(node.id());
 +
 +                    assert tx == null || marked;
                  }
              }
  

http://git-wip-us.apache.org/repos/asf/ignite/blob/7cd4a77a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 6f92204,f16573d..832cc3d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@@ -146,10 -150,9 +146,13 @@@ public final class GridNearLockFuture e
      /** Skip store flag. */
      private final boolean skipStore;
  
 +    /** Mappings to proceed. */
 +    @GridToStringExclude
 +    private Queue<GridNearLockMapping> mappings;
 +
+     /** Keep binary context flag. */
+     private final boolean keepBinary;
+ 
      /**
       * @param cctx Registry.
       * @param keys Keys to lock.
@@@ -848,221 -845,199 +853,222 @@@
  
              assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks()));
  
 -            ConcurrentLinkedDeque8<GridNearLockMapping> mappings = new ConcurrentLinkedDeque8<>();
 +            synchronized (this) {
 +                mappings = new ArrayDeque<>();
  
 -            // Assign keys to primary nodes.
 -            GridNearLockMapping map = null;
 +                // Assign keys to primary nodes.
 +                GridNearLockMapping map = null;
  
 -            for (KeyCacheObject key : keys) {
 -                GridNearLockMapping updated = map(key, map, topVer);
 +                for (KeyCacheObject key : keys) {
 +                    GridNearLockMapping updated = map(
 +                        key,
 +                        map,
 +                        topVer);
  
 -                // If new mapping was created, add to collection.
 -                if (updated != map) {
 -                    mappings.add(updated);
 +                    // If new mapping was created, add to collection.
 +                    if (updated != map) {
 +                        mappings.add(updated);
  
 -                    if (tx != null && updated.node().isLocal())
 -                        tx.nearLocallyMapped(true);
 +                        if (tx != null && updated.node().isLocal())
 +                            tx.nearLocallyMapped(true);
 +                    }
 +
 +                    map = updated;
                  }
  
 -                map = updated;
 -            }
 +                if (isDone()) {
 +                    if (log.isDebugEnabled())
 +                        log.debug("Abandoning (re)map because future is done: " + this);
  
 -            if (isDone()) {
 -                if (log.isDebugEnabled())
 -                    log.debug("Abandoning (re)map because future is done: " + this);
 +                    return;
 +                }
  
 -                return;
 -            }
 +                if (log.isDebugEnabled())
 +                    log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
  
 -            if (log.isDebugEnabled())
 -                log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']');
 +                boolean first = true;
  
 -            boolean first = true;
 +                // Create mini futures.
 +                for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
 +                    GridNearLockMapping mapping = iter.next();
  
 -            // Create mini futures.
 -            for (Iterator<GridNearLockMapping> iter = mappings.iterator(); iter.hasNext(); ) {
 -                GridNearLockMapping mapping = iter.next();
 +                    ClusterNode node = mapping.node();
 +                    Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
  
 -                ClusterNode node = mapping.node();
 -                Collection<KeyCacheObject> mappedKeys = mapping.mappedKeys();
 +                    assert !mappedKeys.isEmpty();
  
 -                assert !mappedKeys.isEmpty();
 +                    GridNearLockRequest req = null;
  
 -                GridNearLockRequest req = null;
 +                    Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
  
 -                Collection<KeyCacheObject> distributedKeys = new ArrayList<>(mappedKeys.size());
 +                    boolean explicit = false;
  
 -                boolean explicit = false;
 +                    for (KeyCacheObject key : mappedKeys) {
 +                        IgniteTxKey txKey = cctx.txKey(key);
  
 -                for (KeyCacheObject key : mappedKeys) {
 -                    IgniteTxKey txKey = cctx.txKey(key);
 +                        while (true) {
 +                            GridNearCacheEntry entry = null;
  
 -                    while (true) {
 -                        GridNearCacheEntry entry = null;
 +                            try {
 +                                entry = cctx.near().entryExx(
 +                                    key,
 +                                    topVer);
  
 -                        try {
 -                            entry = cctx.near().entryExx(key, topVer);
 +                                if (!cctx.isAll(
 +                                    entry,
 +                                    filter)) {
 +                                    if (log.isDebugEnabled())
 +                                        log.debug("Entry being locked did not pass filter (will not lock): " + entry);
  
 -                            if (!cctx.isAll(entry, filter)) {
 -                                if (log.isDebugEnabled())
 -                                    log.debug("Entry being locked did not pass filter (will not lock): " + entry);
 +                                    onComplete(
 +                                        false,
 +                                        false);
  
 -                                onComplete(false, false);
 +                                    return;
 +                                }
  
 -                                return;
 -                            }
 +                                // Removed exception may be thrown here.
 +                                GridCacheMvccCandidate cand = addEntry(
 +                                    topVer,
 +                                    entry,
 +                                    node.id());
  
 -                            // Removed exception may be thrown here.
 -                            GridCacheMvccCandidate cand = addEntry(topVer, entry, node.id());
 +                                if (isDone()) {
 +                                    if (log.isDebugEnabled())
 +                                        log.debug("Abandoning (re)map because future is done after addEntry attempt " +
 +                                            "[fut=" + this + ", entry=" + entry + ']');
  
 -                            if (isDone()) {
 -                                if (log.isDebugEnabled())
 -                                    log.debug("Abandoning (re)map because future is done after addEntry attempt " +
 -                                        "[fut=" + this + ", entry=" + entry + ']');
 +                                    return;
 +                                }
  
 -                                return;
 -                            }
 +                                if (cand != null) {
 +                                    if (tx == null && !cand.reentry())
 +                                        cctx.mvcc().addExplicitLock(
 +                                            threadId,
 +                                            cand,
 +                                            topVer);
  
 -                            if (cand != null) {
 -                                if (tx == null && !cand.reentry())
 -                                    cctx.mvcc().addExplicitLock(threadId, cand, topVer);
 +                                    IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
  
 -                                IgniteBiTuple<GridCacheVersion, CacheObject> val = entry.versionedValue();
 +                                    if (val == null) {
 +                                        GridDhtCacheEntry dhtEntry = dht().peekExx(key);
  
 -                                if (val == null) {
 -                                    GridDhtCacheEntry dhtEntry = dht().peekExx(key);
 +                                        try {
 +                                            if (dhtEntry != null)
 +                                                val = dhtEntry.versionedValue(topVer);
 +                                        }
 +                                        catch (GridCacheEntryRemovedException ignored) {
 +                                            assert dhtEntry.obsolete() : dhtEntry;
  
 -                                    try {
 -                                        if (dhtEntry != null)
 -                                            val = dhtEntry.versionedValue(topVer);
 +                                            if (log.isDebugEnabled())
 +                                                log.debug("Got removed exception for DHT entry in map (will ignore): "
 +                                                    + dhtEntry);
 +                                        }
                                      }
 -                                    catch (GridCacheEntryRemovedException ignored) {
 -                                        assert dhtEntry.obsolete() : " Got removed exception for non-obsolete entry: "
 -                                            + dhtEntry;
  
 -                                        if (log.isDebugEnabled())
 -                                            log.debug("Got removed exception for DHT entry in map (will ignore): "
 -                                                + dhtEntry);
 -                                    }
 -                                }
 +                                    GridCacheVersion dhtVer = null;
  
 -                                GridCacheVersion dhtVer = null;
 +                                    if (val != null) {
 +                                        dhtVer = val.get1();
  
 -                                if (val != null) {
 -                                    dhtVer = val.get1();
 +                                        valMap.put(
 +                                            key,
 +                                            val);
 +                                    }
  
 -                                    valMap.put(key, val);
 -                                }
 +                                    if (!cand.reentry()) {
 +                                        if (req == null) {
 +                                            boolean clientFirst = false;
  
 -                                if (!cand.reentry()) {
 -                                    if (req == null) {
 -                                        boolean clientFirst = false;
 +                                            if (first) {
 +                                                clientFirst = clientNode &&
 +                                                    !topLocked &&
 +                                                    (tx == null || !tx.hasRemoteLocks());
  
 -                                        if (first) {
 -                                            clientFirst = clientNode &&
 -                                                !topLocked &&
 -                                                (tx == null || !tx.hasRemoteLocks());
 +                                                first = false;
 +                                            }
  
 -                                            first = false;
 +                                            req = new GridNearLockRequest(
 +                                                cctx.cacheId(),
 +                                                topVer,
 +                                                cctx.nodeId(),
 +                                                threadId,
 +                                                futId,
 +                                                lockVer,
 +                                                inTx(),
 +                                                implicitTx(),
 +                                                implicitSingleTx(),
 +                                                read,
 +                                                retval,
 +                                                isolation(),
 +                                                isInvalidate(),
 +                                                timeout,
 +                                                mappedKeys.size(),
 +                                                inTx() ? tx.size() : mappedKeys.size(),
 +                                                inTx() && tx.syncCommit(),
 +                                                inTx() ? tx.subjectId() : null,
 +                                                inTx() ? tx.taskNameHash() : 0,
 +                                                read ? accessTtl : -1L,
 +                                                skipStore,
++                                                keepBinary,
 +                                                clientFirst,
 +                                                cctx.deploymentEnabled());
 +
 +                                            mapping.request(req);
                                          }
  
 -                                        req = new GridNearLockRequest(
 -                                            cctx.cacheId(),
 -                                            topVer,
 -                                            cctx.nodeId(),
 -                                            threadId,
 -                                            futId,
 -                                            lockVer,
 -                                            inTx(),
 -                                            implicitTx(),
 -                                            implicitSingleTx(),
 -                                            read,
 -                                            retval,
 -                                            isolation(),
 -                                            isInvalidate(),
 -                                            timeout,
 -                                            mappedKeys.size(),
 -                                            inTx() ? tx.size() : mappedKeys.size(),
 -                                            inTx() && tx.syncCommit(),
 -                                            inTx() ? tx.subjectId() : null,
 -                                            inTx() ? tx.taskNameHash() : 0,
 -                                            read ? accessTtl : -1L,
 -                                            skipStore,
 -                                            keepBinary,
 -                                            clientFirst,
 -                                            cctx.deploymentEnabled());
 -
 -                                        mapping.request(req);
 -                                    }
 +                                        distributedKeys.add(key);
  
 -                                    distributedKeys.add(key);
 +                                        if (tx != null)
 +                                            tx.addKeyMapping(
 +                                                txKey,
 +                                                mapping.node());
  
 -                                    if (tx != null)
 -                                        tx.addKeyMapping(txKey, mapping.node());
 +                                        req.addKeyBytes(
 +                                            key,
 +                                            retval && dhtVer == null,
 +                                            dhtVer,
 +                                            // Include DHT version to match remote DHT entry.
 +                                            cctx);
 +                                    }
  
 -                                    req.addKeyBytes(
 -                                        key,
 -                                        retval && dhtVer == null,
 -                                        dhtVer, // Include DHT version to match remote DHT entry.
 -                                        cctx);
 +                                    if (cand.reentry())
 +                                        explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
                                  }
 -
 -                                if (cand.reentry())
 +                                else
 +                                    // Ignore reentries within transactions.
                                      explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
 -                            }
 -                            else
 -                                // Ignore reentries within transactions.
 -                                explicit = tx != null && !entry.hasLockCandidate(tx.xidVersion());
  
 -                            if (explicit)
 -                                tx.addKeyMapping(txKey, mapping.node());
 +                                if (explicit)
 +                                    tx.addKeyMapping(
 +                                        txKey,
 +                                        mapping.node());
  
 -                            break;
 -                        }
 -                        catch (GridCacheEntryRemovedException ignored) {
 -                            assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
 +                                break;
 +                            }
 +                            catch (GridCacheEntryRemovedException ignored) {
 +                                assert entry.obsolete() : "Got removed exception on non-obsolete entry: " + entry;
  
 -                            if (log.isDebugEnabled())
 -                                log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
 +                                if (log.isDebugEnabled())
 +                                    log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
 +                            }
                          }
 -                    }
  
 -                    // Mark mapping explicit lock flag.
 -                    if (explicit) {
 -                        boolean marked = tx != null && tx.markExplicit(node.id());
 +                        // Mark mapping explicit lock flag.
 +                        if (explicit) {
 +                            boolean marked = tx != null && tx.markExplicit(node.id());
  
 -                        assert tx == null || marked;
 +                            assert tx == null || marked;
 +                        }
                      }
 -                }
  
 -                if (!distributedKeys.isEmpty())
 -                    mapping.distributedKeys(distributedKeys);
 -                else {
 -                    assert mapping.request() == null;
 +                    if (!distributedKeys.isEmpty())
 +                        mapping.distributedKeys(distributedKeys);
 +                    else {
 +                        assert mapping.request() == null;
  
 -                    iter.remove();
 +                        iter.remove();
 +                    }
                  }
              }
  
@@@ -1454,165 -1440,158 +1461,166 @@@
           * @param res Result callback.
           */
          void onResult(GridNearLockResponse res) {
 -            if (rcvRes.compareAndSet(false, true)) {
 -                if (res.error() != null) {
 -                    if (log.isDebugEnabled())
 -                        log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
 -                            ", res=" + res + ']');
 +            synchronized (this) {
 +                if (!rcvRes)
 +                    rcvRes = true;
 +                else
 +                    return;
 +            }
  
 -                    // Fail.
 -                    if (res.error() instanceof GridCacheLockTimeoutException)
 -                        onDone(false);
 -                    else
 -                        onDone(res.error());
 +            if (res.error() != null) {
 +                if (log.isDebugEnabled())
 +                    log.debug("Finishing mini future with an error due to error in response [miniFut=" + this +
 +                        ", res=" + res + ']');
  
 -                    return;
 -                }
 +                // Fail.
 +                if (res.error() instanceof GridCacheLockTimeoutException)
 +                    onDone(false);
 +                else
 +                    onDone(res.error());
  
 -                if (res.clientRemapVersion() != null) {
 -                    assert cctx.kernalContext().clientNode();
 +                return;
 +            }
  
 -                    IgniteInternalFuture<?> affFut =
 -                        cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
 +            if (res.clientRemapVersion() != null) {
 +                assert cctx.kernalContext().clientNode();
  
 -                    if (affFut != null && !affFut.isDone()) {
 -                        affFut.listen(new CI1<IgniteInternalFuture<?>>() {
 -                            @Override public void apply(IgniteInternalFuture<?> fut) {
 -                                try {
 -                                    fut.get();
 +                IgniteInternalFuture<?> affFut =
 +                    cctx.shared().exchange().affinityReadyFuture(res.clientRemapVersion());
  
 -                                    remap();
 -                                }
 -                                catch (IgniteCheckedException e) {
 -                                    onDone(e);
 -                                }
 -                                finally {
 -                                    cctx.shared().txContextReset();
 -                                }
 +                if (affFut != null && !affFut.isDone()) {
 +                    affFut.listen(new CI1<IgniteInternalFuture<?>>() {
 +                        @Override public void apply(IgniteInternalFuture<?> fut) {
 +                            try {
 +                                fut.get();
 +
 +                                remap();
                              }
 -                        });
 -                    }
 -                    else
 -                        remap();
 +                            catch (IgniteCheckedException e) {
 +                                onDone(e);
 +                            }
 +                            finally {
 +                                cctx.shared().txContextReset();
 +                            }
 +                        }
 +                    });
                  }
 -                else {
 -                    int i = 0;
 +                else
 +                    remap();
 +            }
 +            else {
 +                int i = 0;
  
 -                    AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer.get();
 +                AffinityTopologyVersion topVer = GridNearLockFuture.this.topVer;
  
 -                    for (KeyCacheObject k : keys) {
 -                        while (true) {
 -                            GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
 +                for (KeyCacheObject k : keys) {
 +                    while (true) {
 +                        GridNearCacheEntry entry = cctx.near().entryExx(k, topVer);
  
 -                            try {
 -                                if (res.dhtVersion(i) == null) {
 -                                    onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
 -                                        "(will fail the lock): " + res));
 +                        try {
 +                            if (res.dhtVersion(i) == null) {
 +                                onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
 +                                    "(will fail the lock): " + res));
  
 -                                    return;
 -                                }
 +                                return;
 +                            }
  
 -                                IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
 +                            IgniteBiTuple<GridCacheVersion, CacheObject> oldValTup = valMap.get(entry.key());
  
 -                                CacheObject oldVal = entry.rawGet();
 -                                boolean hasOldVal = false;
 -                                CacheObject newVal = res.value(i);
 +                            CacheObject oldVal = entry.rawGet();
 +                            boolean hasOldVal = false;
 +                            CacheObject newVal = res.value(i);
  
 -                                boolean readRecordable = false;
 +                            boolean readRecordable = false;
  
 -                                if (retval) {
 -                                    readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
 +                            if (retval) {
 +                                readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ);
  
 -                                    if (readRecordable)
 -                                        hasOldVal = entry.hasValue();
 -                                }
 +                                if (readRecordable)
 +                                    hasOldVal = entry.hasValue();
 +                            }
  
 -                                GridCacheVersion dhtVer = res.dhtVersion(i);
 -                                GridCacheVersion mappedVer = res.mappedVersion(i);
 +                            GridCacheVersion dhtVer = res.dhtVersion(i);
 +                            GridCacheVersion mappedVer = res.mappedVersion(i);
  
 -                                if (newVal == null) {
 -                                    if (oldValTup != null) {
 -                                        if (oldValTup.get1().equals(dhtVer))
 -                                            newVal = oldValTup.get2();
 +                            if (newVal == null) {
 +                                if (oldValTup != null) {
 +                                    if (oldValTup.get1().equals(dhtVer))
 +                                        newVal = oldValTup.get2();
  
 -                                        oldVal = oldValTup.get2();
 -                                    }
 +                                    oldVal = oldValTup.get2();
                                  }
 +                            }
  
 -                                // Lock is held at this point, so we can set the
 -                                // returned value if any.
 -                                entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
 +                            // Lock is held at this point, so we can set the
 +                            // returned value if any.
 +                            entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer);
  
 -                                if (inTx()) {
 -                                    tx.hasRemoteLocks(true);
 +                            if (inTx()) {
 +                                tx.hasRemoteLocks(true);
  
 -                                    if (implicitTx() && tx.onePhaseCommit()) {
 -                                        boolean pass = res.filterResult(i);
 +                                if (implicitTx() && tx.onePhaseCommit()) {
 +                                    boolean pass = res.filterResult(i);
  
 -                                        tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
 -                                    }
 +                                    tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr());
                                  }
 +                            }
  
 -                                entry.readyNearLock(lockVer,
 -                                    mappedVer,
 -                                    res.committedVersions(),
 -                                    res.rolledbackVersions(),
 -                                    res.pending());
 -
 -                                if (retval) {
 -                                    if (readRecordable)
 -                                        cctx.events().addEvent(
 -                                            entry.partition(),
 -                                            entry.key(),
 -                                            tx,
 -                                            null,
 -                                            EVT_CACHE_OBJECT_READ,
 -                                            newVal,
 -                                            newVal != null,
 -                                            oldVal,
 -                                            hasOldVal,
 -                                            CU.subjectId(tx, cctx.shared()),
 -                                            null,
 -                                            inTx() ? tx.resolveTaskName() : null,
 -                                            keepBinary);
 -
 -                                    if (cctx.cache().configuration().isStatisticsEnabled())
 -                                        cctx.cache().metrics0().onRead(false);
 -                                }
 +                            entry.readyNearLock(lockVer,
 +                                mappedVer,
 +                                res.committedVersions(),
 +                                res.rolledbackVersions(),
 +                                res.pending());
 +
 +                            if (retval) {
 +                                if (readRecordable)
 +                                    cctx.events().addEvent(
 +                                        entry.partition(),
 +                                        entry.key(),
 +                                        tx,
 +                                        null,
 +                                        EVT_CACHE_OBJECT_READ,
 +                                        newVal,
 +                                        newVal != null,
 +                                        oldVal,
 +                                        hasOldVal,
 +                                        CU.subjectId(tx, cctx.shared()),
 +                                        null,
-                                         inTx() ? tx.resolveTaskName() : null);
++                                        inTx() ? tx.resolveTaskName() : null,
++                                        keepBinary);
 +
 +                                if (cctx.cache().configuration().isStatisticsEnabled())
 +                                    cctx.cache().metrics0().onRead(false);
 +                            }
  
 -                                if (log.isDebugEnabled())
 -                                    log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
 +                            if (log.isDebugEnabled())
 +                                log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
  
 -                                break; // Inner while loop.
 -                            }
 -                            catch (GridCacheEntryRemovedException ignored) {
 -                                if (log.isDebugEnabled())
 -                                    log.debug("Failed to add candidates because entry was removed (will renew).");
 +                            break; // Inner while loop.
 +                        }
 +                        catch (GridCacheEntryRemovedException ignored) {
 +                            if (log.isDebugEnabled())
 +                                log.debug("Failed to add candidates because entry was removed (will renew).");
  
 +                            synchronized (GridNearLockFuture.this) {
                                  // Replace old entry with new one.
 -                                entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
 +                                entries.set(i,
 +                                    (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
                              }
                          }
 -
 -                        i++;
                      }
  
 -                    try {
 -                        proceedMapping(mappings);
 -                    }
 -                    catch (IgniteCheckedException e) {
 -                        onDone(e);
 -                    }
 +                    i++;
 +                }
  
 -                    onDone(true);
 +                try {
 +                    proceedMapping();
 +                }
 +                catch (IgniteCheckedException e) {
 +                    onDone(e);
                  }
 +
 +                onDone(true);
              }
          }
  


Mime
View raw message