Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AFB0B196CF for ; Mon, 21 Mar 2016 14:20:26 +0000 (UTC) Received: (qmail 17113 invoked by uid 500); 21 Mar 2016 14:20:26 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 17072 invoked by uid 500); 21 Mar 2016 14:20:26 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 17045 invoked by uid 99); 21 Mar 2016 14:20:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Mar 2016 14:20:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6EA62DFC55; Mon, 21 Mar 2016 14:20:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: av@apache.org To: commits@ignite.apache.org Date: Mon, 21 Mar 2016 14:20:27 -0000 Message-Id: <796a46e2889f4a30b91a11b18b31a28e@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/50] [abbrv] ignite git commit: IGNITE-2709 - Fixed potential SOE on high-contented cache locks - Fixes #509. IGNITE-2709 - Fixed potential SOE on high-contented cache locks - Fixes #509. Signed-off-by: Alexey Goncharuk Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99bbc723 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99bbc723 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99bbc723 Branch: refs/heads/ignite-2801 Commit: 99bbc72383375c41c980a40bb23461e0b4ec8b7e Parents: 1a3ec64 Author: Alexey Goncharuk Authored: Wed Feb 24 18:45:00 2016 -0800 Committer: Alexey Goncharuk Committed: Thu Feb 25 16:38:45 2016 -0800 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../processors/cache/GridCacheMvccManager.java | 142 ++++++++++++------- .../IgniteCachePutStackOverflowSelfTest.java | 133 +++++++++++++++++ .../testsuites/IgniteCacheTestSuite5.java | 2 + 4 files changed, 230 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/99bbc723/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index de7c10b..095d199 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -367,6 +367,9 @@ public final class IgniteSystemProperties { /** JDBC driver cursor remove delay. */ public static final String IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY = "IGNITE_JDBC_DRIVER_CURSOR_RMV_DELAY"; + /** Maximum number of nested listener calls before listener notification becomes asynchronous. */ + public static final String IGNITE_MAX_NESTED_LISTENER_CALLS = "IGNITE_MAX_NESTED_LISTENER_CALLS"; + /** * Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for * {@link Serializable} classes. http://git-wip-us.apache.org/repos/asf/ignite/blob/99bbc723/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index dbc6992..fb05ca5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -48,6 +49,7 @@ import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -63,6 +65,8 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_NESTED_LISTENER_CALLS; +import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; @@ -75,6 +79,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** Maxim number of removed locks. */ private static final int MAX_REMOVED_LOCKS = 10240; + /** */ + private static final int MAX_NESTED_LSNR_CALLS = getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, 5); + /** Pending locks per thread. */ private final ThreadLocal> pending = new ThreadLocal>() { @@ -115,6 +122,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** Finish futures. */ private final ConcurrentLinkedDeque8 finishFuts = new ConcurrentLinkedDeque8<>(); + /** Nested listener calls. */ + private final ThreadLocal nestedLsnrCalls = new ThreadLocal() { + @Override protected Integer initialValue() { + return 0; + } + }; + /** Logger. */ @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"}) private IgniteLogger exchLog; @@ -127,60 +141,26 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final GridCacheMvccCallback cb = new GridCacheMvccCallback() { /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) - @Override public void onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate prev, - GridCacheMvccCandidate owner) { - assert entry != null; - assert owner != prev : "New and previous owner are identical instances: " + owner; - assert owner == null || prev == null || !owner.version().equals(prev.version()) : - "New and previous owners have identical versions [owner=" + owner + ", prev=" + prev + ']'; - - if (log.isDebugEnabled()) - log.debug("Received owner changed callback [" + entry.key() + ", owner=" + owner + ", prev=" + - prev + ']'); - - if (owner != null && (owner.local() || owner.nearLocal())) { - Collection> futCol = mvccFuts.get(owner.version()); - - if (futCol != null) { - ArrayList> futColCp; + @Override public void onOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate prev, + final GridCacheMvccCandidate owner) { + int nested = nestedLsnrCalls.get(); - synchronized (futCol) { - futColCp = new ArrayList<>(futCol.size()); + if (nested < MAX_NESTED_LSNR_CALLS) { + nestedLsnrCalls.set(nested + 1); - futColCp.addAll(futCol); - } - - // Must invoke onOwnerChanged outside of synchronization block. - for (GridCacheMvccFuture fut : futColCp) { - if (!fut.isDone()) { - GridCacheMvccFuture mvccFut = (GridCacheMvccFuture)fut; - - // Since this method is called outside of entry synchronization, - // we can safely invoke any method on the future. - // Also note that we don't remove future here if it is done. - // The removal is initiated from within future itself. - if (mvccFut.onOwnerChanged(entry, owner)) - return; - } - } + try { + notifyOwnerChanged(entry, prev, owner); + } + finally { + nestedLsnrCalls.set(nested); } } - - if (log.isDebugEnabled()) - log.debug("Lock future not found for owner change callback (will try transaction futures) [owner=" + - owner + ", prev=" + prev + ", entry=" + entry + ']'); - - // If no future was found, delegate to transaction manager. - if (cctx.tm().onOwnerChanged(entry, owner)) { - if (log.isDebugEnabled()) - log.debug("Found transaction for changed owner: " + owner); - } - else if (log.isDebugEnabled()) - log.debug("Failed to find transaction for changed owner: " + owner); - - if (!finishFuts.isEmptyx()) { - for (FinishLockFuture f : finishFuts) - f.recheck(entry); + else { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + notifyOwnerChanged(entry, prev, owner); + } + }, true); } } @@ -201,6 +181,68 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } }; + /** + * @param entry Entry to notify callback for. + * @param prev Previous lock owner. + * @param owner Current lock owner. + */ + private void notifyOwnerChanged(final GridCacheEntryEx entry, final GridCacheMvccCandidate prev, + final GridCacheMvccCandidate owner) { + assert entry != null; + assert owner != prev : "New and previous owner are identical instances: " + owner; + assert owner == null || prev == null || !owner.version().equals(prev.version()) : + "New and previous owners have identical versions [owner=" + owner + ", prev=" + prev + ']'; + + if (log.isDebugEnabled()) + log.debug("Received owner changed callback [" + entry.key() + ", owner=" + owner + ", prev=" + + prev + ']'); + + if (owner != null && (owner.local() || owner.nearLocal())) { + Collection> futCol = mvccFuts.get(owner.version()); + + if (futCol != null) { + ArrayList> futColCp; + + synchronized (futCol) { + futColCp = new ArrayList<>(futCol.size()); + + futColCp.addAll(futCol); + } + + // Must invoke onOwnerChanged outside of synchronization block. + for (GridCacheMvccFuture fut : futColCp) { + if (!fut.isDone()) { + final GridCacheMvccFuture mvccFut = (GridCacheMvccFuture)fut; + + // Since this method is called outside of entry synchronization, + // we can safely invoke any method on the future. + // Also note that we don't remove future here if it is done. + // The removal is initiated from within future itself. + if (mvccFut.onOwnerChanged(entry, owner)) + return; + } + } + } + } + + if (log.isDebugEnabled()) + log.debug("Lock future not found for owner change callback (will try transaction futures) [owner=" + + owner + ", prev=" + prev + ", entry=" + entry + ']'); + + // If no future was found, delegate to transaction manager. + if (cctx.tm().onOwnerChanged(entry, owner)) { + if (log.isDebugEnabled()) + log.debug("Found transaction for changed owner: " + owner); + } + else if (log.isDebugEnabled()) + log.debug("Failed to find transaction for changed owner: " + owner); + + if (!finishFuts.isEmptyx()) { + for (FinishLockFuture f : finishFuts) + f.recheck(entry); + } + } + /** Discovery listener. */ @GridToStringExclude private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { http://git-wip-us.apache.org/repos/asf/ignite/blob/99bbc723/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java new file mode 100644 index 0000000..55d7192 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutStackOverflowSelfTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class IgniteCachePutStackOverflowSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception if failed. + */ + public void testStackLocal() throws Exception { + checkCache(CacheMode.LOCAL); + } + + /** + * @throws Exception if failed. + */ + public void testStackPartitioned() throws Exception { + checkCache(CacheMode.PARTITIONED); + } + + /** + * @throws Exception if failed. + */ + public void testStackReplicated() throws Exception { + checkCache(CacheMode.REPLICATED); + } + + /** + * @throws Exception if failed. + */ + private void checkCache(CacheMode mode) throws Exception { + final Ignite ignite = ignite(0); + + final IgniteCache cache = ignite.getOrCreateCache(new CacheConfiguration<>("cache") + .setCacheMode(mode) + .setAtomicityMode(TRANSACTIONAL)); + + try { + Thread[] threads = new Thread[256]; + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + // Lock the key. + final String key = "key"; + + cache.get(key); + + // Simulate high contention. + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread() { + @Override public void run() { + cache.put(key, 1); + } + }; + + threads[i].start(); + } + + U.sleep(2_000); + + cache.put(key, 1); + + tx.commit(); + } + + System.out.println("Waiting for threads to finish..."); + + for (Thread thread : threads) + thread.join(); + } + finally { + ignite.destroyCache("cache"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/99bbc723/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java index 34b12a9..3eb0b13 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java @@ -20,6 +20,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.CacheNearReaderUpdateTest; import org.apache.ignite.internal.processors.cache.CacheSerializableTransactionsTest; +import org.apache.ignite.internal.processors.cache.IgniteCachePutStackOverflowSelfTest; import org.apache.ignite.internal.processors.cache.IgniteCacheStoreCollectionTest; import org.apache.ignite.internal.processors.cache.store.IgniteCacheWriteBehindNoUpdateSelfTest; @@ -38,6 +39,7 @@ public class IgniteCacheTestSuite5 extends TestSuite { suite.addTestSuite(CacheNearReaderUpdateTest.class); suite.addTestSuite(IgniteCacheStoreCollectionTest.class); suite.addTestSuite(IgniteCacheWriteBehindNoUpdateSelfTest.class); + suite.addTestSuite(IgniteCachePutStackOverflowSelfTest.class); return suite; }