ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: WIP.
Date Tue, 26 Apr 2016 06:35:35 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2523-1-dht [created] 581f544f6


WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/581f544f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/581f544f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/581f544f

Branch: refs/heads/ignite-2523-1-dht
Commit: 581f544f6f8606dc28b25a631cbe225b23603b57
Parents: 96cc54c
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Mon Apr 25 09:01:53 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Mon Apr 25 09:01:53 2016 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 205 ++++++++++++-------
 .../GridNearAtomicAbstractUpdateRequest.java    |  16 ++
 .../GridNearAtomicSingleUpdateRequest.java      |  12 ++
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  10 +
 4 files changed, 167 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f412dfa..ac3f73f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1484,9 +1484,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId, req.futureVersion(),
             ctx.deploymentEnabled());
 
-        List<KeyCacheObject> keys = req.keys();
-
-        assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1);
+        assert !req.returnValue() || (req.operation() == TRANSFORM || req.keysCount() ==
1);
 
         GridDhtAtomicUpdateFuture dhtFut = null;
 
@@ -1499,9 +1497,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(keys, req.topologyVersion());
+            List<GridDhtCacheEntry> lockedEntries;
+            GridDhtCacheEntry lockedEntry;
+
+            if (req.keysCount() == 1) {
+                lockedEntries = null;
+                lockedEntry = lockEntry(req);
+            }
+            else {
+                lockedEntries = lockEntries(req);
+                lockedEntry = null;
+            }
 
-            Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted
= null;
+            Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deletedEntries
= null;
+            IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> deletedEntry = null;
 
             try {
                 GridDhtPartitionTopology top = topology();
@@ -1510,7 +1519,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                 try {
                     if (top.stopping()) {
-                        res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform
cache operation " +
+                        // TODO: Add this method to request instead.
+                        res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed
to perform cache operation " +
                             "(cache is stopped): " + name()));
 
                         completionCb.apply(req, res);
@@ -1558,7 +1568,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                             //
Several keys ...
+                        if (req.keysCount() > 1 &&                           
 // Several keys ...
                             writeThrough() && !req.skipStore() &&       
  // and store is enabled ...
                             !ctx.store().isLocal() &&                      // and
this is not local store ...
                             !ctx.dr().receiveEnabled()                     // and no DR.
@@ -1568,7 +1578,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 hasNear,
                                 req,
                                 res,
-                                locked,
+                                lockedEntries,
                                 ver,
                                 dhtFut,
                                 completionCb,
@@ -1577,7 +1587,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 expiry,
                                 sndPrevVal);
 
-                            deleted = updRes.deleted();
+                            deletedEntries = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
 
                             if (req.operation() == TRANSFORM)
@@ -1588,7 +1598,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 hasNear,
                                 req,
                                 res,
-                                locked,
+                                lockedEntries,
                                 ver,
                                 dhtFut,
                                 completionCb,
@@ -1598,7 +1608,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                                 sndPrevVal);
 
                             retVal = updRes.returnValue();
-                            deleted = updRes.deleted();
+                            deletedEntries = updRes.deleted();
                             dhtFut = updRes.dhtFuture();
                         }
 
@@ -1627,15 +1637,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                 e.printStackTrace();
             }
             finally {
-                if (locked != null)
-                    unlockEntries(locked, req.topologyVersion());
+                if (lockedEntry != null)
+                    unlockEntry(lockedEntry, req.topologyVersion());
+
+                if (lockedEntries != null)
+                    unlockEntries(lockedEntries, req.topologyVersion());
 
                 // Enqueue if necessary after locks release.
-                if (deleted != null) {
-                    assert !deleted.isEmpty();
+                if (deletedEntries != null) {
+                    assert !deletedEntries.isEmpty();
                     assert ctx.deferredDelete() : this;
 
-                    for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted)
+                    for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deletedEntries)
                         ctx.onDeferredDelete(e.get1(), e.get2());
                 }
             }
@@ -1653,7 +1666,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             // an attempt to use cleaned resources.
             U.error(log, "Unexpected exception during cache update", e);
 
-            res.addFailedKeys(keys, e);
+            res.addFailedKeys(req.keys(), e);
 
             completionCb.apply(req, res);
 
@@ -1666,7 +1679,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         if (remap) {
             assert dhtFut == null;
 
-            res.remapKeys(keys);
+            res.remapKeys(req.keys());
 
             completionCb.apply(req, res);
         }
@@ -2137,8 +2150,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         GridCacheReturn retVal = null;
         Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted
= null;
 
-        List<KeyCacheObject> keys = req.keys();
-
         AffinityTopologyVersion topVer = req.topologyVersion();
 
         boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);
@@ -2148,8 +2159,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         boolean intercept = ctx.config().getInterceptor() != null;
 
         // Avoid iterator creation.
-        for (int i = 0; i < keys.size(); i++) {
-            KeyCacheObject k = keys.get(i);
+        for (int i = 0; i < req.keysCount(); i++) {
+            KeyCacheObject k = req.key(i);
 
             GridCacheOperation op = req.operation();
 
@@ -2284,7 +2295,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
                 if (updRes.removeVersion() != null) {
                     if (deleted == null)
-                        deleted = new ArrayList<>(keys.size());
+                        deleted = new ArrayList<>(req.keysCount());
 
                     deleted.add(F.t(entry, updRes.removeVersion()));
                 }
@@ -2597,88 +2608,130 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     /**
      * Acquires java-level locks on cache entries. Returns collection of locked entries.
      *
-     * @param keys Keys to lock.
-     * @param topVer Topology version to lock on.
+     * @param req Request.
      * @return Collection of locked entries.
      * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If
exception is thrown,
      *      locks are released.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private List<GridDhtCacheEntry> lockEntries(List<KeyCacheObject> keys, AffinityTopologyVersion
topVer)
+    private GridDhtCacheEntry lockEntry(GridNearAtomicAbstractUpdateRequest req)
         throws GridDhtInvalidPartitionException {
-        if (keys.size() == 1) {
-            KeyCacheObject key = keys.get(0);
+        while (true) {
+            try {
+                GridDhtCacheEntry entry = entryExx(req.key(0), req.topologyVersion());
 
-            while (true) {
-                try {
-                    GridDhtCacheEntry entry = entryExx(key, topVer);
+                GridUnsafe.monitorEnter(entry);
 
-                    GridUnsafe.monitorEnter(entry);
+                if (entry.obsolete())
+                    GridUnsafe.monitorExit(entry);
+                else
+                    return entry;
+            }
+            catch (GridDhtInvalidPartitionException e) {
+                // Ignore invalid partition exception in CLOCK ordering mode.
+                if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+                    return null;
+                else
+                    throw e;
+            }
+        }
+    }
 
-                    if (entry.obsolete())
-                        GridUnsafe.monitorExit(entry);
-                    else
-                        return Collections.singletonList(entry);
+    /**
+     * Acquires java-level locks on cache entries. Returns collection of locked entries.
+     *
+     * @param req Request.
+     * @return Collection of locked entries.
+     * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If
exception is thrown,
+     *      locks are released.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest
req)
+        throws GridDhtInvalidPartitionException {
+        List<GridDhtCacheEntry> locked = new ArrayList<>(req.keysCount());
+
+        while (true) {
+            for (int i = 0; i < req.keysCount(); i++) {
+                KeyCacheObject key = req.key(i);
+
+                try {
+                    GridDhtCacheEntry entry = entryExx(key, req.topologyVersion());
+
+                    locked.add(entry);
                 }
                 catch (GridDhtInvalidPartitionException e) {
                     // Ignore invalid partition exception in CLOCK ordering mode.
                     if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                        return Collections.singletonList(null);
+                        locked.add(null);
                     else
                         throw e;
                 }
             }
-        }
-        else {
-            List<GridDhtCacheEntry> locked = new ArrayList<>(keys.size());
 
-            while (true) {
-                for (KeyCacheObject key : keys) {
-                    try {
-                        GridDhtCacheEntry entry = entryExx(key, topVer);
+            boolean retry = false;
 
-                        locked.add(entry);
-                    }
-                    catch (GridDhtInvalidPartitionException e) {
-                        // Ignore invalid partition exception in CLOCK ordering mode.
-                        if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                            locked.add(null);
-                        else
-                            throw e;
-                    }
-                }
+            for (int i = 0; i < locked.size(); i++) {
+                GridCacheMapEntry entry = locked.get(i);
 
-                boolean retry = false;
+                if (entry == null)
+                    continue;
 
-                for (int i = 0; i < locked.size(); i++) {
-                    GridCacheMapEntry entry = locked.get(i);
+                GridUnsafe.monitorEnter(entry);
 
-                    if (entry == null)
-                        continue;
+                if (entry.obsolete()) {
+                    // Unlock all locked.
+                    for (int j = 0; j <= i; j++) {
+                        if (locked.get(j) != null)
+                            GridUnsafe.monitorExit(locked.get(j));
+                    }
 
-                    GridUnsafe.monitorEnter(entry);
+                    // Clear entries.
+                    locked.clear();
 
-                    if (entry.obsolete()) {
-                        // Unlock all locked.
-                        for (int j = 0; j <= i; j++) {
-                            if (locked.get(j) != null)
-                                GridUnsafe.monitorExit(locked.get(j));
-                        }
+                    // Retry.
+                    retry = true;
 
-                        // Clear entries.
-                        locked.clear();
+                    break;
+                }
+            }
 
-                        // Retry.
-                        retry = true;
+            if (!retry)
+                return locked;
+        }
+    }
 
-                        break;
-                    }
-                }
+    /**
+     * Releases java-level lock on a single cache entry.
+     *
+     * @param locked Locked entry.
+     * @param topVer Topology version.
+     */
+    private void unlockEntry(GridDhtCacheEntry locked, AffinityTopologyVersion topVer) {
+        // Process deleted entries before locks release.
+        assert ctx.deferredDelete() : this;
 
-                if (!retry)
-                    return locked;
-            }
+        // Entries to skip eviction manager notification for.
+        // Enqueue entries while holding locks.
+        boolean skip = false;
+
+        try {
+            if (locked.deleted())
+                skip = true;
         }
+        finally {
+            // At least RuntimeException can be thrown by the code above when GridCacheContext
is cleaned and there is
+            // an attempt to use cleaned resources.
+            // That's why the lock is released in the finally block.
+            GridUnsafe.monitorExit(locked);
+        }
+
+        // Try evict partitions.
+        locked.onUnlock();
+
+        // Must touch all entries since update may have deleted entries.
+        // Eviction manager will remove empty entries.
+        if (!skip)
+            ctx.evicts().touch(locked, topVer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 37d9e45..cbf4a7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -131,6 +131,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     public abstract boolean keepBinary();
 
     /**
+     * Get key at the given index.
+     *
+     * @param idx Index.
+     * @return Key.
+     */
+    public abstract KeyCacheObject key(int idx);
+
+    /**
      * @return Keys for this update request.
      */
     // TODO
@@ -226,4 +234,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @param clearKeys If {@code true} clears keys.
      */
     public abstract void cleanup(boolean clearKeys);
+
+    /**
+     * Whether this is a single-key request.
+     *
+     * @return {@code True} if this is a single-key request.
+     */
+    // TODO: Looks like we can remove it.
+    public abstract boolean isSingle();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 37d86d9..f62f512 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -347,6 +347,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
     }
 
     /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        assert idx == 0;
+
+        return key;
+    }
+
+    /** {@inheritDoc} */
     @Override public List<KeyCacheObject> keys() {
         return Collections.singletonList(key);
     }
@@ -494,6 +501,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isSingle() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/581f544f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index f061868..ed7d595 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -411,6 +411,11 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq
     }
 
     /** {@inheritDoc} */
+    @Override public KeyCacheObject key(int idx) {
+        return keys.get(idx);
+    }
+
+    /** {@inheritDoc} */
     @Override public List<KeyCacheObject> keys() {
         return keys;
     }
@@ -593,6 +598,11 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isSingle() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 


Mime
View raw message