ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075
Date Wed, 17 May 2017 12:38:28 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 dc8e10259 -> 36b7037ab


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/36b7037a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/36b7037a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/36b7037a

Branch: refs/heads/ignite-5075
Commit: 36b7037ab64ec2ca64e2e994c6af0753256a62fa
Parents: dc8e102
Author: sboikov <sboikov@gridgain.com>
Authored: Wed May 17 15:38:19 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed May 17 15:38:19 2017 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtCacheAdapter.java    | 242 +++++++++++++++-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  17 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  17 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |  14 +-
 .../dht/preloader/GridDhtPreloader.java         | 280 +------------------
 5 files changed, 278 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 7789673..d2d59b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
-import java.util.AbstractSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,7 +25,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.Cache;
@@ -34,13 +32,15 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.CachePeekModes;
 import org.apache.ignite.internal.processors.cache.EntryGetResult;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable;
@@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdat
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -70,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CI3;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
@@ -86,8 +88,11 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
+import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache adapter.
@@ -102,6 +107,30 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     /** Multi tx futures. */
     private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>();
 
+    /** Force key futures. */
+    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
+
+    /** */
+    private volatile boolean stopping;
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
+        @Override public void onEvent(Event evt) {
+            DiscoveryEvent e = (DiscoveryEvent)evt;
+
+            ClusterNode loc = ctx.localNode();
+
+            assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : e;
+
+            final ClusterNode n = e.eventNode();
+
+            assert !loc.id().equals(n.id());
+
+            for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
+                f.onDiscoveryEvent(e);
+        }
+    };
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -110,6 +139,176 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     }
 
     /**
+     * Adds future to future map.
+     *
+     * @param fut Future to add.
+     * @return {@code False} if node cache is stopping and future was completed with error.
+     */
+    public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
+        forceKeyFuts.put(fut.futureId(), fut);
+
+        if (stopping) {
+            fut.onDone(stopError());
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Removes future from future map.
+     *
+     * @param fut Future to remove.
+     */
+    public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) {
+        forceKeyFuts.remove(fut.futureId(), fut);
+    }
+
+    /**
+     * @param node Node.
+     * @param msg Message.
+     */
+    protected final void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) {
+        GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
+
+        if (f != null)
+            f.onResult(msg);
+        else if (log.isDebugEnabled())
+            log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() +
+                ", res=" + msg + ']');
+    }
+    /**
+     * @param node Node originated request.
+     * @param msg Force keys message.
+     */
+    protected final void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
+        IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
+
+        if (fut.isDone())
+            processForceKeysRequest0(node, msg);
+        else
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
+                    processForceKeysRequest0(node, msg);
+                }
+            });
+    }
+
+    /**
+     * @param node Node originated request.
+     * @param msg Force keys message.
+     */
+    private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) {
+        try {
+            ClusterNode loc = ctx.localNode();
+
+            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
+                ctx.cacheId(),
+                msg.futureId(),
+                msg.miniId(),
+                ctx.deploymentEnabled());
+
+            GridDhtPartitionTopology top = ctx.topology();
+
+            for (KeyCacheObject k : msg.keys()) {
+                int p = ctx.affinity().partition(k);
+
+                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
+
+                // If this node is no longer an owner.
+                if (locPart == null && !top.owners(p).contains(loc)) {
+                    res.addMissed(k);
+
+                    continue;
+                }
+
+                GridCacheEntryEx entry;
+
+                while (true) {
+                    try {
+                        entry = ctx.dht().entryEx(k);
+
+                        entry.unswap();
+
+                        GridCacheEntryInfo info = entry.info();
+
+                        if (info == null) {
+                            assert entry.obsolete() : entry;
+
+                            continue;
+                        }
+
+                        if (!info.isNew())
+                            res.addInfo(info);
+
+                        ctx.evicts().touch(entry, msg.topologyVersion());
+
+                        break;
+                    }
+                    catch (GridCacheEntryRemovedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Got removed entry: " + k);
+                    }
+                    catch (GridDhtInvalidPartitionException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Local node is no longer an owner: " + p);
+
+                        res.addMissed(k);
+
+                        break;
+                    }
+                }
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
+
+            ctx.io().send(node, res, ctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException ignore) {
+            if (log.isDebugEnabled())
+                log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() +
+                    ", req=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e);
+        }
+    }
+
+    /**
+     *
+     */
+    public void dumpDebugInfo() {
+        if (!forceKeyFuts.isEmpty()) {
+            U.warn(log, "Pending force key futures [cache=" + ctx.name() + "]:");
+
+            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+                U.warn(log, ">>> " + fut);
+        }
+    }
+
+    @Override public void onKernalStop() {
+        super.onKernalStop();
+
+        stopping = true;
+
+        IgniteCheckedException err = stopError();
+
+        for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
+            fut.onDone(err);
+
+        ctx.gridEvents().removeLocalEventListener(discoLsnr);
+    }
+
+    /**
+     * @return Node stop exception.
+     */
+    private IgniteCheckedException stopError() {
+        return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
+    }
+
+    /**
      * @param nodeId Sender node ID.
      * @param res Near get response.
      */
@@ -174,6 +373,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 processTtlUpdateRequest(req);
             }
         });
+
+        ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
@@ -1194,4 +1395,35 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
             return topVer;
         }
     }
+
+    /**
+     *
+     */
+    protected abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void apply(UUID nodeId, M msg) {
+            ClusterNode node = ctx.node(nodeId);
+
+            if (node == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']');
+
+                return;
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']');
+
+            onMessage(node, msg);
+        }
+
+        /**
+         * @param node Node.
+         * @param msg Message.
+         */
+        protected abstract void onMessage(ClusterNode node, M msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 8f46f89..5fd3111 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -47,7 +47,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
@@ -159,6 +160,20 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
                 processDhtUnlockRequest(nodeId, req);
             }
         });
+
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c080470..c171514 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -69,7 +69,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -398,6 +399,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 }
             });
 
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysRequest.class,
+            new MessageHandler<GridDhtForceKeysRequest>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
+                    processForceKeysRequest(node, msg);
+                }
+            });
+
+        ctx.io().addHandler(false, ctx.cacheId(), GridDhtForceKeysResponse.class,
+            new MessageHandler<GridDhtForceKeysResponse>() {
+                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
+                    processForceKeyResponse(node, msg);
+                }
+            });
+
         if (near == null) {
             ctx.io().addHandler(
                 false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 0f39081..845619d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -102,9 +102,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     /** Future ID. */
     private IgniteUuid futId = IgniteUuid.randomUuid();
 
-    /** Preloader. */
-    private GridDhtPreloader preloader;
-
     /** Trackable flag. */
     private boolean trackable;
 
@@ -112,13 +109,11 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param cctx Cache context.
      * @param topVer Topology version.
      * @param keys Keys.
-     * @param preloader Preloader.
      */
     public GridDhtForceKeysFuture(
         GridCacheContext<K, V> cctx,
         AffinityTopologyVersion topVer,
-        Collection<KeyCacheObject> keys,
-        GridDhtPreloader preloader
+        Collection<KeyCacheObject> keys
     ) {
         assert topVer.topologyVersion() != 0 : topVer;
         assert !F.isEmpty(keys) : keys;
@@ -126,7 +121,6 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
         this.cctx = cctx;
         this.keys = keys;
         this.topVer = topVer;
-        this.preloader = preloader;
 
         top = cctx.dht().topology();
 
@@ -158,7 +152,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
     @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) {
         if (super.onDone(res, err)) {
             if (trackable)
-                preloader.remoteFuture(this);
+                cctx.dht().removeFuture(this);
 
             return true;
         }
@@ -170,7 +164,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
      * @param evt Discovery event.
      */
     @SuppressWarnings( {"unchecked"})
-    void onDiscoveryEvent(DiscoveryEvent evt) {
+    public void onDiscoveryEvent(DiscoveryEvent evt) {
         topCntr.incrementAndGet();
 
         int type = evt.type();
@@ -244,7 +238,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
 
             int curTopVer = topCntr.get();
 
-            if (!preloader.addFuture(this)) {
+            if (!cctx.dht().addFuture(this)) {
                 assert isDone() : this;
 
                 return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/36b7037a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index afde2cc..52c1600 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -23,29 +23,20 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.NodeStoppingException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
@@ -59,22 +50,14 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
 import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
-import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
  * DHT cache preloader.
@@ -86,9 +69,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** */
     private GridDhtPartitionTopology top;
 
-    /** Force key futures. */
-    private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap();
-
     /** Partition suppliers. */
     private GridDhtPartitionSupplier supplier;
 
@@ -111,40 +91,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     private final AtomicInteger partsEvictOwning = new AtomicInteger();
 
     /** */
-    private volatile boolean stopping;
-
-    /** */
     private boolean stopped;
 
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
-        @Override public void onEvent(Event evt) {
-            if (!enterBusy())
-                return;
-
-            DiscoveryEvent e = (DiscoveryEvent)evt;
-
-            try {
-                ClusterNode loc = ctx.localNode();
-
-                assert e.type() == EVT_NODE_JOINED || e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED;
-
-                final ClusterNode n = e.eventNode();
-
-                assert !loc.id().equals(n.id());
-
-                for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values())
-                    f.onDiscoveryEvent(e);
-
-                assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
-                    "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
-            }
-            finally {
-                leaveBusy();
-            }
-        }
-    };
-
     /**
      * @param grp Cache group.
      */
@@ -161,26 +109,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("Starting DHT rebalancer...");
 
-        ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysRequest.class,
-            new MessageHandler<GridDhtForceKeysRequest>() {
-                @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) {
-                    processForceKeysRequest(node, msg);
-                }
-            });
-
-        ctx.io().addHandler(true, grp.groupId(), GridDhtForceKeysResponse.class,
-            new MessageHandler<GridDhtForceKeysResponse>() {
-                @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) {
-                    processForceKeyResponse(node, msg);
-                }
-            });
-
         supplier = new GridDhtPartitionSupplier(grp);
         demander = new GridDhtPartitionDemander(grp);
 
         demander.start();
-
-        ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
     }
 
     /** {@inheritDoc} */
@@ -199,10 +131,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         if (log.isDebugEnabled())
             log.debug("DHT rebalancer onKernalStop callback.");
 
-        stopping = true;
-
-        ctx.gridEvents().removeLocalEventListener(discoLsnr);
-
         // Acquire write busy lock.
         busyLock.writeLock().lock();
 
@@ -213,11 +141,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             if (demander != null)
                 demander.stop();
 
-            IgniteCheckedException err = stopError();
-
-            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
-                fut.onDone(err);
-
             top = null;
 
             stopped = true;
@@ -226,12 +149,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
             busyLock.writeLock().unlock();
         }
     }
-    /**
-     * @return Node stop exception.
-     */
-    private IgniteCheckedException stopError() {
-        return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
-    }
 
     /** {@inheritDoc} */
     @Override public void onInitialExchangeComplete(@Nullable Throwable err) {
@@ -457,132 +374,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /**
-     * @param node Node originated request.
-     * @param msg Force keys message.
-     */
-    private void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) {
-        IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion());
-
-        if (fut.isDone())
-            processForceKeysRequest0(node, msg);
-        else
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> t) {
-                    processForceKeysRequest0(node, msg);
-                }
-            });
-    }
-
-    /**
-     * @param node Node originated request.
-     * @param msg Force keys message.
-     */
-    private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) {
-        if (!enterBusy())
-            return;
-
-        try {
-            GridCacheContext cctx = ctx.cacheContext(msg.cacheId());
-
-            ClusterNode loc = cctx.localNode();
-
-            GridDhtForceKeysResponse res = new GridDhtForceKeysResponse(
-                cctx.cacheId(),
-                msg.futureId(),
-                msg.miniId(),
-                cctx.deploymentEnabled());
-
-            for (KeyCacheObject k : msg.keys()) {
-                int p = cctx.affinity().partition(k);
-
-                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);
-
-                // If this node is no longer an owner.
-                if (locPart == null && !top.owners(p).contains(loc)) {
-                    res.addMissed(k);
-
-                    continue;
-                }
-
-                GridCacheEntryEx entry;
-
-                while (true) {
-                    try {
-                        entry = cctx.dht().entryEx(k);
-
-                        entry.unswap();
-
-                        GridCacheEntryInfo info = entry.info();
-
-                        if (info == null) {
-                            assert entry.obsolete() : entry;
-
-                            continue;
-                        }
-
-                        if (!info.isNew())
-                            res.addInfo(info);
-
-                        cctx.evicts().touch(entry, msg.topologyVersion());
-
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Got removed entry: " + k);
-                    }
-                    catch (GridDhtInvalidPartitionException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("Local node is no longer an owner: " + p);
-
-                        res.addMissed(k);
-
-                        break;
-                    }
-                }
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']');
-
-            cctx.io().send(node, res, cctx.ioPolicy());
-        }
-        catch (ClusterTopologyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() +
-                    ", req=" + msg + ']');
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @param msg Message.
-     */
-    private void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) {
-        if (!enterBusy())
-            return;
-
-        try {
-            GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId());
-
-            if (f != null)
-                f.onResult(msg);
-            else if (log.isDebugEnabled())
-                log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() +
-                    ", res=" + msg + ']');
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
      * Resends partitions on partition evict within configured timeout.
      *
      * @param part Evicted partition.
@@ -643,13 +434,17 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /**
+     * @param cctx Cache context.
      * @param keys Keys to request.
      * @param topVer Topology version.
      * @return Future for request.
      */
     @SuppressWarnings({"unchecked", "RedundantCast"})
     private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer) {
-        final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this);
+        if (cctx.isNear())
+            cctx = cctx.near().dht().context();
+
+        final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys);
 
         IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
 
@@ -703,33 +498,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         }
     }
 
-    /**
-     * Adds future to future map.
-     *
-     * @param fut Future to add.
-     * @return {@code False} if node cache is stopping and future was completed with error.
-     */
-    boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) {
-        forceKeyFuts.put(fut.futureId(), fut);
-
-        if (stopping) {
-            fut.onDone(stopError());
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * Removes future from future map.
-     *
-     * @param fut Future to remove.
-     */
-    void remoteFuture(GridDhtForceKeysFuture<?, ?> fut) {
-        forceKeyFuts.remove(fut.futureId(), fut);
-    }
-
     /** {@inheritDoc} */
     @Override public void evictPartitionAsync(GridDhtLocalPartition part) {
         partsToEvict.putIfAbsent(part.id(), part);
@@ -791,44 +559,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
     /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
-        if (!forceKeyFuts.isEmpty()) {
-            U.warn(log, "Pending force key futures [grp=" + grp.name() + "]:");
-
-            for (GridDhtForceKeysFuture fut : forceKeyFuts.values())
-                U.warn(log, ">>> " + fut);
-        }
-
         supplier.dumpDebugInfo();
     }
-
-    /**
-     *
-     */
-    private abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void apply(UUID nodeId, M msg) {
-            ClusterNode node = ctx.node(nodeId);
-
-            if (node == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']');
-
-                return;
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']');
-
-            onMessage(node, msg);
-        }
-
-        /**
-         * @param node Node.
-         * @param msg Message.
-         */
-        protected abstract void onMessage(ClusterNode node, M msg);
-    }
 }


Mime
View raw message