ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject ignite git commit: IGNITE-3195 Rebalancing: IgniteConfiguration.rebalanceThreadPoolSize is wrongly treated
Date Tue, 11 Apr 2017 12:59:54 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3195 [created] 1bde2bdc9


IGNITE-3195 Rebalancing: IgniteConfiguration.rebalanceThreadPoolSize is wrongly treated


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1bde2bdc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1bde2bdc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1bde2bdc

Branch: refs/heads/ignite-3195
Commit: 1bde2bdc92c7da12c570d3c478d7767d046c2ef3
Parents: c9d08d3
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Apr 11 15:59:44 2017 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Apr 11 15:59:44 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 75 +++++++++++++++-----
 1 file changed, 56 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1bde2bdc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 885106d..6b38f55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -384,28 +385,64 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
                 final int idx = cnt;
 
+                // rebQ & rebOwn provides guaranties (efficiently) of maximum threads amount can be used at rebalancing
+                // specified at IgniteConfiguration.getRebalanceThreadPoolSize()
+                final ConcurrentLinkedDeque<T2<GridCacheMessage, UUID>> rebQ = new ConcurrentLinkedDeque<>();
+                final AtomicBoolean rebOwn = new AtomicBoolean();
+
                 cctx.io().addOrderedHandler(rebalanceTopic(cnt), new CI2<UUID, GridCacheMessage>() {
-                    @Override public void apply(final UUID id, final GridCacheMessage m) {
-                        if (!enterBusy())
-                            return;
-
-                        try {
-                            GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
-
-                            if (cacheCtx != null) {
-                                if (m instanceof GridDhtPartitionSupplyMessage)
-                                    cacheCtx.preloader().handleSupplyMessage(
-                                        idx, id, (GridDhtPartitionSupplyMessage)m);
-                                else if (m instanceof GridDhtPartitionDemandMessage)
-                                    cacheCtx.preloader().handleDemandMessage(
-                                        idx, id, (GridDhtPartitionDemandMessage)m);
-                                else
-                                    U.error(log, "Unsupported message type: " + m.getClass().getName());
+                    @Override public void apply(final UUID id0, final GridCacheMessage m0) {
+                        rebQ.add(new T2<>(m0, id0));
+
+                        if (!rebOwn.get() && rebOwn.compareAndSet(false, true)) {
+                            boolean locked = true;
+
+                            while (locked || !rebQ.isEmpty()) {
+                                if (!locked && !rebOwn.compareAndSet(false, true))
+                                    return;
+
+                                try {
+                                    T2<GridCacheMessage, UUID> t = rebQ.poll();
+
+                                    if (t != null) {
+                                        GridCacheMessage m = t.get1();
+                                        UUID id = t.get2();
+
+                                        if (!enterBusy())
+                                            return;
+
+                                        try {
+                                            GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId);
+
+                                            if (cacheCtx != null) {
+                                                if (m instanceof GridDhtPartitionSupplyMessage)
+                                                    cacheCtx.preloader().handleSupplyMessage(
+                                                        idx, id, (GridDhtPartitionSupplyMessage)m);
+                                                else if (m instanceof GridDhtPartitionDemandMessage)
+                                                    cacheCtx.preloader().handleDemandMessage(
+                                                        idx, id, (GridDhtPartitionDemandMessage)m);
+                                                else
+                                                    U.error(log, "Unsupported message type: " + m.getClass().getName());
+                                            }
+                                        }
+                                        finally {
+                                            leaveBusy();
+                                        }
+                                    }
+                                }
+                                finally {
+                                    if (!rebQ.isEmpty())
+                                        locked = true;
+                                    else {
+                                        boolean res = rebOwn.compareAndSet(true, false);
+
+                                        assert res;
+
+                                        locked = false;
+                                    }
+                                }
                             }
                         }
-                        finally {
-                            leaveBusy();
-                        }
                     }
                 });
             }


Mime
View raw message