ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 1093
Date Tue, 03 Nov 2015 09:59:39 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-3 8333e2535 -> 7316e4a44


1093


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7316e4a4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7316e4a4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7316e4a4

Branch: refs/heads/ignite-1093-3
Commit: 7316e4a44b422cd00df7661a8086252b4c877cf4
Parents: 8333e25
Author: Anton Vinogradov <av@apache.org>
Authored: Tue Nov 3 12:59:27 2015 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Nov 3 12:59:27 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 72 +++++++++++---------
 .../configuration/IgniteConfiguration.java      |  2 +-
 .../apache/ignite/internal/IgniteKernal.java    | 11 +++
 .../processors/cache/GridCachePreloader.java    |  8 ++-
 .../dht/preloader/GridDhtPartitionDemander.java |  8 +--
 .../dht/preloader/GridDhtPartitionSupplier.java |  5 +-
 .../GridCacheRebalancingSyncSelfTest.java       |  4 +-
 7 files changed, 67 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 1e1f437..76929b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -69,13 +69,14 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
     private static final long serialVersionUID = 0L;
 
     /** Default size of rebalance thread pool. */
+    @Deprecated
     public static final int DFLT_REBALANCE_THREAD_POOL_SIZE = 2;
 
     /** Default rebalance timeout (ms).*/
     public static final long DFLT_REBALANCE_TIMEOUT = 10000;
 
-    /** Default rebalance batches count. */
-    public static final long DFLT_REBALANCE_BATCHES_COUNT = 2;
+    /** Default rebalance batches prefetch count. */
+    public static final long DFLT_REBALANCE_BATCHES_PREFETCH_COUNT = 2;
 
     /** Time in milliseconds to wait between rebalance messages to avoid overloading CPU.
*/
     public static final long DFLT_REBALANCE_THROTTLE = 0;
@@ -177,6 +178,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
     private String name;
 
     /** Rebalance thread pool size. */
+    @Deprecated
     private int rebalancePoolSize = DFLT_REBALANCE_THREAD_POOL_SIZE;
 
     /** Rebalance timeout. */
@@ -257,12 +259,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
     /** Rebalance batch size. */
     private int rebalanceBatchSize = DFLT_REBALANCE_BATCH_SIZE;
 
+    /** Rebalance batches prefetch count. */
+    private long rebalanceBatchesPrefetchCount = DFLT_REBALANCE_BATCHES_PREFETCH_COUNT;
+
     /** Off-heap memory size. */
     private long offHeapMaxMem = DFLT_OFFHEAP_MEMORY;
 
-    /** Rebalance batches count. */
-    private long rebalanceBatchesCount = DFLT_REBALANCE_BATCHES_COUNT;
-
     /** */
     private boolean swapEnabled = DFLT_SWAP_ENABLED;
 
@@ -405,7 +407,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
         rebalanceDelay = cc.getRebalanceDelay();
         rebalanceOrder = cc.getRebalanceOrder();
         rebalancePoolSize = cc.getRebalanceThreadPoolSize();
-        rebalanceBatchesCount = cc.getRebalanceBatchesCount();
+        rebalanceBatchesPrefetchCount = cc.getRebalanceBatchesPrefetchCount();
         rebalanceTimeout = cc.getRebalanceTimeout();
         rebalanceThrottle = cc.getRebalanceThrottle();
         readFromBackup = cc.isReadFromBackup();
@@ -1089,7 +1091,36 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
      * @return {@code this} for chaining.
      */
     public CacheConfiguration<K, V> setRebalanceBatchSize(int rebalanceBatchSize) {
-        this.rebalanceBatchSize = Math.max(1, rebalanceBatchSize);
+        this.rebalanceBatchSize = rebalanceBatchSize;
+
+        return this;
+    }
+
+    /**
+     * To gain better rebalancing performance supplier node can provide more than one batch
at rebalancing start and
+     * provide one new to each next demand request.
+     *
+     * Gets number of batches generated by supply node at rebalancing start.
+     * Minimum is 1.
+     *
+     * @return batches count
+     */
+    public long getRebalanceBatchesPrefetchCount() {
+        return rebalanceBatchesPrefetchCount;
+    }
+
+    /**
+     * To gain better rebalancing performance supplier node can provide more than one batch
at rebalancing start and
+     * provide one new to each next demand request.
+     *
+     * Sets number of batches generated by supply node at rebalancing start.
+     * Minimum is 1.
+     *
+     * @param rebalanceBatchesCnt batches count.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K, V> setRebalanceBatchesPrefetchCount(long rebalanceBatchesCnt)
{
+        this.rebalanceBatchesPrefetchCount = rebalanceBatchesCnt;
 
         return this;
     }
@@ -1781,33 +1812,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K,
V> {
     }
 
     /**
-     * To gain better rebalancing performance supplier node can provide more than one batch
at start and provide
-     * one new to each next demand request.
-     *
-     * Gets number of batches generated by supply node at rebalancing start.
-     *
-     * @return batches count
-     */
-    public long getRebalanceBatchesCount() {
-        return rebalanceBatchesCount;
-    }
-
-    /**
-     *  To gain better rebalancing performance supplier node can provide more than one batch
at start and provide
-     * one new to each next demand request.
-     *
-     * Sets number of batches generated by supply node at rebalancing start.
-     *
-     * @param rebalanceBatchesCnt batches count.
-     * @return {@code this} for chaining.
-     */
-    public CacheConfiguration<K, V> setRebalanceBatchesCount(long rebalanceBatchesCnt)
{
-        this.rebalanceBatchesCount = rebalanceBatchesCnt;
-
-        return this;
-    }
-
-    /**
      * Gets cache store session listener factories.
      *
      * @return Cache store session listener factories.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 02b1066..7cf3a65 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -1343,7 +1343,7 @@ public class IgniteConfiguration {
      * @return count.
      */
     public int getRebalanceThreadPoolSize() {
-        return Math.max(1, rebalanceThreadPoolSize);
+        return rebalanceThreadPoolSize;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 7691434..94ffb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2144,6 +2144,17 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
         if (cfg.getSystemThreadPoolSize() <= cfg.getRebalanceThreadPoolSize())
             throw new IgniteCheckedException("Rebalance thread pool size exceed or equals
System thread pool size. " +
                 "Change IgniteConfiguration.rebalanceThreadPoolSize property before next
start.");
+
+        if (cfg.getRebalanceThreadPoolSize() < 1)
+            throw new IgniteCheckedException("Rebalance thread pool size minimal allowed
value is 1. " +
+                "Change IgniteConfiguration.rebalanceThreadPoolSize property before next
start.");
+
+        for (CacheConfiguration ccfg : cfg.getCacheConfiguration()){
+            if (ccfg.getRebalanceBatchesPrefetchCount() < 1)
+                throw new IgniteCheckedException("Rebalance batches prefetch count minimal
allowed value is 1. " +
+                    "Change CacheConfiguration.rebalanceBatchesPrefetchCount property before
next start. " +
+                    "[cache="+ccfg.getName()+"]");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index cda392c..bab3b32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -123,7 +123,13 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<?> syncFuture();
 
     /**
-     * @return Future which will complete when preloading is finished on current topology.
+     * @return Future which will complete when preloading finishes on current topology.
+     *
+     * Future result is {@code true} in case rebalancing successfully finished at current
topology.
+     * Future result is {@code false} in case rebalancing cancelled or finished with missed
partitions and will be
+     * restarted at current or pending topology.
+     *
+     * Note that topology change creates new futures and finishes previous.
      */
     public IgniteInternalFuture<Boolean> rebalanceFuture();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index f2c1dc2..b131679 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -110,6 +110,10 @@ public class GridDhtPartitionDemander {
     @Deprecated//Backward compatibility. To be removed in future.
     private final ReadWriteLock demandLock;
 
+    /** DemandWorker index. */
+    @Deprecated//Backward compatibility. To be removed in future.
+    private final AtomicInteger dmIdx = new AtomicInteger();
+
     /**
      * @param cctx Cctx.
      * @param demandLock Demand lock.
@@ -1063,10 +1067,6 @@ public class GridDhtPartitionDemander {
         }
     }
 
-    /** DemandWorker index. */
-    @Deprecated//Backward compatibility. To be removed in future.
-    private final AtomicInteger dmIdx = new AtomicInteger();
-
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 494560f..e7e1dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -212,6 +212,9 @@ class GridDhtPartitionSupplier {
 
         ClusterNode node = cctx.discovery().node(id);
 
+        if (node == null)
+            return; //Context will be cleaned at topology change.
+
         try {
             SupplyContext sctx;
 
@@ -233,7 +236,7 @@ class GridDhtPartitionSupplier {
 
             boolean newReq = true;
 
-            long maxBatchesCnt = cctx.config().getRebalanceBatchesCount();
+            long maxBatchesCnt = cctx.config().getRebalanceBatchesPrefetchCount();
 
             if (sctx != null) {
                 phase = sctx.phase;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7316e4a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index c866a1d..6a7f701 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -84,7 +84,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         cachePCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cachePCfg.setBackups(1);
         cachePCfg.setRebalanceBatchSize(1);
-        cachePCfg.setRebalanceBatchesCount(1);
+        cachePCfg.setRebalanceBatchesPrefetchCount(1);
         cachePCfg.setRebalanceOrder(2);
 
         CacheConfiguration<Integer, Integer> cachePCfg2 = new CacheConfiguration<>();
@@ -102,7 +102,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest
{
         cacheRCfg.setCacheMode(CacheMode.REPLICATED);
         cacheRCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheRCfg.setRebalanceBatchSize(1);
-        cacheRCfg.setRebalanceBatchesCount(Integer.MAX_VALUE);
+        cacheRCfg.setRebalanceBatchesPrefetchCount(Integer.MAX_VALUE);
         ((TcpCommunicationSpi)iCfg.getCommunicationSpi()).setSharedMemoryPort(-1);//Shmem
fail fix for Integer.MAX_VALUE.
 
         CacheConfiguration<Integer, Integer> cacheRCfg2 = new CacheConfiguration<>();


Mime
View raw message