ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/10] ignite git commit: tmp
Date Mon, 20 Mar 2017 11:27:12 GMT
tmp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e01204d1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e01204d1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e01204d1

Branch: refs/heads/ignite-4680-sb
Commit: e01204d14369139e9e5e329d4971c499a2b58d3f
Parents: 5cfea99
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Mar 20 13:13:11 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Mar 20 14:26:38 2017 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 14 +++++-----
 .../dht/atomic/GridDhtAtomicCache.java          | 27 +++++++++++++++-----
 .../atomic/GridNearAtomicFullUpdateRequest.java |  7 +++--
 3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e01204d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 17ee298..fef1dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -152,7 +152,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /**
      * @param affAssignment Affinity assignment.
-     * @param entry Entry to map.
+     * @param key Key to map.
      * @param val Value to write.
      * @param entryProcessor Entry processor.
      * @param ttl TTL (optional).
@@ -165,7 +165,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     @SuppressWarnings("ForLoopReplaceableByForEach")
     final void addWriteEntry(
         AffinityAssignment affAssignment,
-        GridDhtCacheEntry entry,
+        KeyCacheObject key,
         @Nullable CacheObject val,
         EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
@@ -176,19 +176,19 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         long updateCntr) {
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        List<ClusterNode> affNodes = affAssignment.get(entry.partition());
+        List<ClusterNode> affNodes = affAssignment.get(key.partition());
 
-        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
+        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(key.partition(), affAssignment, affNodes);
 
         if (dhtNodes == null)
             dhtNodes = affNodes;
 
         if (log.isDebugEnabled())
-            log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+            log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + key + ']');
 
         CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
 
-        addDhtKey(entry.key(), dhtNodes);
+        addDhtKey(key, dhtNodes);
 
         for (int i = 0; i < dhtNodes.size(); i++) {
             ClusterNode node = dhtNodes.get(i);
@@ -212,7 +212,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     mappings.put(nodeId, updateReq);
                 }
 
-                updateReq.addWriteValue(entry.key(),
+                updateReq.addWriteValue(key,
                     val,
                     entryProcessor,
                     ttl,

http://git-wip-us.apache.org/repos/asf/ignite/blob/e01204d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 923b895..516a8a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1798,7 +1798,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         Map<Integer, int[]> stripemap = req.stripeMap();
 
-                        final GridDhtAtomicAbstractUpdateFuture fut = null;//createDhtFuture(null, req, req.size());
+                        final GridDhtAtomicAbstractUpdateFuture fut = createDhtFuture(null, req, req.size());
+
+                        ctx.mvcc().addAtomicFuture(fut.id(), fut);
 
                         final AffinityAssignment affAssignment = ctx.affinity().assignment(req.topologyVersion());
 
@@ -1901,8 +1903,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicAbstractUpdateRequest req,
         int[] stripeIdxs,
         UpdateReplyClosure completionCb) throws GridCacheEntryRemovedException {
-        fut = createDhtFuture(null, req, req.size());
-
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             node.id(),
             req.futureId(),
@@ -1941,7 +1941,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             res,
             locked,
             ver,
-            fut,
+            null,
             ctx.isDrEnabled(),
             null,
             null,
@@ -1960,7 +1960,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateResponse res0 = req.responseHelper().addResponse(res);
 
         if (res0 != null) {
-            //fut.onDone();
+            for (int i = 0; i < req.size(); i++) {
+                fut.addWriteEntry(affinityAssignment,
+                    req.key(i),
+                    req.value(i),
+                    null,
+                    0,
+                    0,
+                    null,
+                    false,
+                    null,
+                    1L);
+            }
+
+            fut.onDone();
 
             completionCb.apply(req, res);
         }
@@ -2526,7 +2539,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         dhtFut.addWriteEntry(
                             affAssignment,
-                            entry,
+                            entry.key(),
                             updRes.newValue(),
                             entryProcessor,
                             updRes.newTtl(),
@@ -2796,7 +2809,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         dhtFut.addWriteEntry(
                             affAssignment,
-                            entry,
+                            entry.key(),
                             writeVal,
                             entryProcessor,
                             updRes.newTtl(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/e01204d1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 2b461ce..f7a767f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -231,7 +231,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
                 stripeCnt = new int[maxStripes];
 
             if (stripeMap == null)
-                stripeMap = new HashMap<>(maxStripes);
+                stripeMap = U.newHashMap(maxStripes);
 
             int[] idxs = stripeMap.get(stripe);
 
@@ -417,9 +417,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
 
         if (stripeMap != null && stripeCnt != null) {
-            for (Integer idx : stripeMap.keySet()) {
-                stripeMap.put(idx, Arrays.copyOf(stripeMap.get(idx), stripeCnt[idx]));
-            }
+            for (Map.Entry<Integer, int[]> e : stripeMap.entrySet())
+                e.setValue(Arrays.copyOf(e.getValue(), stripeCnt[e.getKey()]));
 
             stripeCnt = null;
         }


Mime
View raw message