Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2C715200AE3 for ; Wed, 4 May 2016 13:20:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1FF201601A3; Wed, 4 May 2016 11:20:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3EB6C1601A2 for ; Wed, 4 May 2016 13:20:02 +0200 (CEST) Received: (qmail 49827 invoked by uid 500); 4 May 2016 11:20:01 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 49818 invoked by uid 99); 4 May 2016 11:20:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 May 2016 11:20:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EB881DFB38; Wed, 4 May 2016 11:20:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <8891d845d4564b86b8bd5b49ce9d07fe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-3073 Possible thread starvation due to rebalancing (no need to wait for utility cache rebalance to get metadata) Date: Wed, 4 May 2016 11:20:00 +0000 (UTC) archived-at: Wed, 04 May 2016 11:20:03 -0000 Repository: ignite Updated Branches: refs/heads/master 718cf78b6 -> f3eb94e0f ignite-3073 Possible thread starvation due to rebalancing (no need to wait for utility cache rebalance to get metadata) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f3eb94e0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f3eb94e0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f3eb94e0 Branch: refs/heads/master Commit: f3eb94e0ffb935546be0288e0520f3e406a2a543 Parents: 718cf78 Author: sboikov Authored: Wed May 4 14:19:50 2016 +0300 Committer: sboikov Committed: Wed May 4 14:19:50 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 9 +- .../IgniteCacheStarvationOnRebalanceTest.java | 166 +++++++++++++++++++ .../IgniteCacheWithIndexingTestSuite.java | 2 + 3 files changed, 172 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f3eb94e0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 3d5052b..218d313 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -821,6 +821,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : caches.values()) onKernalStart(cache); + if (!ctx.config().isDaemon()) + ctx.cacheObjects().onUtilityCacheStarted(); + // Wait for caches in SYNC preload mode. for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); @@ -831,12 +834,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) { boolean utilityCache = CU.isUtilityCache(cache.name()); - if (utilityCache || CU.isMarshallerCache(cache.name())) { + if (utilityCache || CU.isMarshallerCache(cache.name())) cache.preloader().initialRebalanceFuture().get(); - - if (utilityCache) - ctx.cacheObjects().onUtilityCacheStarted(); - } else cache.preloader().syncFuture().get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f3eb94e0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStarvationOnRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStarvationOnRebalanceTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStarvationOnRebalanceTest.java new file mode 100644 index 0000000..17e0c74 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStarvationOnRebalanceTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * Test to reproduce https://issues.apache.org/jira/browse/IGNITE-3073. + */ +public class IgniteCacheStarvationOnRebalanceTest extends GridCacheAbstractSelfTest { + /** Grid count. */ + private static final int GRID_CNT = 4; + + /** Test timeout. */ + private static final long TEST_TIMEOUT = 3 * 60 * 1000; + + /** Use small system thread pool to reproduce the issue. */ + private static final int IGNITE_THREAD_POOL_SIZE = 5; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + // Use small system thread pool to reproduce the issue. + cfg.setSystemThreadPoolSize(IGNITE_THREAD_POOL_SIZE); + + cfg.setMarshaller(new BinaryMarshaller()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected Class[] indexedTypes() { + return new Class[] {Integer.class, CacheValue.class}; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** + * @throws Exception If failed. + */ + public void testLoadSystemWithPutAndStartRebalancing() throws Exception { + final IgniteCache cache = grid(0).cache(null); + + final long endTime = System.currentTimeMillis() + TEST_TIMEOUT - 60_000; + + int iter = 0; + + while (System.currentTimeMillis() < endTime) { + info("Iteration: " + iter++); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get() && System.currentTimeMillis() < endTime) { + int key = rnd.nextInt(100_000); + + cache.put(key, new CacheValue(key)); + } + + return null; + } + }, IGNITE_THREAD_POOL_SIZE * 4, "put-thread"); + + try { + Thread.sleep(500); + + info("Initial set of keys is loaded."); + + info("Starting new node..."); + + startGrid(GRID_CNT + 1); + + info("New node is started."); + + Thread.sleep(500); + } + finally { + stop.set(true); + } + + // Wait for put tasks. If put() is blocked the test is timed out. + fut.get(); + + stopGrid(GRID_CNT + 1); + } + } + + /** + * Test cache value. + */ + private static class CacheValue { + /** */ + @QuerySqlField(index = true) + private final int val; + + /** + * @param val Value. + */ + CacheValue(int val) { + this.val = val; + } + + /** + * @return Value. + */ + int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheValue.class, this); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f3eb94e0/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index 9ff7520..929fb37 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexGetSelfT import org.apache.ignite.internal.processors.cache.GridCacheSwapSelfTest; import org.apache.ignite.internal.processors.cache.GridIndexingWithNoopSwapSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheConfigurationPrimitiveTypesSelfTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheStarvationOnRebalanceTest; import org.apache.ignite.internal.processors.cache.IgniteClientReconnectQueriesTest; import org.apache.ignite.internal.processors.cache.ttl.CacheTtlOffheapAtomicLocalSelfTest; import org.apache.ignite.internal.processors.cache.ttl.CacheTtlOffheapAtomicPartitionedSelfTest; @@ -72,6 +73,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class); suite.addTestSuite(IgniteClientReconnectQueriesTest.class); suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class); + suite.addTestSuite(IgniteCacheStarvationOnRebalanceTest.class); return suite; }