ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/3] ignite git commit: IGNITE-2926: WIP.
Date Thu, 14 Apr 2016 13:16:10 GMT
IGNITE-2926: WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/17080659
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/17080659
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/17080659

Branch: refs/heads/ignite-2926
Commit: 17080659b28ea02f6b95e63fb00dac7bd19c7add
Parents: 143ce34
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Apr 14 16:15:37 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Apr 14 16:15:37 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |   5 -
 .../GridAbstractNearAtomicUpdateFuture.java     | 187 ++++++++++++++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   5 -
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 247 +++----------------
 4 files changed, 227 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 359909e..c96d00f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -38,9 +38,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
      * @return Future or {@code null} if no need to wait.
      */
     public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer);
-
-    /**
-     * @return Future keys.
-     */
-    public Collection<?> keys();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
new file mode 100644
index 0000000..e40e8ca
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAbstractNearAtomicUpdateFuture.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * Base for near atomic update futures.
+ */
+public abstract class GridAbstractNearAtomicUpdateFuture extends GridFutureAdapter<Object>
+    implements GridCacheAtomicFuture<Object> {
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    protected static IgniteLogger log;
+
+    /** Cache context. */
+    protected final GridCacheContext cctx;
+
+    /** Cache. */
+    protected final GridDhtAtomicCache cache;
+
+    /** Write synchronization mode. */
+    protected final CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    protected final GridCacheOperation op;
+
+    /** Return value require flag. */
+    protected final boolean retval;
+
+    /** Raw return value flag. */
+    protected final boolean rawRetval;
+
+    /** Expiry policy. */
+    protected final ExpiryPolicy expiryPlc;
+
+    /** Optional filter. */
+    protected final CacheEntryPredicate[] filter;
+
+    /** Subject ID. */
+    protected final UUID subjId;
+
+    /** Task name hash. */
+    protected final int taskNameHash;
+
+    /** Skip store flag. */
+    protected final boolean skipStore;
+
+    /** Keep binary flag. */
+    protected final boolean keepBinary;
+
+    /** Wait for topology future flag. */
+    protected final boolean waitTopFut;
+
+    /** Fast map flag. */
+    protected final boolean fastMap;
+
+    /** Near cache flag. */
+    protected final boolean nearEnabled;
+
+    /** Mutex to synchronize state updates. */
+    protected final Object mux = new Object();
+
+    /**
+     * Constructor.
+     */
+    protected GridAbstractNearAtomicUpdateFuture(
+        GridCacheContext cctx,
+        GridDhtAtomicCache cache,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        boolean rawRetval,
+        @Nullable ExpiryPolicy expiryPlc,
+        CacheEntryPredicate[] filter,
+        UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean waitTopFut
+    ) {
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+        this.cctx = cctx;
+        this.cache = cache;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.retval = retval;
+        this.rawRetval = rawRetval;
+        this.expiryPlc = expiryPlc;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.waitTopFut = waitTopFut;
+
+        fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+        nearEnabled = CU.isNearEnabled(cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        // No-op.
+    }
+
+    /**
+     * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+     */
+    protected boolean storeFuture() {
+        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+    }
+
+    /**
+     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+     * node and send updates in parallel to all participating nodes.
+     *
+     * @param key Key to map.
+     * @param topVer Topology version to map.
+     * @return Collection of nodes to which key is mapped.
+     */
+    protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) {
+        GridCacheAffinityManager affMgr = cctx.affinity();
+
+        // If we can send updates in parallel - do it.
+        return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 9f52658..4721d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -212,11 +212,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         return null;
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<KeyCacheObject> keys() {
-        return keys;
-    }
-
     /**
      * @param entry Entry to map.
      * @param val Value to write.

http://git-wip-us.apache.org/repos/asf/ignite/blob/17080659/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9955df7..d68b4ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -23,10 +23,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -35,10 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
@@ -57,35 +52,17 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
  * DHT atomic cache near update future.
  */
-public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
-    implements GridCacheAtomicFuture<Object>{
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Logger. */
-    protected static IgniteLogger log;
-
-    /** Cache context. */
-    private final GridCacheContext cctx;
-
-    /** Cache. */
-    private GridDhtAtomicCache cache;
-
-    /** Update operation. */
-    private final GridCacheOperation op;
-
+public class GridNearAtomicUpdateFuture extends GridAbstractNearAtomicUpdateFuture {
     /** Keys */
     private Collection<?> keys;
 
@@ -104,51 +81,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
     private Collection<GridCacheVersion> conflictRmvVals;
 
-    /** Return value require flag. */
-    private final boolean retval;
-
-    /** Expiry policy. */
-    private final ExpiryPolicy expiryPlc;
-
-    /** Optional filter. */
-    private final CacheEntryPredicate[] filter;
-
-    /** Write synchronization mode. */
-    private final CacheWriteSynchronizationMode syncMode;
-
-    /** Raw return value flag. */
-    private final boolean rawRetval;
-
-    /** Fast map flag. */
-    private final boolean fastMap;
-
-    /** Near cache flag. */
-    private final boolean nearEnabled;
-
-    /** Subject ID. */
-    private final UUID subjId;
-
-    /** Task name hash. */
-    private final int taskNameHash;
-
     /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
     private boolean topLocked;
 
-    /** Skip store flag. */
-    private final boolean skipStore;
-
-    /** */
-    private final boolean keepBinary;
-
-    /** Wait for topology future flag. */
-    private final boolean waitTopFut;
-
     /** Remap count. */
     private int remapCnt;
 
-    /** Mutex to synchronize state updates. */
-    private final Object mux = new Object();
-
     /** Current topology version. */
     private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
 
@@ -225,39 +163,19 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         int remapCnt,
         boolean waitTopFut
     ) {
-        this.rawRetval = rawRetval;
+        super(cctx, cache, syncMode, op, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash, skipStore,
+            keepBinary, waitTopFut);
 
         assert vals == null || vals.size() == keys.size();
         assert conflictPutVals == null || conflictPutVals.size() == keys.size();
         assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
         assert subjId != null;
 
-        this.cctx = cctx;
-        this.cache = cache;
-        this.syncMode = syncMode;
-        this.op = op;
         this.keys = keys;
         this.vals = vals;
         this.invokeArgs = invokeArgs;
         this.conflictPutVals = conflictPutVals;
         this.conflictRmvVals = conflictRmvVals;
-        this.retval = retval;
-        this.expiryPlc = expiryPlc;
-        this.filter = filter;
-        this.subjId = subjId;
-        this.taskNameHash = taskNameHash;
-        this.skipStore = skipStore;
-        this.keepBinary = keepBinary;
-        this.waitTopFut = waitTopFut;
-
-        if (log == null)
-            log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
-        fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
-            cctx.config().getAtomicWriteOrderMode() == CLOCK &&
-            !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
-        nearEnabled = CU.isNearEnabled(cctx);
 
         if (!waitTopFut)
             remapCnt = 1;
@@ -266,30 +184,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
     @Override public GridCacheVersion version() {
         synchronized (mux) {
             return futVer;
         }
     }
 
-    /**
-     * @return {@code True} if this future should block partition map exchange.
-     */
-    private boolean waitForPartitionExchange() {
-        // Wait fast-map near atomic update futures in CLOCK mode.
-        return fastMap;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<?> keys() {
-        return keys;
-    }
-
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         GridNearAtomicUpdateResponse res = null;
@@ -323,16 +223,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         return false;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // No-op.
-    }
-
     /**
      * Performs future mapping.
      */
@@ -353,8 +243,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
-        if (waitForPartitionExchange()) {
-            GridFutureAdapter<Void> fut = completeFuture0(topVer);
+        // Wait fast-map near atomic update futures in CLOCK mode.
+        if (fastMap) {
+            GridFutureAdapter<Void> fut;
+
+            synchronized (mux) {
+                if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) {
+                    if (topCompleteFut == null)
+                        topCompleteFut = new GridFutureAdapter<>();
+
+                    fut = topCompleteFut;
+                }
+                else
+                    fut = null;
+            }
 
             if (fut != null && isDone()) {
                 fut.onDone();
@@ -454,16 +356,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                     mapErrTopVer = req.topologyVersion();
             }
             else if (res.error() != null) {
-                if (res.failedKeys() != null)
-                    addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+                if (res.failedKeys() != null) {
+                    if (err == null)
+                        err = new CachePartialUpdateCheckedException(
+                            "Failed to update keys (retry update if possible).");
+
+                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+
+                    for (KeyCacheObject key : res.failedKeys())
+                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+                    err.add(keys, res.error(), req.topologyVersion());
+                }
             }
             else {
                 if (!req.fastMap() || req.hasPrimary()) {
                     GridCacheReturn ret = res.returnValue();
 
                     if (op == TRANSFORM) {
-                        if (ret != null)
-                            addInvokeResults(ret);
+                        if (ret != null) {
+                            assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+                            if (ret.value() != null) {
+                                if (opRes != null)
+                                    opRes.mergeEntryProcessResults(ret);
+                                else
+                                    opRes = ret;
+                            }
+                        }
                     }
                     else
                         opRes = ret;
@@ -677,35 +597,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     }
 
     /**
-     * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
-     */
-    private boolean storeFuture() {
-        return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
-    }
-
-    /**
-     * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
-     * node and send updates in parallel to all participating nodes.
-     *
-     * @param key Key to map.
-     * @param topVer Topology version to map.
-     * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
-     * @return Collection of nodes to which key is mapped.
-     */
-    private Collection<ClusterNode> mapKey(
-        KeyCacheObject key,
-        AffinityTopologyVersion topVer,
-        boolean fastMap
-    ) {
-        GridCacheAffinityManager affMgr = cctx.affinity();
-
-        // If we can send updates in parallel - do it.
-        return fastMap ?
-            cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
-    }
-
-    /**
      * Maps future to single node.
      *
      * @param nodeId Node ID.
@@ -914,26 +805,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     }
 
     /**
-     * @param topVer Topology version.
-     * @return Future.
-     */
-    @Nullable GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) {
-        synchronized (mux) {
-            if (this.topVer == AffinityTopologyVersion.ZERO)
-                return null;
-
-            if (this.topVer.compareTo(topVer) < 0) {
-                if (topCompleteFut == null)
-                    topCompleteFut = new GridFutureAdapter<>();
-
-                return topCompleteFut;
-            }
-
-            return null;
-        }
-    }
-
-    /**
      * @return Future version.
      */
     GridCacheVersion onFutureDone() {
@@ -1039,7 +910,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+            Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer);
 
             if (affNodes.isEmpty())
                 throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -1189,44 +1060,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         return req;
     }
 
-    /**
-     * @param ret Result from single node.
-     */
-    @SuppressWarnings("unchecked")
-    private void addInvokeResults(GridCacheReturn ret) {
-        assert op == TRANSFORM : op;
-        assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
-        if (ret.value() != null) {
-            if (opRes != null)
-                opRes.mergeEntryProcessResults(ret);
-            else
-                opRes = ret;
-        }
-    }
-
-    /**
-     * @param failedKeys Failed keys.
-     * @param topVer Topology version for failed update.
-     * @param err Error cause.
-     */
-    private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
-        AffinityTopologyVersion topVer,
-        Throwable err) {
-        CachePartialUpdateCheckedException err0 = this.err;
-
-        if (err0 == null)
-            err0 = this.err =
-                new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
-        Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
-        for (KeyCacheObject key : failedKeys)
-            keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
-        err0.add(keys, err, topVer);
-    }
-
     /** {@inheritDoc} */
     public String toString() {
         synchronized (mux) {


Mime
View raw message