ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3073 Possible thread starvation due to rebalancing (do not wait for marshaller cache rebalance) (cherry picked from commit d4afac2284db4de5cc20ed654fc9199dc66bb00f)
Date Thu, 05 May 2016 06:57:58 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 0f2b7532d -> 65183c32f


ignite-3073 Possible thread starvation due to rebalancing (do not wait for marshaller cache rebalance)
(cherry picked from commit d4afac2284db4de5cc20ed654fc9199dc66bb00f)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/65183c32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/65183c32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/65183c32

Branch: refs/heads/master
Commit: 65183c32fdc8efe47623a84eae3f36130e03d015
Parents: 0f2b753
Author: sboikov <sboikov@gridgain.com>
Authored: Thu May 5 08:30:05 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu May 5 09:55:44 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/MarshallerContextImpl.java  | 24 ++++++++---------
 .../processors/cache/GridCachePreloader.java    |  5 ----
 .../cache/GridCachePreloaderAdapter.java        |  5 ----
 .../processors/cache/GridCacheProcessor.java    | 22 ++++------------
 .../dht/preloader/GridDhtPreloader.java         | 27 --------------------
 5 files changed, 15 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/65183c32/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 2023a58..8f566a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -79,22 +79,18 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
      * @throws IgniteCheckedException In case of error.
      */
     public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException {
-        ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
-            new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
-            null,
-            ctx.cache().marshallerCache().context().affinityNode(),
-            true,
-            false
-        );
-    }
-
-    /**
-     * @param ctx Kernal context.
-     * @throws IgniteCheckedException In case of error.
-     */
-    public void onMarshallerCachePreloaded(GridKernalContext ctx) throws IgniteCheckedException {
         assert ctx != null;
 
+        if (!ctx.isDaemon()) {
+            ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery(
+                new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir),
+                null,
+                ctx.cache().marshallerCache().context().affinityNode(),
+                true,
+                false
+            );
+        }
+
         log = ctx.log(MarshallerContextImpl.class);
 
         cache = ctx.cache().marshallerCache();

http://git-wip-us.apache.org/repos/asf/ignite/blob/65183c32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index acd8c4a..a49bb04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -140,11 +140,6 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<Object> request(Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer);
 
     /**
-     * @return Future completed when rebalance on node start topology finished.
-     */
-    public IgniteInternalFuture<?> initialRebalanceFuture();
-
-    /**
      * Force preload process.
      */
     public void forcePreload();

http://git-wip-us.apache.org/repos/asf/ignite/blob/65183c32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index e393421..58b75df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -128,11 +128,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> initialRebalanceFuture() {
-        return finFut;
-    }
-
-    /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
         cctx.deploy().unwind(cctx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/65183c32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 218d313..468d357 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -805,15 +805,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cacheStartedLatch.countDown();
         }
 
-        if (!ctx.config().isDaemon())
-            ctx.marshallerContext().onMarshallerCacheStarted(ctx);
-
-        marshallerCache().context().preloader().initialRebalanceFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
-            @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
-                ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
-            }
-        });
-
         // Must call onKernalStart on shared managers after creation of fetched caches.
         for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
             mgr.onKernalStart(false);
@@ -821,6 +812,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         for (GridCacheAdapter<?, ?> cache : caches.values())
             onKernalStart(cache);
 
+        ctx.marshallerContext().onMarshallerCacheStarted(ctx);
+
         if (!ctx.config().isDaemon())
             ctx.cacheObjects().onUtilityCacheStarted();
 
@@ -830,15 +823,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             if (cache != null) {
                 if (cfg.getRebalanceMode() == SYNC) {
-                    if (cfg.getCacheMode() == REPLICATED ||
-                        (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) {
-                        boolean utilityCache = CU.isUtilityCache(cache.name());
+                    CacheMode cacheMode = cfg.getCacheMode();
 
-                        if (utilityCache || CU.isMarshallerCache(cache.name()))
-                            cache.preloader().initialRebalanceFuture().get();
-                        else
-                            cache.preloader().syncFuture().get();
-                    }
+                    if (cacheMode == REPLICATED || (cacheMode == PARTITIONED && cfg.getRebalanceDelay() >= 0))
+                        cache.preloader().syncFuture().get();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/65183c32/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index cbaaed4..0de3197 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -103,9 +103,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     /** Start future. */
     private GridFutureAdapter<Object> startFut;
 
-    /** Future completed when rebalance on start topology finished. */
-    private final GridFutureAdapter<Object> initRebalanceFut;
-
     /** Busy lock to prevent activities from accessing exchanger while it's stopping. */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
@@ -140,18 +137,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
                 assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
                     "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
-
-                if (!initRebalanceFut.isDone()) {
-                    startFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> fut) {
-                            cctx.closures().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    initRebalanceFut.onDone();
-                                }
-                            });
-                        }
-                    });
-                }
             }
             finally {
                 leaveBusy();
@@ -168,7 +153,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         top = cctx.dht().topology();
 
         startFut = new GridFutureAdapter<>();
-        initRebalanceFut = new GridFutureAdapter<>();
     }
 
     /** {@inheritDoc} */
@@ -204,12 +188,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         supplier = new GridDhtPartitionSupplier(cctx);
         demander = new GridDhtPartitionDemander(cctx, demandLock);
 
-        demander.rebalanceFuture().listen(new CI1<IgniteInternalFuture<Boolean>>() {
-            @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-                initRebalanceFut.onDone();
-            }
-        });
-
         supplier.start();
         demander.start();
 
@@ -438,11 +416,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
     }
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> initialRebalanceFuture() {
-        return cctx.kernalContext().clientNode() ? startFut : initRebalanceFut;
-    }
-
     /**
      * @return {@code true} if entered to busy state.
      */


Mime
View raw message