ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/5] ignite git commit: ignite-4705
Date Mon, 20 Feb 2017 11:33:48 GMT
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b979880
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b979880
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b979880

Branch: refs/heads/ignite-4705
Commit: 7b979880cd1f987d9e707627339f3cb8890ceb30
Parents: 743f8b4
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Feb 20 12:43:47 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Feb 20 14:33:40 2017 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   2 +
 .../apache/ignite/internal/IgniteKernal.java    |   1 +
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheIoManager.java    |   3 -
 .../processors/cache/GridCacheMessage.java      |   2 +-
 .../processors/cache/GridCacheMvccManager.java  |   9 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      |  31 ++-
 .../dht/atomic/GridDhtAtomicCache.java          |  76 ++++++--
 .../GridDhtAtomicDeferredUpdateResponse.java    |   6 +
 .../atomic/GridDhtAtomicSingleUpdateFuture.java |   3 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   3 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |   6 +-
 .../atomic/GridNearAtomicMappingResponse.java   | 193 +++++++++++++++++++
 .../GridNearAtomicSingleUpdateFuture.java       |  34 +++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  15 +-
 15 files changed, 352 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 6636bf2..50876dd 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -196,6 +197,7 @@ public class MessageCodeGenerator {
         gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
         gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
         gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+        gen.generateAndWrite(GridNearAtomicMappingResponse.class);
 
 //        gen.generateAndWrite(GridMessageCollection.class);
 //        gen.generateAndWrite(DataStreamerEntry.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 750c316..9f01615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1151,6 +1151,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                                 "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
                                 "    ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
                                 "    ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
+                                "    ^-- Futures [futs=" + ctx.cache().context().mvcc().atomicFuturesCount() + "]" + NL +
                                 "    ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
                                 dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
                                 "    ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5ed46ff..908d1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -175,6 +176,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -47:
+                msg = new GridNearAtomicMappingResponse();
+
+                break;
+
             case -46:
                 msg = new UpdateErrors();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0f7371d..4d3ab1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -362,9 +362,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
             if (depEnabled)
                 cctx.deploy().ignoreOwnership(true);
 
-            if (!cacheMsg.partitionExchangeMessage())
-                log.info("Cache message: " + cacheMsg);
-
             unmarshall(nodeId, cacheMsg);
 
             if (cacheMsg.classError() != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 3ec5323..f3acf66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
     private static final long serialVersionUID = 0L;
 
     /** Maximum number of cache lookup indexes. */
-    public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 6;
+    public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7;
 
     /** Cache message index field name. */
     public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e8a5c8d..7f9f18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -107,7 +107,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
     private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
 
     /** Pending atomic futures. */
-    private final ConcurrentMap<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
+    private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
 
     /** Pending data streamer futures. */
     private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -453,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
     }
 
     /**
+     * @return Number of pending atomic futures.
+     */
+    public int atomicFuturesCount() {
+        return atomicFuts.size();
+    }
+
+    /**
      * @return Collection of pending data streamer futures.
      */
     public Collection<DataStreamerFuture> dataStreamerFutures() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index da6616b..b512bdc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -81,7 +82,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb;
+    private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
 
     /** Update request. */
     protected final GridNearAtomicAbstractUpdateRequest updateReq;
@@ -108,7 +109,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      */
     protected GridDhtAtomicAbstractUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes) {
@@ -367,9 +368,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         List<UUID> dhtNodes = null;
 
         if (fullSync) {
-            dhtNodes = new ArrayList<>(mappings.size());
+            if (!F.isEmpty(mappings)) {
+                dhtNodes = new ArrayList<>(mappings.size());
 
-            dhtNodes.addAll(mappings.keySet());
+                dhtNodes.addAll(mappings.keySet());
+            }
+            else
+                dhtNodes = Collections.emptyList();
 
             if (primaryReplyToNear)
                 updateRes.mapping(dhtNodes);
@@ -380,9 +385,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
             if (primaryReplyToNear)
                 completionCb.apply(updateReq, updateRes);
+            else {
+                if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) {
+                    GridNearAtomicMappingResponse mappingRes = new GridNearAtomicMappingResponse(
+                        cctx.cacheId(),
+                        updateReq.partition(),
+                        updateReq.futureId(),
+                        dhtNodes);
+
+                    try {
+                        cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy());
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(msgLog, "Failed to send mapping response [futId=" + futId +
+                            ", writeVer=" + writeVer + ", node=" + updateRes.nodeId() + ']');
+                    }
+                }
+            }
         }
         else {
-            // Reply.
             completionCb.apply(updateReq, updateRes);
 
             onDone();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d402c86..73a5acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -35,6 +35,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
@@ -110,7 +111,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -137,9 +137,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
+    /** */
+    static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR = IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
+
     /** Update reply closure. */
     @GridToStringExclude
-    private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
updateReplyClos;
+    private UpdateReplyClosure updateReplyClos;
 
     /** Pending */
     private GridDeferredAckMessageSender deferredUpdateMsgSnd;
@@ -208,7 +211,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     @Override protected void init() {
         super.init();
 
-        updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>()
{
+        updateReplyClos = new UpdateReplyClosure() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse
res) {
                 if (req.writeSynchronizationMode() != FULL_ASYNC)
@@ -231,6 +234,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     @Override public void start() throws IgniteCheckedException {
         super.start();
 
+        // TODO IGNITE-4705.
+        log.info("Atomic cache start [name=" + name() +
+            ", mode=" + configuration().getWriteSynchronizationMode() +
+            ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR + ']');
+
         deferredUpdateMsgSnd = new GridDeferredAckMessageSender<Long>(ctx.time(), ctx.closures())
{
             @Override public int getTimeout() {
                 return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
@@ -423,6 +431,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(),
+            GridNearAtomicMappingResponse.class,
+            new CI2<UUID, GridNearAtomicMappingResponse>() {
+                @Override public void apply(UUID uuid, GridNearAtomicMappingResponse msg)
{
+                    processDhtAtomicNearMappingResponse(uuid, msg);
+                }
+
+                @Override public String toString() {
+                    return "GridNearAtomicMappingResponse handler " +
+                        "[msgIdx=" + GridNearAtomicMappingResponse.CACHE_MSG_IDX + ']';
+                }
+            });
+
         if (near == null) {
             ctx.io().addHandler(
                 ctx.cacheId(),
@@ -1697,10 +1718,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      * @param req Update request.
      * @param completionCb Completion callback.
      */
-    public void updateAllAsyncInternal(
+    void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb
+        final UpdateReplyClosure completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
 
@@ -1749,7 +1770,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      */
     private void onForceKeysError(final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb,
+        final UpdateReplyClosure completionCb,
         IgniteCheckedException e
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
@@ -1772,7 +1793,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     private void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicAbstractUpdateRequest req,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        UpdateReplyClosure completionCb
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId, req.futureId(),
             ctx.deploymentEnabled());
@@ -1997,7 +2018,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb,
+        final GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
@@ -2420,7 +2441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2651,7 +2672,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>>
entryProcessorMap,
         @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb,
+        final GridDhtAtomicCache.UpdateReplyClosure completionCb,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
         final boolean replicate,
@@ -3133,7 +3154,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         boolean force
     ) {
         if (updateReq.size() == 1)
@@ -3370,6 +3391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     }
 
     /**
+     * @param primaryId Primary node ID.
      * @param req Request.
      * @param nearRes Response to send.
      */
@@ -3424,6 +3446,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      * @param nodeId Node ID.
      * @param res Response.
      */
+    private void processDhtAtomicNearMappingResponse(UUID nodeId, GridNearAtomicMappingResponse
res) {
+        GridNearAtomicAbstractUpdateFuture updateFut =
+            (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+
+        if (updateFut != null) {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Received near mapping response [futId=" + res.futureId() +
+                    ", node=" + nodeId + ']');
+            }
+
+            updateFut.onMappingReceived(nodeId, res);
+        }
+        else {
+            if (msgLog.isDebugEnabled()) {
+                msgLog.debug("Failed to find future for near mapping response [futId=" + res.futureId() +
+                    ", node=" + nodeId +
+                    ", res=" + res + ']');
+            }
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param res Response.
+     */
     private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res)
{
         GridNearAtomicAbstractUpdateFuture updateFut =
             (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
@@ -3663,4 +3710,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             return Collections.emptyList();
         }
     }
+
+    /**
+     *
+     */
+    static interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest,
GridNearAtomicUpdateResponse> {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index bd2bae0..671d516 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -147,4 +148,9 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage
implem
     @Override public byte fieldsCount() {
         return 4;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAtomicDeferredUpdateResponse.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 4ee90a0..1e63165 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -58,7 +57,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
      */
     GridDhtAtomicSingleUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 20c3d4f..dece1d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,7 +60,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
{
      */
     GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridDhtAtomicCache.UpdateReplyClosure completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 82f171d..15075c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -197,7 +197,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /**
      * Performs future mapping.
      */
-    public void map() {
+    public final void map() {
         AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
 
         if (topVer == null)
@@ -257,7 +257,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>()
{
+                new GridDhtAtomicCache.UpdateReplyClosure() {
                     @Override public void apply(GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
@@ -300,6 +300,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
 
     public abstract void onResult(UUID nodeId, GridDhtAtomicNearResponse res);
 
+    public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res);
+
     /**
      * @param req Request.
      * @param e Error.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
new file mode 100644
index 0000000..be1f0f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Cache message that carries a partition, a future ID and a mapping (a list of UUIDs).
+ */
+public class GridNearAtomicMappingResponse extends GridCacheMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Partition, returned by {@link #partition()}. */
+    private int part;
+
+    /** Mapping: list of UUIDs (presumably node IDs - TODO confirm against the sender). */
+    @GridDirectCollection(UUID.class)
+    private List<UUID> mapping;
+
+    /** Future ID, returned by {@link #futureId()}. */
+    private long futId;
+
+    /**
+     * No-arg constructor; leaves all fields at their default values.
+     */
+    public GridNearAtomicMappingResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param part Partition.
+     * @param futId Future ID.
+     * @param mapping Mapping.
+     */
+    GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping)
{
+        assert part >= 0 : part;
+
+        this.cacheId = cacheId;
+        this.part = part;
+        this.futId = futId;
+        this.mapping = mapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return part;
+    }
+
+    /**
+     * @return Mapping.
+     */
+    public List<UUID> mapping() {
+        return mapping;
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -47;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 6; // This class writes 3 fields (states 3-5); the other 3 presumably belong to the superclass - TODO confirm.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // Cases have no break: each field falls through to the next one.
+            case 3:
+                if (!writer.writeLong("futId", futId))
+                    return false; // Field was not written; state is not advanced.
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeInt("part", part))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) { // Same field order as writeTo(); cases fall through.
+            case 3:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false; // Read did not complete; state is not advanced.
+
+                reader.incrementState();
+
+            case 4:
+                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                part = reader.readInt("part");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicMappingResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicMappingResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 2016c98..8bfbe72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -217,6 +217,29 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (cctx.discovery().node(dhtNodeId) != null)
                 mapping.add(dhtNodeId);
         }
+
+        if (rcvd != null)
+            mapping.removeAll(rcvd);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res)
{
+        GridCacheReturn opRes0 = null;
+
+        synchronized (mux) {
+            if (futId == null || futId != res.futureId())
+                return;
+
+            if (mapping == null) {
+                initMapping(res.mapping());
+
+                if (mapping.isEmpty() && opRes != null)
+                    opRes0 = opRes;
+            }
+        }
+
+        if (opRes0 != null)
+            onDone(opRes0);
     }
 
     /** {@inheritDoc} */
@@ -251,11 +274,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
             if (opRes == null && res.hasResult())
                 opRes = res.result();
 
-            if (mapping.isEmpty() && opRes != null) {
+            if (mapping.isEmpty() && opRes != null)
                 opRes0 = opRes;
-
-                futId = null;
-            }
         }
 
         if (opRes0 != null)
@@ -324,12 +344,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 else
                     opRes = ret;
 
-                if (res.mapping() != null) {
+                if (res.mapping() != null)
                     initMapping(res.mapping());
-
-                    if (rcvd != null)
-                        mapping.removeAll(rcvd);
-                }
                 else
                     mapping = Collections.emptySet();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7b573b1..e135000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -256,6 +256,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     }
 
     /** {@inheritDoc} */
+    @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res)
{
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
     @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
         throw new UnsupportedOperationException();
     }
@@ -591,8 +596,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (locUpdate != null) {
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>()
{
-                    @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse
res) {
+                new GridDhtAtomicCache.UpdateReplyClosure() {
+                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -615,6 +620,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     void map(AffinityTopologyVersion topVer,
         Long futId,
         @Nullable Collection<KeyCacheObject> remapKeys) {
+        if (true) {
+            onDone(new IgniteCheckedException("Failed"));
+
+            return;
+        }
+
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {


Mime
View raw message