ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [35/50] [abbrv] ignite git commit: IGNITE-3074 Optimize DHT atomic update future
Date Tue, 22 Nov 2016 11:40:24 GMT
IGNITE-3074 Optimize DHT atomic update future


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88f38ac6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88f38ac6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88f38ac6

Branch: refs/heads/master
Commit: 88f38ac6305578946f2881b12d2d557bd561f67d
Parents: a24a394
Author: Konstantin Dudkov <kdudkov@ya.ru>
Authored: Mon Nov 21 15:11:09 2016 +0300
Committer: Konstantin Dudkov <kdudkov@ya.ru>
Committed: Mon Nov 21 15:11:09 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheEntryEx.java      |   4 +-
 .../processors/cache/GridCacheMapEntry.java     |   4 +-
 .../GridDhtAtomicAbstractUpdateFuture.java      | 461 +++++++++++++++++++
 .../dht/atomic/GridDhtAtomicCache.java          |  33 +-
 .../atomic/GridDhtAtomicSingleUpdateFuture.java | 121 +++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 393 +---------------
 .../atomic/GridNearAtomicFullUpdateRequest.java |  24 +-
 .../GridNearAtomicSingleUpdateFuture.java       |   4 +-
 .../continuous/CacheContinuousQueryHandler.java |   4 +-
 .../CacheContinuousQueryListener.java           |   4 +-
 .../continuous/CacheContinuousQueryManager.java |   6 +-
 .../processors/cache/GridCacheTestEntryEx.java  |   4 +-
 12 files changed, 638 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index ef6a244..176fe77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -27,7 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -504,7 +504,7 @@ public interface GridCacheEntryEx {
         String taskName,
         @Nullable CacheObject prevVal,
         @Nullable Long updateCntr,
-        @Nullable GridDhtAtomicUpdateFuture fut
+        @Nullable GridDhtAtomicAbstractUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5996672..2bcf360 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -43,7 +43,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1951,7 +1951,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         final String taskName,
         @Nullable final CacheObject prevVal,
         @Nullable final Long updateCntr,
-        @Nullable GridDhtAtomicUpdateFuture fut
+        @Nullable GridDhtAtomicAbstractUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
         assert cctx.atomic();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
new file mode 100644
index 0000000..3bbc348
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * DHT atomic cache backup update future.
+ */
+public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter<Void>
+    implements GridCacheAtomicFuture<Void> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Atomic message logger. */
+    protected static IgniteLogger msgLog;
+
+    /** Write version. */
+    protected final GridCacheVersion writeVer;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Future version. */
+    protected final GridCacheVersion futVer;
+
+    /** Completion callback. */
+    @GridToStringExclude
+    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+
+    /** Update request. */
+    private final GridNearAtomicAbstractUpdateRequest updateReq;
+
+    /** Update response. */
+    final GridNearAtomicUpdateResponse updateRes;
+
+    /** Force transform backup flag. */
+    private boolean forceTransformBackups;
+
+    /** Mappings. */
+    @GridToStringInclude
+    protected Map<UUID, GridDhtAtomicUpdateRequest> mappings;
+
+    /** Continuous query closures. */
+    private Collection<CI1<Boolean>> cntQryClsrs;
+
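+    /** If {@code true}, {@code completeFuture()} may return this future; otherwise it returns {@code null}. */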
+    private final boolean waitForExchange;
+
+    /** Response count. */
+    private volatile int resCnt;
+
+    /**
+     * @param cctx Cache context.
+     * @param completionCb Callback to invoke when future is completed.
+     * @param writeVer Write version.
+     * @param updateReq Update request.
+     * @param updateRes Update response.
+     */
+    protected GridDhtAtomicAbstractUpdateFuture(
+        GridCacheContext cctx,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridCacheVersion writeVer,
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes) {
+        this.cctx = cctx;
+
+        futVer = cctx.versions().next(updateReq.topologyVersion());
+        this.updateReq = updateReq;
+        this.completionCb = completionCb;
+        this.updateRes = updateRes;
+        this.writeVer = writeVer;
+
+        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+
+        if (log == null) {
+            msgLog = cctx.shared().atomicMessageLogger();
+            log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
+            return this;
+
+        return null;
+    }
+
+    /**
+     * @param clsr Continuous query closure.
+     */
+    public final void addContinuousQueryClosure(CI1<Boolean> clsr) {
+        assert !isDone() : this;
+
+        if (cntQryClsrs == null)
+            cntQryClsrs = new ArrayList<>(10);
+
+        cntQryClsrs.add(clsr);
+    }
+
+    /**
+     * @param entry Entry to map.
+     * @param val Value to write.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param addPrevVal If {@code true} sends previous value to backups.
+     * @param prevVal Previous value.
+     * @param updateCntr Partition update counter.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    final void addWriteEntry(GridDhtCacheEntry entry,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean addPrevVal,
+        @Nullable CacheObject prevVal,
+        long updateCntr) {
+        AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+
+        if (log.isDebugEnabled())
+            log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+
+        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+        addDhtKey(entry.key(), dhtNodes);
+
+        for (int i = 0; i < dhtNodes.size(); i++) {
+            ClusterNode node = dhtNodes.get(i);
+
+            UUID nodeId = node.id();
+
+            if (!nodeId.equals(cctx.localNodeId())) {
+                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+
+                if (updateReq == null) {
+                    updateReq = new GridDhtAtomicUpdateRequest(
+                        cctx.cacheId(),
+                        nodeId,
+                        futVer,
+                        writeVer,
+                        syncMode,
+                        topVer,
+                        forceTransformBackups,
+                        this.updateReq.subjectId(),
+                        this.updateReq.taskNameHash(),
+                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                        cctx.deploymentEnabled(),
+                        this.updateReq.keepBinary(),
+                        this.updateReq.skipStore());
+
+                    mappings.put(nodeId, updateReq);
+                }
+
+                updateReq.addWriteValue(entry.key(),
+                    val,
+                    entryProcessor,
+                    ttl,
+                    conflictExpireTime,
+                    conflictVer,
+                    addPrevVal,
+                    entry.partition(),
+                    prevVal,
+                    updateCntr);
+            }
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param dhtNodes DHT nodes.
+     */
+    protected abstract void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes);
+
+    /**
+     * @param key Key.
+     * @param readers Near cache readers.
+     */
+    protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers);
+
+    /**
+     * @param readers Entry readers.
+     * @param entry Entry.
+     * @param val Value.
+     * @param entryProcessor Entry processor.
+     * @param ttl TTL for near cache update (optional).
+     * @param expireTime Expire time for near cache update (optional).
+     */
+    final void addNearWriteEntries(Collection<UUID> readers,
+        GridDhtCacheEntry entry,
+        @Nullable CacheObject val,
+        EntryProcessor<Object, Object, Object> entryProcessor,
+        long ttl,
+        long expireTime) {
+        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+        addNearKey(entry.key(), readers);
+
+        AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+        for (UUID nodeId : readers) {
+            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+
+            if (updateReq == null) {
+                ClusterNode node = cctx.discovery().node(nodeId);
+
+                // Node left the grid.
+                if (node == null)
+                    continue;
+
+                updateReq = new GridDhtAtomicUpdateRequest(
+                    cctx.cacheId(),
+                    nodeId,
+                    futVer,
+                    writeVer,
+                    syncMode,
+                    topVer,
+                    forceTransformBackups,
+                    this.updateReq.subjectId(),
+                    this.updateReq.taskNameHash(),
+                    forceTransformBackups ? this.updateReq.invokeArguments() : null,
+                    cctx.deploymentEnabled(),
+                    this.updateReq.keepBinary(),
+                    this.updateReq.skipStore());
+
+                mappings.put(nodeId, updateReq);
+            }
+
+            addNearReaderEntry(entry);
+
+            updateReq.addNearWriteValue(entry.key(),
+                val,
+                entryProcessor,
+                ttl,
+                expireTime);
+        }
+    }
+
+    /**
+     * Adds a new near reader entry.
+     *
+     * @param entry GridDhtCacheEntry.
+     */
+    protected abstract void addNearReaderEntry(GridDhtCacheEntry entry);
+
+    /**
+     * @return Write version.
+     */
+    final GridCacheVersion writeVersion() {
+        return writeVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteUuid futureId() {
+        return futVer.asGridUuid();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final GridCacheVersion version() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final boolean onNodeLeft(UUID nodeId) {
+        boolean res = registerResponse(nodeId);
+
+        if (res && msgLog.isDebugEnabled()) {
+            msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
+                ", node=" + nodeId + ']');
+        }
+
+        return res;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @return {@code True} if request found.
+     */
+    final boolean registerResponse(UUID nodeId) {
+        int resCnt0;
+
+        GridDhtAtomicUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
+
+        if (req != null) {
+            synchronized (this) {
+                if (req.onResponse()) {
+                    resCnt0 = resCnt;
+
+                    resCnt0 += 1;
+
+                    resCnt = resCnt0;
+                }
+                else
+                    return false;
+            }
+
+            if (resCnt0 == mappings.size())
+                onDone();
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Sends update requests to all mapped nodes; a failed send is registered as a response from that node.
+     */
+    final void map() {
+        if (!F.isEmpty(mappings)) {
+            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+                try {
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DTH update fut, sent request [futId=" + futVer +
+                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+                    }
+                }
+                catch (ClusterTopologyCheckedException ignored) {
+                    if (msgLog.isDebugEnabled()) {
+                        msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
+                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+                    }
+
+                    registerResponse(req.nodeId());
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(msgLog, "Failed to send request [futId=" + futVer +
+                        ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']', e);
+
+                    registerResponse(req.nodeId());
+                }
+            }
+        }
+        else
+            onDone();
+
+        // Send response right away if no ACKs from backup is required.
+        // Backups will send ACKs anyway, future will be completed after all backups have replied.
+        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+            completionCb.apply(updateReq, updateRes);
+    }
+
+    /**
+     * Deferred update response.
+     *
+     * @param nodeId Backup node ID.
+     */
+    public final void onResult(UUID nodeId) {
+        if (log.isDebugEnabled())
+            log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
+
+        registerResponse(nodeId);
+    }
+
+    /**
+     * Callback for backup update response.
+     *
+     * @param nodeId Backup node ID.
+     * @param updateRes Update response.
+     */
+    public abstract void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes);
+
+    /**
+     * @param updateRes Response.
+     * @param err Error.
+     */
+    protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
+
+    /** {@inheritDoc} */
+    @Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+        if (super.onDone(res, err)) {
+            cctx.mvcc().removeAtomicFuture(version());
+
+            boolean suc = err == null;
+
+            if (!suc)
+                addFailedKeys(updateRes, err);
+
+            if (cntQryClsrs != null) {
+                for (CI1<Boolean> clsr : cntQryClsrs)
+                    clsr.apply(suc);
+            }
+
+            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+                completionCb.apply(updateReq, updateRes);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f7d1973..d7eb062 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1685,7 +1685,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
 
-        GridDhtAtomicUpdateFuture dhtFut = null;
+        GridDhtAtomicAbstractUpdateFuture dhtFut = null;
 
         boolean remap = false;
 
@@ -1908,7 +1908,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         final GridNearAtomicUpdateResponse res,
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
-        @Nullable GridDhtAtomicUpdateFuture dhtFut,
+        @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
         final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final boolean replicate,
         final String taskName,
@@ -2331,7 +2331,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
-        @Nullable GridDhtAtomicUpdateFuture dhtFut,
+        @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
         CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
@@ -2552,7 +2552,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return Deleted entries.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Nullable private GridDhtAtomicUpdateFuture updatePartialBatch(
+    @Nullable private GridDhtAtomicAbstractUpdateFuture updatePartialBatch(
         final boolean hasNear,
         final int firstEntryIdx,
         final List<GridDhtCacheEntry> entries,
@@ -2562,7 +2562,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable final Map<KeyCacheObject, CacheObject> putMap,
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
-        @Nullable GridDhtAtomicUpdateFuture dhtFut,
+        @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
         final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final GridNearAtomicAbstractUpdateRequest req,
         final GridNearAtomicUpdateResponse res,
@@ -3036,7 +3036,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param force If {@code true} then creates future without optimizations checks.
      * @return Backup update future or {@code null} if there are no backups.
      */
-    @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
+    @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes,
@@ -3064,7 +3064,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+        if (updateReq.size() == 1)
+            return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+        else
+            return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
     }
 
     /**
@@ -3256,7 +3259,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+        GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
 
         if (updateFut != null) {
             if (msgLog.isDebugEnabled()) {
@@ -3279,7 +3282,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @SuppressWarnings("unchecked")
     private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
         for (GridCacheVersion ver : res.futureVersions()) {
-            GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
+            GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver);
 
             if (updateFut != null) {
                 if (msgLog.isDebugEnabled()) {
@@ -3335,7 +3338,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
 
         /** */
-        private final GridDhtAtomicUpdateFuture dhtFut;
+        private final GridDhtAtomicAbstractUpdateFuture dhtFut;
 
         /**
          * @param retVal Return value.
@@ -3344,7 +3347,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
          */
         private UpdateSingleResult(GridCacheReturn retVal,
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted,
-            GridDhtAtomicUpdateFuture dhtFut) {
+            GridDhtAtomicAbstractUpdateFuture dhtFut) {
             this.retVal = retVal;
             this.deleted = deleted;
             this.dhtFut = dhtFut;
@@ -3367,7 +3370,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /**
          * @return DHT future.
          */
-        public GridDhtAtomicUpdateFuture dhtFuture() {
+        public GridDhtAtomicAbstractUpdateFuture dhtFuture() {
             return dhtFut;
         }
     }
@@ -3380,7 +3383,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
 
         /** */
-        private GridDhtAtomicUpdateFuture dhtFut;
+        private GridDhtAtomicAbstractUpdateFuture dhtFut;
 
         /** */
         private boolean readersOnly;
@@ -3414,7 +3417,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /**
          * @return DHT future.
          */
-        public GridDhtAtomicUpdateFuture dhtFuture() {
+        public GridDhtAtomicAbstractUpdateFuture dhtFuture() {
             return dhtFut;
         }
 
@@ -3435,7 +3438,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         /**
          * @param dhtFut DHT future.
          */
-        private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture dhtFut) {
+        private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
             this.dhtFut = dhtFut;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..f83a7b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * DHT atomic cache backup update future for a single key.
+ */
+class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Future key. */
+    private KeyCacheObject key;
+
+    /** Entry with readers. */
+    private GridDhtCacheEntry nearReaderEntry;
+
+    /**
+     * @param cctx Cache context.
+     * @param completionCb Callback to invoke when future is completed.
+     * @param writeVer Write version.
+     * @param updateReq Update request.
+     * @param updateRes Update response.
+     */
+    GridDhtAtomicSingleUpdateFuture(
+        GridCacheContext cctx,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridCacheVersion writeVer,
+        GridNearAtomicAbstractUpdateRequest updateReq,
+        GridNearAtomicUpdateResponse updateRes
+    ) {
+        super(cctx, completionCb, writeVer, updateReq, updateRes);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
+        assert this.key == null || this.key.equals(key) : this.key;
+
+        if (mappings == null)
+            mappings = U.newHashMap(dhtNodes.size());
+
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
+        assert this.key == null || this.key.equals(key) : this.key;
+
+        if (mappings == null)
+            mappings = U.newHashMap(readers.size());
+
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) {
+        nearReaderEntry = entry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+        if (log.isDebugEnabled())
+            log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
+
+        if (updateRes.error() != null)
+            this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
+
+        if (!F.isEmpty(updateRes.nearEvicted())) {
+            try {
+                assert nearReaderEntry != null;
+
+                nearReaderEntry.removeReader(nodeId, updateRes.messageId());
+            }
+            catch (GridCacheEntryRemovedException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']');
+            }
+        }
+
+        registerResponse(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+        updateRes.addFailedKey(key, err);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index c2ad8b8..864aadd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,92 +23,30 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
  * DHT atomic cache backup update future.
  */
-public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
-    implements GridCacheAtomicFuture<Void> {
+class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Logger. */
-    private static IgniteLogger log;
-
-    /** Logger. */
-    private static IgniteLogger msgLog;
-
-    /** Cache context. */
-    private final GridCacheContext cctx;
-
-    /** Future version. */
-    private final GridCacheVersion futVer;
-
-    /** Write version. */
-    private final GridCacheVersion writeVer;
-
-    /** Force transform backup flag. */
-    private boolean forceTransformBackups;
-
-    /** Completion callback. */
-    @GridToStringExclude
-    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
-
-    /** Mappings. */
-    @GridToStringInclude
-    private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
-
-    /** Entries with readers. */
-    private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
-
-    /** Update request. */
-    private final GridNearAtomicAbstractUpdateRequest updateReq;
-
-    /** Update response. */
-    private final GridNearAtomicUpdateResponse updateRes;
-
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
 
-    /** Continuous query closures. */
-    private Collection<CI1<Boolean>> cntQryClsrs;
-
-    /** */
-    private final boolean waitForExchange;
+    /** Entries with readers. */
+    private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
-    /** Response count. */
-    private volatile int resCnt;
 
     /**
      * @param cctx Cache context.
@@ -117,328 +55,39 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param updateReq Update request.
      * @param updateRes Update response.
      */
-    public GridDhtAtomicUpdateFuture(
+    GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
         CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
         GridNearAtomicUpdateResponse updateRes
     ) {
-        this.cctx = cctx;
-        this.writeVer = writeVer;
-
-        futVer = cctx.versions().next(updateReq.topologyVersion());
-        this.updateReq = updateReq;
-        this.completionCb = completionCb;
-        this.updateRes = updateRes;
-
-        if (log == null) {
-            msgLog = cctx.shared().atomicMessageLogger();
-            log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
-        }
+        super(cctx, completionCb, writeVer, updateReq, updateRes);
 
         keys = new ArrayList<>(updateReq.size());
         mappings = U.newHashMap(updateReq.size());
-
-        waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
-    }
-
-    /**
-     * @return Write version.
-     */
-    GridCacheVersion writeVersion() {
-        return writeVer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futVer.asGridUuid();
     }
 
     /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return futVer;
+    @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
+        keys.add(key);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        boolean res = registerResponse(nodeId);
-
-        if (res && msgLog.isDebugEnabled()) {
-            msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
-                ", node=" + nodeId + ']');
-        }
-
-        return res;
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @return {@code True} if request found.
-     */
-    private boolean registerResponse(UUID nodeId) {
-        int resCnt0;
-
-        GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
-
-        if (req != null) {
-            synchronized (this) {
-                if (req.onResponse()) {
-                    resCnt0 = resCnt;
-
-                    resCnt0 += 1;
-
-                    resCnt = resCnt0;
-                }
-                else
-                    return false;
-            }
-
-            if (resCnt0 == mappings.size())
-                onDone();
-
-            return true;
-        }
-
-        return false;
+    @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
+        keys.add(key);
     }
 
     /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return true;
-    }
+    @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) {
+        if (nearReadersEntries == null)
+            nearReadersEntries = new HashMap<>();
 
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // No-op.
+        nearReadersEntries.put(entry.key(), entry);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
-            return this;
-
-        return null;
-    }
-
-    /**
-     * @param entry Entry to map.
-     * @param val Value to write.
-     * @param entryProcessor Entry processor.
-     * @param ttl TTL (optional).
-     * @param conflictExpireTime Conflict expire time (optional).
-     * @param conflictVer Conflict version (optional).
-     * @param addPrevVal If {@code true} sends previous value to backups.
-     * @param prevVal Previous value.
-     * @param updateCntr Partition update counter.
-     */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    public void addWriteEntry(GridDhtCacheEntry entry,
-        @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
-        long ttl,
-        long conflictExpireTime,
-        @Nullable GridCacheVersion conflictVer,
-        boolean addPrevVal,
-        @Nullable CacheObject prevVal,
-        long updateCntr) {
-        AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
-        List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
-
-        if (log.isDebugEnabled())
-            log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
-
-        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
-
-        keys.add(entry.key());
-
-        for (int i = 0; i < dhtNodes.size(); i++) {
-            ClusterNode node = dhtNodes.get(i);
-
-            UUID nodeId = node.id();
-
-            if (!nodeId.equals(cctx.localNodeId())) {
-                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
-
-                if (updateReq == null) {
-                    updateReq = new GridDhtAtomicUpdateRequest(
-                        cctx.cacheId(),
-                        nodeId,
-                        futVer,
-                        writeVer,
-                        syncMode,
-                        topVer,
-                        forceTransformBackups,
-                        this.updateReq.subjectId(),
-                        this.updateReq.taskNameHash(),
-                        forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                        cctx.deploymentEnabled(),
-                        this.updateReq.keepBinary(),
-                        this.updateReq.skipStore());
-
-                    mappings.put(nodeId, updateReq);
-                }
-
-                updateReq.addWriteValue(entry.key(),
-                    val,
-                    entryProcessor,
-                    ttl,
-                    conflictExpireTime,
-                    conflictVer,
-                    addPrevVal,
-                    entry.partition(),
-                    prevVal,
-                    updateCntr);
-            }
-        }
-    }
-
-    /**
-     * @param readers Entry readers.
-     * @param entry Entry.
-     * @param val Value.
-     * @param entryProcessor Entry processor..
-     * @param ttl TTL for near cache update (optional).
-     * @param expireTime Expire time for near cache update (optional).
-     */
-    public void addNearWriteEntries(Iterable<UUID> readers,
-        GridDhtCacheEntry entry,
-        @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
-        long ttl,
-        long expireTime) {
-        CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
-
-        keys.add(entry.key());
-
-        AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
-        for (UUID nodeId : readers) {
-            GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
-
-            if (updateReq == null) {
-                ClusterNode node = cctx.discovery().node(nodeId);
-
-                // Node left the grid.
-                if (node == null)
-                    continue;
-
-                updateReq = new GridDhtAtomicUpdateRequest(
-                    cctx.cacheId(),
-                    nodeId,
-                    futVer,
-                    writeVer,
-                    syncMode,
-                    topVer,
-                    forceTransformBackups,
-                    this.updateReq.subjectId(),
-                    this.updateReq.taskNameHash(),
-                    forceTransformBackups ? this.updateReq.invokeArguments() : null,
-                    cctx.deploymentEnabled(),
-                    this.updateReq.keepBinary(),
-                    this.updateReq.skipStore());
-
-                mappings.put(nodeId, updateReq);
-            }
-
-            if (nearReadersEntries == null)
-                nearReadersEntries = new HashMap<>();
-
-            nearReadersEntries.put(entry.key(), entry);
-
-            updateReq.addNearWriteValue(entry.key(),
-                val,
-                entryProcessor,
-                ttl,
-                expireTime);
-        }
-    }
-
-    /**
-     * @param clsr Continuous query closure.
-     */
-    public void addContinuousQueryClosure(CI1<Boolean> clsr){
-        assert !isDone() : this;
-
-        if (cntQryClsrs == null)
-            cntQryClsrs = new ArrayList<>(10);
-
-        cntQryClsrs.add(clsr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
-        if (super.onDone(res, err)) {
-            cctx.mvcc().removeAtomicFuture(version());
-
-            boolean suc = err == null;
-
-            if (!suc) {
-                for (KeyCacheObject key : keys)
-                    updateRes.addFailedKey(key, err);
-            }
-
-            if (cntQryClsrs != null) {
-                for (CI1<Boolean> clsr : cntQryClsrs)
-                    clsr.apply(suc);
-            }
-
-            if (updateReq.writeSynchronizationMode() == FULL_SYNC)
-                completionCb.apply(updateReq, updateRes);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Sends requests to remote nodes.
-     */
-    public void map() {
-        if (!mappings.isEmpty()) {
-            for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                try {
-                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, sent request [futId=" + futVer +
-                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-                    }
-                }
-                catch (ClusterTopologyCheckedException ignored) {
-                    if (msgLog.isDebugEnabled()) {
-                        msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
-                            ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-                    }
-
-                    registerResponse(req.nodeId());
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(msgLog, "Failed to send request [futId=" + futVer +
-                        ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-
-                    registerResponse(req.nodeId());
-                }
-            }
-        }
-        else
-            onDone();
-
-        // Send response right away if no ACKs from backup is required.
-        // Backups will send ACKs anyway, future will be completed after all backups have replied.
-        if (updateReq.writeSynchronizationMode() != FULL_SYNC)
-            completionCb.apply(updateReq, updateRes);
-    }
-
-    /**
-     * Callback for backup update response.
-     *
-     * @param nodeId Backup node ID.
-     * @param updateRes Update response.
-     */
-    public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+    @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
         if (log.isDebugEnabled())
             log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
 
@@ -462,16 +111,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         registerResponse(nodeId);
     }
 
-    /**
-     * Deferred update response.
-     *
-     * @param nodeId Backup node ID.
-     */
-    public void onResult(UUID nodeId) {
-        if (log.isDebugEnabled())
-            log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
-
-        registerResponse(nodeId);
+    /** {@inheritDoc} */
+    @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+        for (KeyCacheObject key : keys)
+            updateRes.addFailedKey(key, err);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index e2314f8..b733d7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -487,44 +487,32 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
         return invokeArgs;
     }
 
-    /**
-     * @return Flag indicating whether this is fast-map udpate.
-     */
+    /** {@inheritDoc} */
     @Override public boolean fastMap() {
         return fastMap;
     }
 
-    /**
-     * @return Topology locked flag.
-     */
+    /** {@inheritDoc} */
     @Override public boolean topologyLocked() {
         return topLocked;
     }
 
-    /**
-     * @return {@code True} if request sent from client node.
-     */
+    /** {@inheritDoc} */
     @Override public boolean clientRequest() {
         return clientReq;
     }
 
-    /**
-     * @return Return value flag.
-     */
+    /** {@inheritDoc} */
     @Override public boolean returnValue() {
         return retval;
     }
 
-    /**
-     * @return Skip write-through to a persistent storage.
-     */
+    /** {@inheritDoc} */
     @Override public boolean skipStore() {
         return skipStore;
     }
 
-    /**
-     * @return Keep binary flag.
-     */
+    /** {@inheritDoc} */
     @Override public boolean keepBinary() {
         return keepBinary;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index eaf2f2c..bd231cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -667,9 +667,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @return {@code True} can use 'single' update requests.
      */
     private boolean canUseSingleRequest(ClusterNode node) {
-        assert node != null;
-
-        return expiryPlc == null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
+        return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 304d031..10784fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -59,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
@@ -382,7 +382,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             @Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt,
                 boolean primary,
                 final boolean recordIgniteEvt,
-                GridDhtAtomicUpdateFuture fut) {
+                GridDhtAtomicAbstractUpdateFuture fut) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
                     return ;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 8eca81c..84b22f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import java.util.Map;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -41,7 +41,7 @@ public interface CacheContinuousQueryListener<K, V> {
      * @param fut Dht atomic future.
      */
     public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
-        boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut);
+        boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut);
 
     /**
      * Listener unregistered callback.

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 2863f3d..e2fbf52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -56,7 +56,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -245,7 +245,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary,
         boolean preload,
         long updateCntr,
-        @Nullable GridDhtAtomicUpdateFuture fut,
+        @Nullable GridDhtAtomicAbstractUpdateFuture fut,
         AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
         Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
@@ -290,7 +290,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean primary,
         boolean preload,
         long updateCntr,
-        @Nullable GridDhtAtomicUpdateFuture fut,
+        @Nullable GridDhtAtomicAbstractUpdateFuture fut,
         AffinityTopologyVersion topVer)
         throws IgniteCheckedException
     {

http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index bf543cb..396629a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -26,7 +26,7 @@ import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -542,7 +542,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         String taskName,
         @Nullable CacheObject prevVal,
         @Nullable Long updateCntr,
-        @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException,
+        @Nullable GridDhtAtomicAbstractUpdateFuture fut) throws IgniteCheckedException,
         GridCacheEntryRemovedException {
         assert false;
 


Mime
View raw message