ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 1093
Date Sat, 17 Oct 2015 07:19:14 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 3d1a95fd6 -> 0566a77b5


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0566a77b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0566a77b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0566a77b

Branch: refs/heads/ignite-1093-2
Commit: 0566a77b5772043347e52d9376567948855fc3fb
Parents: 3d1a95f
Author: Anton Vinogradov <avinogradov@gridgain.com>
Authored: Sat Oct 17 10:18:24 2015 +0300
Committer: Anton Vinogradov <avinogradov@gridgain.com>
Committed: Sat Oct 17 10:18:24 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 28 +++++++++---
 .../processors/cache/GridCachePreloader.java    |  5 ++-
 .../cache/GridCachePreloaderAdapter.java        |  5 ++-
 .../dht/preloader/GridDhtPartitionDemander.java | 45 ++++++++------------
 .../dht/preloader/GridDhtPreloader.java         |  5 ++-
 5 files changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 4d95894..ababac1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -29,6 +29,7 @@ import java.util.Queue;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -141,7 +142,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     private GridFutureAdapter<?> reconnectExchangeFut;
 
     /** */
-    private final Queue<Runnable> rebalancingQueue = new ConcurrentLinkedDeque8<>();
+    private final Queue<Callable<Boolean>> rebalancingQueue = new ConcurrentLinkedDeque8<>();
 
     /**
      * Partition map futures.
@@ -1295,7 +1296,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             orderMap.get(order).add(cacheId);
                         }
 
-                        Runnable marsR = null;
+                        Callable<Boolean> marsR = null;
 
                         //Ordered rebalance scheduling.
                         for (Integer order : orderMap.keySet()) {
@@ -1310,7 +1311,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     }
                                 }
 
-                                Runnable r = cacheCtx.preloader().addAssignments(
+                                Callable<Boolean> r = cacheCtx.preloader().addAssignments(
                                     assignsMap.get(cacheId), forcePreload, waitList, cnt);
 
                                 if (r != null) {
@@ -1333,7 +1334,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 U.log(log, "Starting caches rebalancing [top=" + exchFut.topologyVersion() + "]");
 
                                 if (marsR != null)
-                                    marsR.run();//Marshaller cache rebalancing launches in sync way.
+                                    try {
+                                        marsR.call();//Marshaller cache rebalancing launches in sync way.
+                                    }
+                                    catch (Exception ex) {
+                                        U.error(log, "Failed to send partition demand message to node", ex);
+
+                                        continue;
+                                    }
 
                                 final GridFutureAdapter fut = new GridFutureAdapter();
 
@@ -1343,14 +1351,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                     @Override public Boolean call() {
                                         try {
                                             while (true) {
-                                                Runnable rn = rebalancingQueue.poll();
+                                                Callable<Boolean> r = rebalancingQueue.poll();
 
-                                                if (rn == null)
+                                                if (r == null)
                                                     return false;
 
-                                                rn.run();
+                                               if (!r.call())
+                                                   return false;
                                             }
                                         }
+                                        catch (Exception ex) {
+                                            U.error(log, "Failed to send partition demand message to node", ex);
+
+                                            return false;
+                                        }
                                         finally {
                                             fut.onDone();
                                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 9555bf4..79861a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -97,8 +98,8 @@ public interface GridCachePreloader {
      * @param caches Rebalancing of these caches will be finished before this started.
      * @param cnt Counter.
      */
-    public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
-        Collection<String> caches, int cnt) throws IgniteCheckedException;
+    public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        Collection<String> caches, int cnt);
 
     /**
      * @param p Preload predicate.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 4aba537..b784383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.affinity.AffinityFunction;
@@ -163,8 +164,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
-        Collection<String> caches, int cnt) throws IgniteCheckedException {
+    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload,
+        Collection<String> caches, int cnt) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index b8aa2b0..85649c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -267,8 +268,8 @@ public class GridDhtPartitionDemander {
      * @param cnt Counter.
      * @throws IgniteCheckedException Exception
      */
-    Runnable addAssignments(final GridDhtPreloaderAssignments assigns, boolean force, final Collection<String> caches,
-        int cnt) throws IgniteCheckedException {
+    Callable<Boolean> addAssignments(final GridDhtPreloaderAssignments assigns, boolean force,
+        final Collection<String> caches, int cnt) {
         if (log.isDebugEnabled())
             log.debug("Adding partition assignments: " + assigns);
 
@@ -298,17 +299,18 @@ public class GridDhtPartitionDemander {
                 return null;
             }
 
-            return new Runnable() {
-                @Override
-                public void run() {
+            return new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception{
                     for (String c : caches) {
                         waitForCacheRebalancing(c, fut);
 
                         if (fut.isDone())
-                            return;
+                            return false;
                     }
 
                     requestPartitions(fut, assigns);
+
+                    return true;
                 }
             };
         }
@@ -343,7 +345,7 @@ public class GridDhtPartitionDemander {
     /**
      * @param fut Future.
      */
-    private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) {
+    private void requestPartitions(RebalanceFuture fut, GridDhtPreloaderAssignments assigns) throws IgniteCheckedException {
         for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
             if (topologyChanged(fut)) {
                 fut.cancel();
@@ -394,20 +396,13 @@ public class GridDhtPartitionDemander {
                         initD.topic(GridCachePartitionExchangeManager.rebalanceTopic(cnt));
                         initD.updateSequence(fut.updateSeq);
 
-                        try {
-                            cctx.io().sendOrderedMessage(node,
-                                GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
-
-                            if (log.isDebugEnabled())
-                                log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
-                                    cnt + ", partitions count=" + sParts.get(cnt).size() +
-                                    " (" + partitionsList(sParts.get(cnt)) + ")]");
-                        }
-                        catch (IgniteCheckedException ex) {
-                            fut.cancel();
+                        cctx.io().sendOrderedMessage(node,
+                            GridCachePartitionExchangeManager.rebalanceTopic(cnt), initD, cctx.ioPolicy(), d.timeout());
 
-                            U.error(log, "Failed to send partition demand message to node", ex);
-                        }
+                        if (log.isDebugEnabled())
+                            log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
+                                cnt + ", partitions count=" + sParts.get(cnt).size() +
+                                " (" + partitionsList(sParts.get(cnt)) + ")]");
                     }
                 }
             }
@@ -1365,7 +1360,7 @@ public class GridDhtPartitionDemander {
          * @param node Node.
          * @param d D.
          */
-        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) {
+        public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException{
             demandLock.readLock().lock();
 
             try {
@@ -1399,14 +1394,10 @@ public class GridDhtPartitionDemander {
 
                     fut.cancel();
                 }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to receive partitions from node (rebalancing will not " +
-                        "fully finish) [node=" + node.id() + ", msg=" + d + ']', e);
-
-                    fut.cancel(node.id());
-                }
                 catch (InterruptedException e) {
                     fut.cancel();
+
+                    throw new IgniteCheckedException(e);
                 }
             }
             finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0566a77b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0080406..3441f94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -408,8 +409,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments,
-        boolean forcePreload, Collection<String> caches, int cnt) throws IgniteCheckedException {
+    @Override public Callable<Boolean> addAssignments(GridDhtPreloaderAssignments assignments,
+        boolean forcePreload, Collection<String> caches, int cnt) {
         return demander.addAssignments(assignments, forcePreload, caches, cnt);
     }
 


Mime
View raw message