Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 03AE2200C7E for ; Tue, 23 May 2017 14:28:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 02465160BD3; Tue, 23 May 2017 12:28:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EE2B8160BC3 for ; Tue, 23 May 2017 14:28:38 +0200 (CEST) Received: (qmail 15651 invoked by uid 500); 23 May 2017 12:28:38 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 15642 invoked by uid 99); 23 May 2017 12:28:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 23 May 2017 12:28:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 135E9DFC2E; Tue, 23 May 2017 12:28:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sergi@apache.org To: commits@ignite.apache.org Message-Id: <92a55ff27f0c4c1bb4d15f8e94abe326@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Fixed segmented indices snapshots. - Fixes #1936. Date: Tue, 23 May 2017 12:28:38 +0000 (UTC) archived-at: Tue, 23 May 2017 12:28:40 -0000 Repository: ignite Updated Branches: refs/heads/master 647fd195b -> 1554a1606 Fixed segmented indices snapshots. - Fixes #1936. Signed-off-by: Sergi Vladykin Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1554a160 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1554a160 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1554a160 Branch: refs/heads/master Commit: 1554a1606244b46f042ecbf6aeb7eb09c3a2abb8 Parents: 647fd19 Author: Andrey V. Mashenkov Authored: Tue May 23 15:26:00 2017 +0300 Committer: Sergi Vladykin Committed: Tue May 23 15:26:00 2017 +0300 ---------------------------------------------------------------------- .../processors/query/h2/opt/GridH2Table.java | 107 ++++++++++++------- .../query/h2/twostep/GridMapQueryExecutor.java | 2 +- .../query/IgniteSqlSegmentedIndexSelfTest.java | 25 +++++ 3 files changed, 93 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 41cf68b..ec728de 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -74,6 +74,9 @@ public class GridH2Table extends TableBase { private volatile ArrayList idxs; /** */ + private final int pkIndexPos; + + /** */ private final Map tmpIdxs = new HashMap<>(); /** */ @@ -86,7 +89,7 @@ public class GridH2Table extends TableBase { private final ConcurrentMap sessions = new ConcurrentHashMap8<>(); /** */ - private final AtomicReference actualSnapshot = new AtomicReference<>(); + private final AtomicReferenceArray actualSnapshot; /** */ private IndexColumn affKeyCol; @@ -164,21 +167,31 @@ public class GridH2Table extends TableBase { assert idxs != null; List clones = new ArrayList<>(idxs.size()); - for (Index index: idxs) { + for (Index index : idxs) { Index clone = createDuplicateIndexIfNeeded(index); if (clone != null) - clones.add(clone); + clones.add(clone); } idxs.addAll(clones); + boolean hasHashIndex = idxs.size() >= 2 && index(0).getIndexType().isHash(); + // Add scan index at 0 which is required by H2. - if (idxs.size() >= 2 && index(0).getIndexType().isHash()) + if (hasHashIndex) idxs.add(0, new GridH2PrimaryScanIndex(this, index(1), index(0))); else idxs.add(0, new GridH2PrimaryScanIndex(this, index(0), null)); snapshotEnabled = desc == null || desc.snapshotableIndex(); + pkIndexPos = hasHashIndex ? 2 : 1; + + final int segments = desc != null ? desc.configuration().getQueryParallelism() : + // Get index segments count from PK index. Null desc can be passed from tests. + index(pkIndexPos).segmentsCount(); + + actualSnapshot = snapshotEnabled ? new AtomicReferenceArray(Math.max(segments, 1)) : null; + lock = new ReentrantReadWriteLock(); } @@ -233,8 +246,13 @@ public class GridH2Table extends TableBase { throw new IllegalStateException("Table " + identifierString() + " already destroyed."); } - if (snapshotInLock()) - snapshotIndexes(null); + if (snapshotInLock()) { + final GridH2QueryContext qctx = GridH2QueryContext.get(); + + assert qctx != null; + + snapshotIndexes(null, qctx.segment()); + } return false; } @@ -255,21 +273,22 @@ public class GridH2Table extends TableBase { /** * @param qctx Query context. + * @param segment id of index segment to be snapshoted. */ - public void snapshotIndexes(GridH2QueryContext qctx) { + public void snapshotIndexes(GridH2QueryContext qctx, int segment) { if (!snapshotEnabled) return; - Object[] snapshots; + Object[] segmentSnapshot; // Try to reuse existing snapshots outside of the lock. - for (long waitTime = 200;; waitTime *= 2) { // Increase wait time to avoid starvation. - snapshots = actualSnapshot.get(); + for (long waitTime = 200; ; waitTime *= 2) { // Increase wait time to avoid starvation. + segmentSnapshot = actualSnapshot.get(segment); - if (snapshots != null) { // Reuse existing snapshot without locking. - snapshots = doSnapshotIndexes(snapshots, qctx); + if (segmentSnapshot != null) { // Reuse existing snapshot without locking. + segmentSnapshot = doSnapshotIndexes(segment, segmentSnapshot, qctx); - if (snapshots != null) + if (segmentSnapshot != null) return; // Reused successfully. } @@ -281,17 +300,17 @@ public class GridH2Table extends TableBase { ensureNotDestroyed(); // Try again inside of the lock. - snapshots = actualSnapshot.get(); + segmentSnapshot = actualSnapshot.get(segment); - if (snapshots != null) // Try reusing. - snapshots = doSnapshotIndexes(snapshots, qctx); + if (segmentSnapshot != null) // Try reusing. + segmentSnapshot = doSnapshotIndexes(segment, segmentSnapshot, qctx); - if (snapshots == null) { // Reuse failed, produce new snapshots. - snapshots = doSnapshotIndexes(null, qctx); + if (segmentSnapshot == null) { // Reuse failed, produce new snapshots. + segmentSnapshot = doSnapshotIndexes(segment,null, qctx); - assert snapshots != null; + assert segmentSnapshot != null; - actualSnapshot.set(snapshots); + actualSnapshot.set(segment, segmentSnapshot); } } finally { @@ -375,19 +394,22 @@ public class GridH2Table extends TableBase { * Must be called inside of write lock because when using multiple indexes we have to ensure that all of them have * the same contents at snapshot taking time. * + * @param segment id of index segment snapshot. + * @param segmentSnapshot snapshot to be reused. * @param qctx Query context. * @return New indexes data snapshot. */ @SuppressWarnings("unchecked") - private Object[] doSnapshotIndexes(Object[] snapshots, GridH2QueryContext qctx) { + private Object[] doSnapshotIndexes(int segment, Object[] segmentSnapshot, GridH2QueryContext qctx) { assert snapshotEnabled; - if (snapshots == null) // Nothing to reuse, create new snapshots. - snapshots = new Object[idxs.size() - 2]; + //TODO: make HashIndex snapshotable or remove it at all? + if (segmentSnapshot == null) // Nothing to reuse, create new snapshots. + segmentSnapshot = new Object[idxs.size() - pkIndexPos]; - // Take snapshots on all except first which is scan and second which is hash. - for (int i = 2, len = idxs.size(); i < len; i++) { - Object s = snapshots[i - 2]; + // Take snapshots on all except first which is scan. + for (int i = pkIndexPos, len = idxs.size(); i < len; i++) { + Object s = segmentSnapshot[i - pkIndexPos]; boolean reuseExisting = s != null; @@ -401,20 +423,20 @@ public class GridH2Table extends TableBase { if (qctx != null) qctx.clearSnapshots(); - for (int j = 2; j < i; j++) + for (int j = pkIndexPos; j < i; j++) if ((idxs.get(j) instanceof GridH2IndexBase)) index(j).releaseSnapshot(); // Drop invalidated snapshot. - actualSnapshot.compareAndSet(snapshots, null); + actualSnapshot.compareAndSet(segment, segmentSnapshot, null); return null; } - snapshots[i - 2] = s; + segmentSnapshot[i - pkIndexPos] = s; } - return snapshots; + return segmentSnapshot; } /** {@inheritDoc} */ @@ -587,7 +609,7 @@ public class GridH2Table extends TableBase { int len = idxs.size(); - int i = 2; + int i = pkIndexPos; // Put row if absent to all indexes sequentially. // Start from 3 because 0 - Scan (don't need to update), 1 - PK hash (already updated), 2 - PK (already updated). @@ -609,7 +631,7 @@ public class GridH2Table extends TableBase { if (old != null) { // Remove row from all indexes. // Start from 3 because 0 - Scan (don't need to update), 1 - PK hash (already updated), 2 - PK (already updated). - for (int i = 3, len = idxs.size(); i < len; i++) { + for (int i = pkIndexPos + 1, len = idxs.size(); i < len; i++) { if (!(idxs.get(i) instanceof GridH2IndexBase)) continue; Row res = index(i).remove(old); @@ -627,7 +649,8 @@ public class GridH2Table extends TableBase { } // The snapshot is not actual after update. - actualSnapshot.set(null); + if (actualSnapshot != null) + actualSnapshot.set(pk.segmentForRow(row), null); return true; } @@ -684,9 +707,10 @@ public class GridH2Table extends TableBase { ArrayList indexes() { ArrayList res = new ArrayList<>(idxs.size() - 2); - for (int i = 2, len = idxs.size(); i < len; i++) + for (int i = pkIndexPos, len = idxs.size(); i < len; i++) { if (idxs.get(i) instanceof GridH2IndexBase) res.add(index(i)); + } return res; } @@ -695,6 +719,8 @@ public class GridH2Table extends TableBase { * */ public void markRebuildFromHashInProgress(boolean value) { + assert !value || (idxs.size() >= 2 && index(1).getIndexType().isHash()) : "Table has no hash index."; + rebuildFromHashInProgress = value; } @@ -759,7 +785,8 @@ public class GridH2Table extends TableBase { Index cloneIdx = createDuplicateIndexIfNeeded(idx); - ArrayList newIdxs = new ArrayList<>(idxs.size() + ((cloneIdx == null) ? 1 : 2)); + ArrayList newIdxs = new ArrayList<>( + idxs.size() + ((cloneIdx == null) ? 1 : 2)); newIdxs.addAll(idxs); @@ -837,14 +864,14 @@ public class GridH2Table extends TableBase { try { ArrayList idxs = new ArrayList<>(this.idxs); - Index targetIdx = (h2Idx instanceof GridH2ProxyIndex)? - ((GridH2ProxyIndex)h2Idx).underlyingIndex(): h2Idx; + Index targetIdx = (h2Idx instanceof GridH2ProxyIndex) ? + ((GridH2ProxyIndex)h2Idx).underlyingIndex() : h2Idx; - for (int i = 2; i < idxs.size();) { + for (int i = pkIndexPos; i < idxs.size();) { Index idx = idxs.get(i); if (idx == targetIdx || (idx instanceof GridH2ProxyIndex && - ((GridH2ProxyIndex)idx).underlyingIndex() == targetIdx)) { + ((GridH2ProxyIndex)idx).underlyingIndex() == targetIdx)) { idxs.remove(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 43cc230..6d76eea 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -573,7 +573,7 @@ public class GridMapQueryExecutor { Objects.requireNonNull(h2Tbl, tbl.toString()); - h2Tbl.snapshotIndexes(qctx); + h2Tbl.snapshotIndexes(qctx, segmentId); snapshotedTbls.add(h2Tbl); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1554a160/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java index 586b81e..03c3f1e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSegmentedIndexSelfTest.java @@ -135,6 +135,31 @@ public class IgniteSqlSegmentedIndexSelfTest extends GridCommonAbstractTest { } /** + * Check correct index snapshots with segmented indices. + * @throws Exception If failed. + */ + public void testSegmentedIndexReproducableResults() throws Exception { + ignite(0).createCache(cacheConfig(ORG_CACHE_NAME, true, Integer.class, Organization.class)); + + IgniteCache cache = ignite(0).cache(ORG_CACHE_NAME); + + // Unequal entries distribution among partitions. + int expectedSize = nodesCount() * QRY_PARALLELISM_LVL * 3 / 2; + + for (int i = 0; i < expectedSize; i++) + cache.put(i, new Organization("org-" + i)); + + String select0 = "select * from \"org\".Organization o"; + + // Check for stable results. + for(int i = 0; i < 10; i++) { + List> result = cache.query(new SqlFieldsQuery(select0)).getAll(); + + assertEquals(expectedSize, result.size()); + } + } + + /** * Run tests on single-node grid * * @throws Exception If failed.