Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id F39BA200B8D for ; Fri, 23 Sep 2016 12:41:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F2478160AC2; Fri, 23 Sep 2016 10:41:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C2964160AEA for ; Fri, 23 Sep 2016 12:41:04 +0200 (CEST) Received: (qmail 26321 invoked by uid 500); 23 Sep 2016 10:41:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 26093 invoked by uid 99); 23 Sep 2016 10:41:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Sep 2016 10:41:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2C2D6E7E18; Fri, 23 Sep 2016 10:41:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 23 Sep 2016 10:41:10 -0000 Message-Id: In-Reply-To: <548ab4801fac47caba4d66805b5048a3@git.apache.org> References: <548ab4801fac47caba4d66805b5048a3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/12] ignite git commit: ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored archived-at: Fri, 23 Sep 2016 10:41:06 -0000 ignite-3810 Fixed hang in FileSwapSpaceSpi when too large value is stored Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/780bf23d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/780bf23d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/780bf23d Branch: refs/heads/ignite-1.6.8-hadoop Commit: 780bf23d5c89452dd062be4fab9e2e56d50bb9e2 Parents: 9b72d18 Author: sboikov Authored: Mon Sep 19 18:19:33 2016 +0300 Committer: sboikov Committed: Mon Sep 19 18:19:33 2016 +0300 ---------------------------------------------------------------------- .../spi/swapspace/file/FileSwapSpaceSpi.java | 38 +++++++-- .../CacheSwapUnswapGetTestSmallQueueSize.java | 35 ++++++++ .../file/GridFileSwapSpaceSpiSelfTest.java | 89 ++++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 2 + 4 files changed, 158 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java index 8809f08..9be5b93 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/swapspace/file/FileSwapSpaceSpi.java @@ -639,7 +639,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, if (space == null && create) { validateName(name); - Space old = spaces.putIfAbsent(masked, space = new Space(masked)); + Space old = spaces.putIfAbsent(masked, space = new Space(masked, log)); if (old != null) space = old; @@ -833,13 +833,21 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, /** */ private final int maxSize; + /** */ + private final IgniteLogger log; + + /** */ + private boolean queueSizeWarn; + /** * @param minTakeSize Min size. * @param maxSize Max size. + * @param log logger */ - private SwapValuesQueue(int minTakeSize, int maxSize) { + private SwapValuesQueue(int minTakeSize, int maxSize, IgniteLogger log) { this.minTakeSize = minTakeSize; this.maxSize = maxSize; + this.log = log; } /** @@ -852,8 +860,24 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, lock.lock(); try { - while (size + val.len > maxSize) - mayAdd.await(); + boolean largeVal = val.len > maxSize; + + if (largeVal) { + if (!queueSizeWarn) { + U.warn(log, "Trying to save in swap entry which have size more than write queue size. " + + "You may wish to increase 'maxWriteQueueSize' in FileSwapSpaceSpi configuration " + + "[queueMaxSize=" + maxSize + ", valSize=" + val.len + ']'); + + queueSizeWarn = true; + } + + while (size >= minTakeSize) + mayAdd.await(); + } + else { + while (size + val.len > maxSize) + mayAdd.await(); + } size += val.len; @@ -1419,7 +1443,7 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, private SwapFile right; /** */ - private final SwapValuesQueue que = new SwapValuesQueue(writeBufSize, maxWriteQueSize); + private final SwapValuesQueue que; /** Partitions. */ private final ConcurrentMap> parts = @@ -1442,11 +1466,13 @@ public class FileSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceSpi, /** * @param name Space name. + * @param log Logger. */ - private Space(String name) { + private Space(String name, IgniteLogger log) { assert name != null; this.name = name; + this.que = new SwapValuesQueue(writeBufSize, maxWriteQueSize, log); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java new file mode 100644 index 0000000..8d189fe --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSwapUnswapGetTestSmallQueueSize.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi; + +/** + * + */ +public class CacheSwapUnswapGetTestSmallQueueSize extends CacheSwapUnswapGetTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((FileSwapSpaceSpi)cfg.getSwapSpaceSpi()).setMaxWriteQueueSize(2); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java index 64652b1..ab21165 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/file/GridFileSwapSpaceSpiSelfTest.java @@ -25,11 +25,14 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; @@ -37,8 +40,10 @@ import org.apache.ignite.spi.IgniteSpiCloseableIterator; import org.apache.ignite.spi.swapspace.GridSwapSpaceSpiAbstractSelfTest; import org.apache.ignite.spi.swapspace.SwapKey; import org.apache.ignite.spi.swapspace.SwapSpaceSpi; +import org.apache.ignite.testframework.GridTestUtils; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.junit.Assert; /** * Test for {@link FileSwapSpaceSpi}. @@ -364,4 +369,88 @@ public class GridFileSwapSpaceSpiSelfTest extends GridSwapSpaceSpiAbstractSelfTe assertEquals(hash0, hash1); } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testSaveValueLargeThenQueueSize() throws IgniteCheckedException { + final String spaceName = "mySpace"; + final SwapKey key = new SwapKey("key"); + + final byte[] val = new byte[FileSwapSpaceSpi.DFLT_QUE_SIZE * 2]; + Arrays.fill(val, (byte)1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public byte[] call() throws Exception { + return saveAndGet(spaceName, key, val); + } + }); + + byte[] bytes = fut.get(10_000); + + Assert.assertArrayEquals(val, bytes); + } + + /** + * @throws IgniteCheckedException If failed. + */ + public void testSaveValueLargeThenQueueSizeMultiThreaded() throws Exception { + final String spaceName = "mySpace"; + + final int threads = 5; + + long DURATION = 30_000; + + final int maxSize = FileSwapSpaceSpi.DFLT_QUE_SIZE * 2; + + final AtomicBoolean done = new AtomicBoolean(); + + try { + IgniteInternalFuture fut = multithreadedAsync(new Callable() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!done.get()) { + SwapKey key = new SwapKey(rnd.nextInt(1000)); + + spi.store(spaceName, key, new byte[rnd.nextInt(0, maxSize)], context()); + } + + return null; + } + }, threads, " async-put"); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + finally { + done.set(true); + } + } + + /** + * @param spaceName Space name. + * @param key Key. + * @param val Value. + * @throws Exception If failed. + * @return Read bytes. + */ + private byte[] saveAndGet(final String spaceName, final SwapKey key, byte[] val) throws Exception { + spi.store(spaceName, key, val, context()); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return spi.read(spaceName, key, context()) != null; + } + }, 10_000); + + byte[] res = spi.read(spaceName, key, context()); + + assertNotNull(res); + + return res; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/780bf23d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 60d59d7..c494e73 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeDynam import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest; import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest; import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTest; +import org.apache.ignite.internal.processors.cache.CacheSwapUnswapGetTestSmallQueueSize; import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest; import org.apache.ignite.internal.processors.cache.CrossCacheLockTest; import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest; @@ -304,6 +305,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheVersionedEntryReplicatedTransactionalOffHeapSelfTest.class); suite.addTestSuite(CacheSwapUnswapGetTest.class); + suite.addTestSuite(CacheSwapUnswapGetTestSmallQueueSize.class); suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class); suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);