Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 81EF5200BBE for ; Fri, 11 Nov 2016 15:18:52 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 809EC160B01; Fri, 11 Nov 2016 14:18:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6E718160B17 for ; Fri, 11 Nov 2016 15:18:51 +0100 (CET) Received: (qmail 68828 invoked by uid 500); 11 Nov 2016 14:18:50 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 68576 invoked by uid 99); 11 Nov 2016 14:18:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 14:18:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 60CC5F175A; Fri, 11 Nov 2016 14:18:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 11 Nov 2016 14:18:59 -0000 Message-Id: <6d58c15ada044d4fbd85dcb0ea56656c@git.apache.org> In-Reply-To: <0482337ea45f4529bb80dc4e4f31578a@git.apache.org> References: <0482337ea45f4529bb80dc4e4f31578a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/17] ignite git commit: IGNITE-4145: Fixes "No query result found for request" exception when running multiple queries concurrently. This closes #1218. archived-at: Fri, 11 Nov 2016 14:18:52 -0000 IGNITE-4145: Fixes "No query result found for request" exception when running multiple queries concurrently. This closes #1218. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a70f0bac Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a70f0bac Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a70f0bac Branch: refs/heads/ignite-4154-opt2 Commit: a70f0bac3ac2487b8ab58598ad921daa952b485f Parents: 53876d3 Author: Andrey V. Mashenkov Authored: Fri Nov 11 13:03:40 2016 +0300 Committer: Andrey V. Mashenkov Committed: Fri Nov 11 13:03:40 2016 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMergeIndex.java | 49 +++++++++++----- .../IgniteCacheQueryMultiThreadedSelfTest.java | 59 ++++++++++++++++++++ 2 files changed, 93 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a70f0bac/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 3914bd7..7ac2ee3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -177,36 +177,50 @@ public abstract class GridMergeIndex extends BaseIndex { public final void addPage(GridResultPage page) { int pageRowsCnt = page.rowsInPage(); - if (pageRowsCnt != 0) - addPage0(page); - Counter cnt = remainingRows.get(page.source()); + // RemainingRowsCount should be updated before page adding to avoid race + // in GridMergeIndexUnsorted cursor iterator + int remainingRowsCount; + int allRows = page.response().allRows(); if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter. - assert !cnt.initialized : "Counter is already initialized."; + assert cnt.state == State.UNINITIALIZED : "Counter is already initialized."; + + remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt); - cnt.addAndGet(allRows); expRowsCnt.addAndGet(allRows); + // Add page before setting initialized flag to avoid race condition with adding last page + if (pageRowsCnt > 0) + addPage0(page); + // We need this separate flag to handle case when the first source contains only one page // and it will signal that all remaining counters are zero and fetch is finished. - cnt.initialized = true; + cnt.state = State.INITIALIZED; } + else { + remainingRowsCount = cnt.addAndGet(-pageRowsCnt); - if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in case of race between messages, it is ok. - boolean last = true; + if (pageRowsCnt > 0) + addPage0(page); + } - for (Counter c : remainingRows.values()) { // Check all the sources. - if (c.get() != 0 || !c.initialized) { - last = false; + if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok. + if (cnt.state == State.UNINITIALIZED) + return; - break; - } + // Guarantee that finished state possible only if counter is zero and all pages was added + cnt.state = State.FINISHED; + + for (Counter c : remainingRows.values()) { // Check all the sources. + if (c.state != State.FINISHED) + return; } - if (last && lastSubmitted.compareAndSet(false, true)) { + if (lastSubmitted.compareAndSet(false, true)) { + // Add page-marker that last page was added addPage0(new GridResultPage(null, page.source(), null) { @Override public boolean isLast() { return true; @@ -426,11 +440,16 @@ public abstract class GridMergeIndex extends BaseIndex { } } + /** */ + enum State { + UNINITIALIZED, INITIALIZED, FINISHED + } + /** * Counter with initialization flag. */ private static class Counter extends AtomicInteger { /** */ - volatile boolean initialized; + volatile State state = State.UNINITIALIZED; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a70f0bac/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java index be644e2..efa6bd6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheQueryMultiThreadedSelfTest.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -34,6 +35,7 @@ import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy; import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; @@ -730,6 +732,63 @@ public class IgniteCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTes } /** + * SqlFieldsQuery paging mechanics stress test + * + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testMultiThreadedSqlFieldsQuery() throws Throwable { + int threadCnt = 16; + final int keyCnt = 1100; // set resultSet size bigger than page size + final int logMod = 5000; + + final Ignite g = grid(0); + + // Put test values into cache. + final IgniteCache c = g.cache(null); + + for (int i = 0; i < keyCnt; i++) + c.put(i, new TestValue(i)); + + final AtomicInteger cnt = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteInternalFuture fut = multithreadedAsync( + new CAX() { + @Override public void applyx() throws IgniteCheckedException { + int iter = 0; + + while (!done.get() && !Thread.currentThread().isInterrupted()) { + iter++; + + List> entries = + c.query(new SqlFieldsQuery("SELECT * from TestValue").setPageSize(100)).getAll(); + + assert entries != null; + + assertEquals("Entries count is not as expected on iteration: " + iter, keyCnt, entries.size()); + + if (cnt.incrementAndGet() % logMod == 0) { + GridCacheQueryManager qryMgr = + ((IgniteKernal)g).internalCache().context().queries(); + + assert qryMgr != null; + + qryMgr.printMemoryStats(); + } + } + } + }, threadCnt); + + Thread.sleep(DURATION); + + done.set(true); + + fut.get(); + } + + /** * Test value. */ private static class TestValue implements Serializable {