ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ag...@apache.org
Subject ignite git commit: ignite-5061: move rebalance enabled methods to GridCacheSharedContext&GridCacheProcessor
Date Wed, 06 Sep 2017 14:11:36 GMT
Repository: ignite
Updated Branches:
  refs/heads/master b08eef240 -> 0ece83f02


ignite-5061: move rebalance enabled methods to GridCacheSharedContext&GridCacheProcessor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0ece83f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0ece83f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0ece83f0

Branch: refs/heads/master
Commit: 0ece83f02af219346e85269f0d80112b2faae25e
Parents: b08eef2
Author: Alexander Belyak <alexandr.belyak@xored.com>
Authored: Wed Sep 6 17:07:33 2017 +0300
Committer: Andrey Gura <agura@apache.org>
Committed: Wed Sep 6 17:07:33 2017 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgniteEx.java    | 14 ++++
 .../apache/ignite/internal/IgniteKernal.java    | 12 +++
 .../processors/cache/GridCacheProcessor.java    |  8 ++
 .../cache/GridCacheSharedContext.java           | 20 +++++
 .../dht/preloader/GridDhtPartitionDemander.java |  9 ++
 .../org/apache/ignite/mxbean/IgniteMXBean.java  | 21 +++++
 .../cache/CacheRebalancingSelfTest.java         | 88 +++++++++++++++++++-
 .../processors/igfs/IgfsIgniteMock.java         | 10 +++
 .../junits/multijvm/IgniteProcessProxy.java     | 10 +++
 9 files changed, 190 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
index 0a44987..53b3e4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEx.java
@@ -160,4 +160,18 @@ public interface IgniteEx extends Ignite {
      * @return Kernal context.
      */
     public GridKernalContext context();
+
+    /**
+     * Get rebalance enabled flag.
+     *
+     * @return {@code True} if rebalance enabled on node, {@code False} otherwise.
+     */
+    public boolean isRebalanceEnabled();
+
+    /**
+     * Set rebalance enable flag on node.
+     *
+     * @param rebalanceEnabled rebalance enabled flag.
+     */
+    public void rebalanceEnabled(boolean rebalanceEnabled);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index cfad8b4..a3b8651 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -461,6 +461,18 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override
+    public boolean isRebalanceEnabled() {
+        return ctx.cache().context().isRebalanceEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void rebalanceEnabled(boolean rebalanceEnabled) {
+        ctx.cache().context().rebalanceEnabled(rebalanceEnabled);
+    }
+
+    /** {@inheritDoc} */
     @Override public long getUpTime() {
         return U.currentTimeMillis() - startTime;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index bd950fa..5d84809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -279,6 +279,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Start cache rebalance.
+     */
+    public void enableRebalance() {
+        for (IgniteCacheProxy c : publicCaches())
+            c.rebalance();
+    }
+
+    /**
      * Create exchange worker task for custom discovery message.
      *
      * @param msg Custom discovery message.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 82d960a..d8614b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -158,6 +158,9 @@ public class GridCacheSharedContext<K, V> {
     /** Concurrent DHT atomic updates counters. */
     private AtomicIntegerArray dhtAtomicUpdCnt;
 
+    /** Rebalance enabled flag. */
+    private boolean rebalanceEnabled = true;
+
     /** */
     private final List<IgniteChangeGlobalStateSupport> stateAwareMgrs;
 
@@ -304,6 +307,23 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return rebalance enabled flag.
+     */
+    public boolean isRebalanceEnabled() {
+        return this.rebalanceEnabled;
+    }
+
+    /**
+     * @param rebalanceEnabled rebalance enabled flag.
+     */
+    public void rebalanceEnabled(boolean rebalanceEnabled) {
+        this.rebalanceEnabled = rebalanceEnabled;
+
+        if (rebalanceEnabled)
+            cache().enableRebalance();
+    }
+
+    /**
      * @param reconnectFut Reconnect future.
      * @throws IgniteCheckedException If failed.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2258187..54661ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -402,6 +402,15 @@ public class GridDhtPartitionDemander {
             return;
         }
 
+        if (!ctx.kernalContext().grid().isRebalanceEnabled()) {
+            if (log.isDebugEnabled())
+                log.debug("Cancel partition demand because rebalance disabled on current node.");
+
+            fut.cancel();
+
+            return;
+        }
+
         synchronized (fut) { // Synchronized to prevent consistency issues in case of parallel cancellation.
             if (fut.isDone())
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
index ce63e4f..428d03c 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
@@ -51,6 +51,27 @@ public interface IgniteMXBean {
     public String getStartTimestampFormatted();
 
     /**
+     * Gets rebalance enabled flag.
+     *
+     * @return Rebalance enabled flag.
+     */
+    @MXBeanDescription("Rebalance enabled flag.")
+    public boolean isRebalanceEnabled();
+
+    /**
+     * Enable or disable cache partition rebalance per node.
+     *
+     * @param rebalanceEnabled If {@code true} then set rebalance to enabled state.
+     */
+    @MXBeanParametersDescriptions(
+        {
+            "Enable cache partitions rebalance on node.",
+            "Disable cache partitions rebalance on node."
+        }
+    )
+    public void rebalanceEnabled(boolean rebalanceEnabled);
+
+    /**
      * Gets string presentation of up-time for the kernal.
      *
      * @return String presentation of up-time for the kernal.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
index f5ae59d..421ff08 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
@@ -18,24 +18,43 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import com.sun.org.apache.regexp.internal.RE;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
 /**
  * Test for rebalancing.
  */
 public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
+
+    /** Cache name with one backups */
+    private static final String REBALANCE_TEST_CACHE_NAME = "rebalanceCache";
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
-        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME));
+        CacheConfiguration<Integer,Integer> rebalabceCacheCfg = new CacheConfiguration<>();
+        rebalabceCacheCfg.setBackups(1);
+        rebalabceCacheCfg.setName(REBALANCE_TEST_CACHE_NAME);
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME), rebalabceCacheCfg);
 
         return cfg;
     }
@@ -75,8 +94,73 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
      * @return Internal future.
      */
     private static IgniteInternalFuture internalFuture(IgniteFuture fut) {
-        assert fut instanceof IgniteFutureImpl : fut;
+        assertTrue(fut.toString(), fut instanceof IgniteFutureImpl);
 
         return ((IgniteFutureImpl) fut).internalFuture();
     }
+
+    /**
+     * Test local cache size with and without rebalancing in case or topology change.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDisableRebalancing() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+        IgniteEx ig1 = startGrid(1);
+        startGrid(2);
+
+        ig1.rebalanceEnabled(false);
+
+        Random r = new Random();
+
+        int totalKeysCount = 10240;
+
+        IgniteCache<Integer, Integer> cache = ig0.getOrCreateCache(REBALANCE_TEST_CACHE_NAME);
+
+        for (int i = 0;i < totalKeysCount;i++)
+            cache.put(r.nextInt(), 1);
+
+
+        testLocalCacheSize(ig0, 0, totalKeysCount);
+        int before_ig1 = testLocalCacheSize(ig1, 0, totalKeysCount);
+
+        stopGrid(2);
+
+        testLocalCacheSize(ig0, totalKeysCount, null);
+        testLocalCacheSize(ig1, before_ig1, null);
+
+
+        ig1.rebalanceEnabled(true);
+
+        testLocalCacheSize(ig0, totalKeysCount, null);
+        testLocalCacheSize(ig1, totalKeysCount, null);
+    }
+
+    /**
+     * Test if test cache in specified node have correct local size.
+     *
+     * @param ignite node to test
+     * @param expFrom left bound
+     * @param expTo right bound (or {@code null})
+     * @return actual local cache size
+     * @throws IgniteInterruptedCheckedException
+     */
+    private int testLocalCacheSize(IgniteEx ignite, final Integer expFrom, final Integer expTo) throws IgniteInterruptedCheckedException {
+        final IgniteCache cache = ignite.cache(REBALANCE_TEST_CACHE_NAME);
+
+        boolean isOk = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                Integer actualSize = cache.localSize(CachePeekMode.ALL);
+
+                return expTo == null ? expFrom.equals(actualSize) : expFrom <= actualSize && actualSize <= expTo;
+            }
+        }, 10_000);
+
+        int rslt = cache.localSize(CachePeekMode.ALL);
+
+        assertTrue(ignite.configuration().getIgniteInstanceName() + " cache local size = "
+            + rslt + " not " + (expTo == null ? "equal " + expFrom : "in " + expFrom + "-" + expTo), isOk);
+
+        return rslt;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
index df3c1ea..1e5fcd1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -178,6 +178,16 @@ public class IgfsIgniteMock implements IgniteEx {
         return null;
     }
 
+    @Override
+    public boolean isRebalanceEnabled() {
+        return true;
+    }
+
+    @Override
+    public void rebalanceEnabled(boolean rebalanceEnabled) {
+        throwUnsupported();
+    }
+
     /** {@inheritDoc} */
     @Override public String name() {
         return name;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0ece83f0/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
index 4d51853..3075920 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java
@@ -418,6 +418,16 @@ public class IgniteProcessProxy implements IgniteEx {
         throw new UnsupportedOperationException("Operation isn't supported yet.");
     }
 
+    @Override
+    public boolean isRebalanceEnabled() {
+        return true;
+    }
+
+    @Override
+    public void rebalanceEnabled(boolean rebalanceEnabled) {
+        throw new UnsupportedOperationException("Operation isn't supported yet.");
+    }
+
     /** {@inheritDoc} */
     @Override public IgniteCompute compute() {
         throw new UnsupportedOperationException("Operation isn't supported yet.");


Mime
View raw message