ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 1093
Date Mon, 07 Sep 2015 16:03:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 049918089 -> e97b5818a


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e97b5818
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e97b5818
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e97b5818

Branch: refs/heads/ignite-1093-2
Commit: e97b5818a371ac3d71281f00820d2a025b55b7b6
Parents: 0499180
Author: Anton Vinogradov <av@apache.org>
Authored: Mon Sep 7 19:03:36 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Mon Sep 7 19:03:36 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 37 ++++++--------------
 .../dht/preloader/GridDhtPartitionSupplier.java |  2 +-
 2 files changed, 11 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e97b5818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b260501..7a0a94c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -40,7 +40,6 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -78,7 +77,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
@@ -142,6 +140,8 @@ public class GridDhtPartitionDemander {
      *
      */
     void stop() {
+        syncFut.onCancel();
+
         lastExchangeFut = null;
 
         lastTimeoutObj.set(null);
@@ -193,7 +193,7 @@ public class GridDhtPartitionDemander {
      * @return {@code True} if topology changed.
      */
     private boolean topologyChanged(AffinityTopologyVersion topVer) {
-        return !cctx.affinity().affinityTopologyVersion().equals(topVer);
+        return cctx.affinity().affinityTopologyVersion().topologyVersion() != topVer.topologyVersion();
     }
 
     /**
@@ -334,8 +334,6 @@ public class GridDhtPartitionDemander {
                 }
             });
 
-            fut.setDemandThread(thread);
-
             thread.start();
         }
         else if (delay > 0) {
@@ -373,7 +371,7 @@ public class GridDhtPartitionDemander {
         AffinityTopologyVersion topVer = fut.topologyVersion();
 
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
-            if (topologyChanged(topVer) || Thread.interrupted()) {
+            if (topologyChanged(topVer)) {
                 fut.onCancel();
 
                 return;
@@ -778,8 +776,6 @@ public class GridDhtPartitionDemander {
         /** Started. */
         private ConcurrentHashMap8<UUID, Long> started = new ConcurrentHashMap8<>();
 
-        private volatile IgniteThread thread;
-
         /** Lock. */
         private Lock lock = new ReentrantLock();
 
@@ -812,24 +808,14 @@ public class GridDhtPartitionDemander {
          * @param assigns Assigns.
          */
         void init(GridDhtPreloaderAssignments assigns) {
-            final SyncFuture fut = this;
-
-            lsnr = new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    fut.onCancel();
-                }
-            };
-
-            cctx.events().addListener(lsnr, EVT_NODE_FAILED);
-
             this.assigns = assigns;
-        }
 
-        /**
-         * @param thread
-         */
-        void setDemandThread(IgniteThread thread) {
-            this.thread = thread;
+            cctx.discovery().topologyFuture(assigns.topologyVersion().topologyVersion() + 1).listen(
+                new CI1<IgniteInternalFuture<Long>>() {
+                    @Override public void apply(IgniteInternalFuture<Long> future) {
+                        SyncFuture.this.onCancel();
+                    }
+                });
         }
 
         /**
@@ -1028,9 +1014,6 @@ public class GridDhtPartitionDemander {
                 if (lsnr != null)
                     cctx.events().removeListener(lsnr);
 
-                if (thread != null)
-                    thread.interrupt();
-
                 onDone(completed);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e97b5818/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0686376..49e89ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -104,7 +104,7 @@ class GridDhtPartitionSupplier {
         assert d != null;
         assert id != null;
 
-        if (!cctx.affinity().affinityTopologyVersion().equals(d.topologyVersion()))
+        if (cctx.affinity().affinityTopologyVersion().topologyVersion() != d.topologyVersion().topologyVersion())
             return;
 
         GridDhtPartitionSupplyMessageV2 s = new GridDhtPartitionSupplyMessageV2(d.workerId(),


Mime
View raw message