ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/2] ignite git commit: Revert "Speculation: path for single key update."
Date Fri, 04 Mar 2016 20:21:42 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-atomic-good-lock-bench 573145090 -> 7a7494482


Revert "Speculation: path for single key update."

This reverts commit 5731450902d936d2b34bfb05469397b2cc65b9f7.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bc92417c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bc92417c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bc92417c

Branch: refs/heads/ignite-atomic-good-lock-bench
Commit: bc92417cf671555dcbdb0382533396c5be63cedb
Parents: 5731450
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Fri Mar 4 22:51:25 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Fri Mar 4 22:51:25 2016 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 485 +------------------
 1 file changed, 2 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bc92417c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d7c0e72..dad00ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1321,185 +1321,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
-    public void updateAllAsyncInternal1(
-        UUID nodeId,
-        GridNearAtomicUpdateRequest req,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
-    ) {
-        KeyCacheObject key = req.keys().get(0);
-
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
-            ctx.deploymentEnabled());
-
-        assert !req.returnValue() || (req.operation() == TRANSFORM);
-
-        GridDhtAtomicUpdateFuture dhtFut = null;
-
-        boolean remap = false;
-
-        String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
-
-        IgniteCacheExpiryPolicy expiry = null;
-
-        try {
-            // If batch store update is enabled, we need to lock all entries.
-            // First, need to acquire locks on cache entries, then check filter.
-            GridDhtCacheEntry locked = lockEntry(key, req.topologyVersion());
-
-            Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
-
-            try {
-                GridDhtPartitionTopology top = topology();
-
-                top.readLock();
-
-                try {
-                    if (top.stopping()) {
-                        res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " +
-                            "(cache is stopped): " + name()));
-
-                        completionCb.apply(req, res);
-
-                        return;
-                    }
-
-                    // Do not check topology version for CLOCK versioning since
-                    // partition exchange will wait for near update future (if future is on server node).
-                    // Also do not check topology version if topology was locked on near node by
-                    // external transaction or explicit lock.
-                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
-                        !needRemap(req.topologyVersion(), top.topologyVersion())) {
-                        ClusterNode node = ctx.discovery().node(nodeId);
-
-                        if (node == null) {
-                            U.warn(log, "Node originated update request left grid: " + nodeId);
-
-                            return;
-                        }
-
-                        boolean hasNear = ctx.discovery().cacheNearNode(node, name());
-
-                        GridCacheVersion ver = req.updateVersion();
-
-                        if (ver == null) {
-                            // Assign next version for update inside entries lock.
-                            ver = ctx.versions().next(top.topologyVersion());
-
-                            if (hasNear)
-                                res.nearVersion(ver);
-                        }
-
-                        assert ver != null : "Got null version for update request: " + req;
-
-                        if (log.isDebugEnabled())
-                            log.debug("Using cache version for update request on primary node [ver=" + ver +
-                                ", req=" + req + ']');
-
-                        boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
-
-                        dhtFut = createDhtFuture(ver, req, res, completionCb, false);
-
-                        expiry = expiryPolicy(req.expiry());
-
-                        GridCacheReturn retVal = null;
-
-                        UpdateSingleResult updRes = updateSingle(node,
-                            hasNear,
-                            req,
-                            res,
-                            locked,
-                            ver,
-                            dhtFut,
-                            completionCb,
-                            ctx.isDrEnabled(),
-                            taskName,
-                            expiry,
-                            sndPrevVal);
-
-                        retVal = updRes.returnValue();
-                        deleted = updRes.deleted();
-                        dhtFut = updRes.dhtFuture();
-
-                        if (retVal == null)
-                            retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true);
-
-                        res.returnValue(retVal);
-
-                        if (req.writeSynchronizationMode() != FULL_ASYNC)
-                            req.cleanup(!node.isLocal());
-
-                        if (dhtFut != null)
-                            ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
-                    }
-                    else
-                        // Should remap all keys.
-                        remap = true;
-                }
-                finally {
-                    top.readUnlock();
-                }
-            }
-            catch (GridCacheEntryRemovedException e) {
-                assert false : "Entry should not become obsolete while holding lock.";
-
-                e.printStackTrace();
-            }
-            finally {
-                if (locked != null)
-                    unlockEntry(locked, req.topologyVersion());
-
-                // Enqueue if necessary after locks release.
-                if (deleted != null) {
-                    assert !deleted.isEmpty();
-                    assert ctx.deferredDelete() : this;
-
-                    for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted)
-                        ctx.onDeferredDelete(e.get1(), e.get2());
-                }
-            }
-        }
-        catch (GridDhtInvalidPartitionException ignore) {
-            assert !req.fastMap() || req.clientRequest() : req;
-
-            if (log.isDebugEnabled())
-                log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
-
-            remap = true;
-        }
-        catch (Throwable e) {
-            // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
-            // an attempt to use cleaned resources.
-            U.error(log, "Unexpected exception during cache update", e);
-
-            res.addFailedKeys(req.keys(), e);
-
-            completionCb.apply(req, res);
-
-            if (e instanceof Error)
-                throw e;
-
-            return;
-        }
-
-        if (remap) {
-            assert dhtFut == null;
-
-            res.remapKeys(req.keys());
-
-            completionCb.apply(req, res);
-        }
-        else {
-            // If there are backups, map backup update future.
-            if (dhtFut != null)
-                dhtFut.map();
-                // Otherwise, complete the call.
-            else
-                completionCb.apply(req, res);
-        }
-
-        sendTtlUpdateRequest(expiry);
-    }
-
     /**
      * Executes local update after preloader fetched values.
      *
@@ -1512,17 +1333,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateRequest req,
         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
-        List<KeyCacheObject> keys = req.keys();
-
-        if (keys.size() == 1) {
-            updateAllAsyncInternal1(nodeId, req, completionCb);
-
-            return;
-        }
-
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
 
+        List<KeyCacheObject> keys = req.keys();
+
         assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
 
         GridDhtAtomicUpdateFuture dhtFut = null;
@@ -2387,248 +2202,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     * Updates locked entries one-by-one.
-     *
-     * @param node Originating node.
-     * @param hasNear {@code True} if originating node has near cache.
-     * @param req Update request.
-     * @param res Update response.
-     * @param entry Locked entries.
-     * @param ver Assigned update version.
-     * @param dhtFut Optional DHT future.
-     * @param completionCb Completion callback to invoke when DHT future is completed.
-     * @param replicate Whether DR is enabled for that cache.
-     * @param taskName Task name.
-     * @param expiry Expiry policy.
-     * @param sndPrevVal If {@code true} sends previous value to backups.
-     * @return Return value.
-     * @throws GridCacheEntryRemovedException Should be never thrown.
-     */
-    private UpdateSingleResult updateSingle(
-        ClusterNode node,
-        boolean hasNear,
-        GridNearAtomicUpdateRequest req,
-        GridNearAtomicUpdateResponse res,
-        GridDhtCacheEntry entry,
-        GridCacheVersion ver,
-        @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
-        boolean replicate,
-        String taskName,
-        @Nullable IgniteCacheExpiryPolicy expiry,
-        boolean sndPrevVal
-    ) throws GridCacheEntryRemovedException {
-        GridCacheReturn retVal = null;
-        Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
-
-        KeyCacheObject k = req.keys().get(0);
-
-        AffinityTopologyVersion topVer = req.topologyVersion();
-
-        boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
-
-        boolean readersOnly = false;
-
-        boolean intercept = ctx.config().getInterceptor() != null;
-
-        boolean initLsnrs = false;
-        Map<UUID, CacheContinuousQueryListener> lsnrs = null;
-        boolean internal = false;
-
-        GridCacheOperation op = req.operation();
-
-        // We are holding java-level locks on entries at this point.
-        // No GridCacheEntryRemovedException can be thrown.
-        try {
-            if (entry != null) {
-                if (!initLsnrs) {
-                    internal = entry.isInternal() || !context().userCache();
-
-                    lsnrs = ctx.continuousQueries().updateListeners(internal, false);
-
-                    initLsnrs = true;
-                }
-
-                GridCacheVersion newConflictVer = req.conflictVersion(0);
-                long newConflictTtl = req.conflictTtl(0);
-                long newConflictExpireTime = req.conflictExpireTime(0);
-
-                assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
-
-                boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(),
-                    req.topologyVersion());
-
-                Object writeVal = op == TRANSFORM ? req.entryProcessor(0) : req.writeValue(0);
-
-                Collection<UUID> readers = null;
-                Collection<UUID> filteredReaders = null;
-
-                if (checkReaders) {
-                    readers = entry.readers();
-                    filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
-                }
-
-                GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
-                    ver,
-                    node.id(),
-                    locNodeId,
-                    op,
-                    writeVal,
-                    req.invokeArguments(),
-                    primary && writeThrough() && !req.skipStore(),
-                    !req.skipStore(),
-                    lsnrs != null || sndPrevVal || req.returnValue(),
-                    req.keepBinary(),
-                    expiry,
-                    true,
-                    true,
-                    primary,
-                    ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
-                    topVer,
-                    req.filter(),
-                    replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
-                    newConflictTtl,
-                    newConflictExpireTime,
-                    newConflictVer,
-                    true,
-                    intercept,
-                    req.subjectId(),
-                    taskName,
-                    null,
-                    null);
-
-                if (dhtFut == null && !F.isEmpty(filteredReaders)) {
-                    dhtFut = createDhtFuture(ver, req, res, completionCb, true);
-
-                    readersOnly = true;
-                }
-
-                if (dhtFut != null) {
-                    dhtFut.listeners(lsnrs);
-
-                    if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
-                        GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
-
-                        if (conflictCtx == null)
-                            newConflictVer = null;
-                        else if (conflictCtx.isMerge())
-                            newConflictVer = null; // Conflict version is discarded in case of merge.
-
-                        EntryProcessor<Object, Object, Object> entryProcessor = null;
-
-                        if (!readersOnly) {
-                            dhtFut.addWriteEntry(entry,
-                                updRes.newValue(),
-                                entryProcessor,
-                                updRes.newTtl(),
-                                updRes.conflictExpireTime(),
-                                newConflictVer,
-                                sndPrevVal,
-                                updRes.oldValue(),
-                                updRes.updateCounter());
-                        }
-
-                        if (!F.isEmpty(filteredReaders))
-                            dhtFut.addNearWriteEntries(filteredReaders,
-                                entry,
-                                updRes.newValue(),
-                                entryProcessor,
-                                updRes.newTtl(),
-                                updRes.conflictExpireTime());
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
-                                "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
-                    }
-                }
-                else if (lsnrs != null && updRes.success()) {
-                    ctx.continuousQueries().onEntryUpdated(
-                        lsnrs,
-                        entry.key(),
-                        updRes.newValue(),
-                        updRes.oldValue(),
-                        internal,
-                        entry.partition(),
-                        primary,
-                        false,
-                        updRes.updateCounter(),
-                        topVer);
-                }
-
-                if (hasNear) {
-                    if (primary && updRes.sendToDht()) {
-                        if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
-                            // If put the same value as in request then do not need to send it back.
-                            if (op == TRANSFORM || writeVal != updRes.newValue()) {
-                                res.addNearValue(0,
-                                    updRes.newValue(),
-                                    updRes.newTtl(),
-                                    updRes.conflictExpireTime());
-                            }
-                            else
-                                res.addNearTtl(0, updRes.newTtl(), updRes.conflictExpireTime());
-
-                            if (updRes.newValue() != null) {
-                                IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
-
-                                assert f == null : f;
-                            }
-                        }
-                        else if (F.contains(readers, node.id())) // Reader became primary or backup.
-                            entry.removeReader(node.id(), req.messageId());
-                        else
-                            res.addSkippedIndex(0);
-                    }
-                    else
-                        res.addSkippedIndex(0);
-                }
-
-                if (updRes.removeVersion() != null) {
-                    if (deleted == null)
-                        deleted = new ArrayList<>(1);
-
-                    deleted.add(F.t(entry, updRes.removeVersion()));
-                }
-
-                if (op == TRANSFORM) {
-                    assert !req.returnValue();
-
-                    IgniteBiTuple<Object, Exception> compRes = updRes.computedResult();
-
-                    if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
-                        if (retVal == null)
-                            retVal = new GridCacheReturn(node.isLocal());
-
-                        retVal.addEntryProcessResult(ctx,
-                            k,
-                            null,
-                            compRes.get1(),
-                            compRes.get2());
-                    }
-                }
-                else {
-                    // Create only once.
-                    if (retVal == null) {
-                        CacheObject ret = updRes.oldValue();
-
-                        retVal = new GridCacheReturn(ctx,
-                            node.isLocal(),
-                            req.keepBinary(),
-                            req.returnValue() ? ret : null,
-                            updRes.success());
-                    }
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            res.addFailedKey(k, e);
-        }
-
-        return new UpdateSingleResult(retVal, deleted, dhtFut);
-    }
-
-    /**
      * @param hasNear {@code True} if originating node has near cache.
      * @param firstEntryIdx Index of the first entry in the request keys collection.
      * @param entries Entries to update.
@@ -2994,60 +2567,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
-    private GridDhtCacheEntry lockEntry(KeyCacheObject key, AffinityTopologyVersion topVer)
-        throws GridDhtInvalidPartitionException {
-        while (true) {
-            try {
-                GridDhtCacheEntry entry = entryExx(key, topVer);
-
-                GridUnsafe.monitorEnter(entry);
-
-                if (entry.obsolete())
-                    GridUnsafe.monitorExit(entry);
-                else
-                    return entry;
-            }
-            catch (GridDhtInvalidPartitionException e) {
-                // Ignore invalid partition exception in CLOCK ordering mode.
-                if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                    return null;
-                else
-                    throw e;
-            }
-        }
-    }
-
-    /**
-     * Releases java-level locks on cache entries.
-     *
-     * @param entry Locked entries.
-     * @param topVer Topology version.
-     */
-    private void unlockEntry(GridDhtCacheEntry entry, AffinityTopologyVersion topVer) {
-        if (entry == null)
-            return;
-
-        // Process deleted entries before locks release.
-        assert ctx.deferredDelete() : this;
-
-        // Entries to skip eviction manager notification for.
-        // Enqueue entries while holding locks.
-        boolean skip = false;
-
-        try {
-            if (entry.deleted())
-                skip = true;
-        }
-        finally {
-            GridUnsafe.monitorExit(entry);
-        }
-
-        entry.onUnlock();
-
-        if (!skip)
-            ctx.evicts().touch(entry, topVer);
-    }
-
     /**
      * Releases java-level locks on cache entries.
      *


Mime
View raw message