Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B54A200C28 for ; Mon, 13 Mar 2017 10:07:17 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2A0A0160B60; Mon, 13 Mar 2017 09:07:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6033B160BA7 for ; Mon, 13 Mar 2017 10:07:13 +0100 (CET) Received: (qmail 49514 invoked by uid 500); 13 Mar 2017 09:07:12 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 47211 invoked by uid 99); 13 Mar 2017 09:07:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Mar 2017 09:07:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0AB77E178B; Mon, 13 Mar 2017 09:07:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 13 Mar 2017 09:07:45 -0000 Message-Id: <407cd3aed0b1425095fce87b1abeb63d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [38/50] [abbrv] ignite git commit: ignite-1.9 - SQL related fixes and improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries fixes archived-at: Mon, 13 Mar 2017 09:07:17 -0000 ignite-1.9 - SQL related fixes and improvements: - Sorted MERGE index - EXPLAIN fixes - Replicated subqueries fixes Squashed commit of the following: commit 423c2155c85ed9be8dffb3517b7331b753e1ce5c Author: Sergi Vladykin Date: Thu Mar 9 23:21:38 2017 +0300 ignite-1.9.1 - test fix commit ff3c1f2967905b0bcac7661014656d1c080fa803 Author: Sergi Vladykin Date: Thu Mar 9 11:08:34 2017 +0300 ignite-1.9.0 - replicated subqueries fix commit bc0801a3c976f5d87cab2c414f76f69dc28b43d7 Author: Sergi Vladykin Date: Wed Mar 8 16:03:40 2017 +0300 ignite-1.9.0 - fix for distributed join test commit f1f1d96c6babaadab9e3ed1fbb3c9740c94d8209 Author: Sergi Vladykin Date: Wed Mar 8 15:28:44 2017 +0300 ignite-1.9.0 - fix for distributed join test commit a8751d535b3e025a804c441204465e94035a5247 Author: Sergi Vladykin Date: Tue Feb 28 18:46:07 2017 +0300 ignite-1.9 - splitter fixes commit 0601ce6e291eb4689d526e922b02fd9e21df5b08 Author: Sergi Vladykin Date: Sun Feb 26 23:24:14 2017 +0300 ignite-1.9 - merge index test commit 4ad048e248157d799a325b3ce9975d4ad8a9fb49 Author: Sergi Vladykin Date: Sun Feb 26 23:19:49 2017 +0300 ignite-1.9 - merge index commit 4ea63d7335000b8f30bfbd1bb907e411cd62a5e8 Author: Sergi Vladykin Date: Sun Feb 26 22:44:51 2017 +0300 ignite-1.9 - unsorted index fixed commit a639bff6f25a8397e49a892f830c9de23c847127 Author: Sergi Vladykin Date: Sun Feb 26 20:08:26 2017 +0300 ignite-1.9 - sorted index fixes2 commit ee9d524f5a0d6f1c416345822e8201c327f1e562 Author: Sergi Vladykin Date: Fri Feb 24 16:00:26 2017 +0300 ignite-1.9 - sorted index fixes commit fc42406a9e55851d53d9dfed8e6cf3c8b12af345 Author: Sergi Vladykin Date: Thu Feb 23 16:46:39 2017 +0300 ignite-1.9 - sorted index Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8817190e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8817190e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8817190e Branch: refs/heads/ignite-4768 Commit: 8817190e1dd31d869682df0167bb3e82fb597aad Parents: 8362fe7 Author: Sergi Vladykin Authored: Thu Mar 9 23:30:09 2017 +0300 Committer: Sergi Vladykin Committed: Thu Mar 9 23:30:09 2017 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheSqlQuery.java | 82 ++++- .../processors/query/h2/IgniteH2Indexing.java | 4 +- .../query/h2/opt/GridH2CollocationModel.java | 6 +- .../query/h2/opt/GridH2ScanIndex.java | 273 +++++++++++++++++ .../processors/query/h2/opt/GridH2Table.java | 244 +-------------- .../processors/query/h2/sql/GridSqlQuery.java | 17 -- .../query/h2/sql/GridSqlQueryParser.java | 4 +- .../query/h2/sql/GridSqlQuerySplitter.java | 38 ++- .../query/h2/sql/GridSqlSortColumn.java | 41 +++ .../query/h2/twostep/GridMapQueryExecutor.java | 83 +++-- .../query/h2/twostep/GridMergeIndex.java | 300 +++++++++++-------- .../query/h2/twostep/GridMergeIndexSorted.java | 172 ++++++++--- .../h2/twostep/GridMergeIndexUnsorted.java | 67 ++++- .../query/h2/twostep/GridMergeTable.java | 70 ++++- .../h2/twostep/GridReduceQueryExecutor.java | 101 +++++-- .../query/h2/twostep/GridResultPage.java | 34 ++- .../h2/twostep/msg/GridH2QueryRequest.java | 11 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 10 +- .../query/IgniteSqlSplitterSelfTest.java | 100 ++++++- .../query/h2/sql/H2CompareBigQueryTest.java | 4 +- .../IgniteCacheQuerySelfTestSuite.java | 2 + .../processors/query/h2/sql/bigQuery.sql | 34 ++- 22 files changed, 1138 insertions(+), 559 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java index 18688b7..c4bb205 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.query; import java.nio.ByteBuffer; import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.GridDirectTransient; @@ -74,6 +76,19 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { /** Field kept for backward compatibility. */ private String alias; + /** Sort columns. */ + @GridToStringInclude + @GridDirectTransient + private transient List sort; + + /** If we have partitioned tables in this query. */ + @GridToStringInclude + @GridDirectTransient + private transient boolean partitioned; + + /** Single node to execute the query on. */ + private UUID node; + /** * For {@link Message}. */ @@ -218,12 +233,18 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { writer.incrementState(); case 1: - if (!writer.writeByteArray("paramsBytes", paramsBytes)) + if (!writer.writeUuid("node", node)) return false; writer.incrementState(); case 2: + if (!writer.writeByteArray("paramsBytes", paramsBytes)) + return false; + + writer.incrementState(); + + case 3: if (!writer.writeString("qry", qry)) return false; @@ -251,7 +272,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 1: - paramsBytes = reader.readByteArray("paramsBytes"); + node = reader.readUuid("node"); if (!reader.isLastRead()) return false; @@ -259,6 +280,14 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { reader.incrementState(); case 2: + paramsBytes = reader.readByteArray("paramsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: qry = reader.readString("qry"); if (!reader.isLastRead()) @@ -278,7 +307,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 3; + return 4; } /** @@ -292,6 +321,8 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { cp.cols = cols; cp.paramIdxs = paramIdxs; cp.paramsSize = paramsSize; + cp.sort = sort; + cp.partitioned = partitioned; if (F.isEmpty(args)) cp.params = EMPTY_PARAMS; @@ -304,4 +335,49 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable { return cp; } + + /** + * @param sort Sort columns. + */ + public void sortColumns(List sort) { + this.sort = sort; + } + + /** + * @return Sort columns. + */ + public List sortColumns() { + return sort; + } + + /** + * @param partitioned If the query contains partitioned tables. + */ + public void partitioned(boolean partitioned) { + this.partitioned = partitioned; + } + + /** + * @return {@code true} If the query contains partitioned tables. + */ + public boolean isPartitioned() { + return partitioned; + } + + /** + * @return Single node to execute the query on or {@code null} if need to execute on all the nodes. + */ + public UUID node() { + return node; + } + + /** + * @param node Single node to execute the query on or {@code null} if need to execute on all the nodes. + * @return {@code this}. + */ + public GridCacheSqlQuery node(UUID node) { + this.node = node; + + return this; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index b4bf608..8de8dc4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -85,7 +85,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshalla import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; -import org.apache.ignite.internal.processors.query.GridRunningQueryInfo; import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -155,7 +154,6 @@ import org.h2.result.SortOrder; import org.h2.server.web.WebServer; import org.h2.table.Column; import org.h2.table.IndexColumn; -import org.h2.table.Table; import org.h2.tools.Server; import org.h2.util.JdbcUtils; import org.h2.value.DataType; @@ -1453,7 +1451,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } - Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt); + Prepared prepared = GridSqlQueryParser.prepared(stmt); if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery()) throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver", http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java index ce11fd5..4df355e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java @@ -300,10 +300,10 @@ public final class GridH2CollocationModel { assert childFilters == null; // We are at table instance. - GridH2Table tbl = (GridH2Table)filter().getTable(); + Table tbl = filter().getTable(); // Only partitioned tables will do distributed joins. - if (!tbl.isPartitioned()) { + if (!(tbl instanceof GridH2Table) || !((GridH2Table)tbl).isPartitioned()) { type = Type.REPLICATED; multiplier = MULTIPLIER_COLLOCATED; @@ -593,7 +593,7 @@ public final class GridH2CollocationModel { private GridH2CollocationModel child(int i, boolean create) { GridH2CollocationModel child = children[i]; - if (child == null && create && isChildTableOrView(i, null)) { + if (child == null && create) { TableFilter f = childFilters[i]; if (f.getTable().isView()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java new file mode 100644 index 0000000..3ddd490 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2ScanIndex.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.opt; + +import java.util.ArrayList; +import org.h2.engine.Database; +import org.h2.engine.DbObject; +import org.h2.engine.Session; +import org.h2.index.BaseIndex; +import org.h2.index.Cursor; +import org.h2.index.IndexLookupBatch; +import org.h2.index.IndexType; +import org.h2.message.DbException; +import org.h2.result.Row; +import org.h2.result.SearchRow; +import org.h2.schema.Schema; +import org.h2.table.Column; +import org.h2.table.IndexColumn; +import org.h2.table.Table; +import org.h2.table.TableFilter; + +/** + * Scan index base class. + */ +public abstract class GridH2ScanIndex extends BaseIndex { + /** */ + private static final IndexType TYPE = IndexType.createScan(false); + + /** */ + protected final D delegate; + + /** + * @param delegate Delegate. + */ + public GridH2ScanIndex(D delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public long getDiskSpaceUsed() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void add(Session ses, Row row) { + delegate.add(ses, row); + } + + /** {@inheritDoc} */ + @Override public boolean canFindNext() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean canGetFirstOrLast() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean canScan() { + return delegate.canScan(); + } + + /** {@inheritDoc} */ + @Override public final void close(Session ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void commit(int operation, Row row) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int compareRows(SearchRow rowData, SearchRow compare) { + return delegate.compareRows(rowData, compare); + } + + /** {@inheritDoc} */ + @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) { + return find(filter.getSession(), first, last); + } + + /** {@inheritDoc} */ + @Override public Cursor find(Session ses, SearchRow first, SearchRow last) { + return delegate.find(ses, null, null); + } + + /** {@inheritDoc} */ + @Override public Cursor findFirstOrLast(Session ses, boolean first) { + throw DbException.getUnsupportedException("SCAN"); + } + + /** {@inheritDoc} */ + @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) { + throw DbException.throwInternalError(); + } + + /** {@inheritDoc} */ + @Override public int getColumnIndex(Column col) { + return -1; + } + + /** {@inheritDoc} */ + @Override public Column[] getColumns() { + return delegate.getColumns(); + } + + /** {@inheritDoc} */ + @Override public IndexColumn[] getIndexColumns() { + return delegate.getIndexColumns(); + } + + /** {@inheritDoc} */ + @Override public IndexType getIndexType() { + return TYPE; + } + + /** {@inheritDoc} */ + @Override public Row getRow(Session ses, long key) { + return delegate.getRow(ses, key); + } + + /** {@inheritDoc} */ + @Override public long getRowCount(Session ses) { + return delegate.getRowCount(ses); + } + + /** {@inheritDoc} */ + @Override public long getRowCountApproximation() { + return delegate.getRowCountApproximation(); + } + + /** {@inheritDoc} */ + @Override public Table getTable() { + return delegate.getTable(); + } + + /** {@inheritDoc} */ + @Override public boolean isRowIdIndex() { + return delegate.isRowIdIndex(); + } + + /** {@inheritDoc} */ + @Override public boolean needRebuild() { + return false; + } + + /** {@inheritDoc} */ + @Override public void remove(Session ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void remove(Session ses, Row row) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void setSortedInsertMode(boolean sortedInsertMode) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IndexLookupBatch createLookupBatch(TableFilter filter) { + return delegate.createLookupBatch(filter); + } + + /** {@inheritDoc} */ + @Override public void truncate(Session ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Schema getSchema() { + return delegate.getSchema(); + } + + /** {@inheritDoc} */ + @Override public boolean isHidden() { + return delegate.isHidden(); + } + + /** {@inheritDoc} */ + @Override public void checkRename() { + throw DbException.getUnsupportedException("rename"); + } + + /** {@inheritDoc} */ + @Override public ArrayList getChildren() { + return delegate.getChildren(); + } + + /** {@inheritDoc} */ + @Override public String getComment() { + return delegate.getComment(); + } + + /** {@inheritDoc} */ + @Override public String getCreateSQL() { + return null; // Scan should return null. + } + + /** {@inheritDoc} */ + @Override public String getCreateSQLForCopy(Table tbl, String quotedName) { + return delegate.getCreateSQLForCopy(tbl, quotedName); + } + + /** {@inheritDoc} */ + @Override public Database getDatabase() { + return delegate.getDatabase(); + } + + /** {@inheritDoc} */ + @Override public String getDropSQL() { + return delegate.getDropSQL(); + } + + /** {@inheritDoc} */ + @Override public int getId() { + return delegate.getId(); + } + + /** {@inheritDoc} */ + @Override public String getSQL() { + return delegate.getSQL(); + } + + /** {@inheritDoc} */ + @Override public int getType() { + return delegate.getType(); + } + + /** {@inheritDoc} */ + @Override public boolean isTemporary() { + return delegate.isTemporary(); + } + + /** {@inheritDoc} */ + @Override public void removeChildrenAndResources(Session ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void rename(String newName) { + throw DbException.getUnsupportedException("rename"); + } + + /** {@inheritDoc} */ + @Override public void setComment(String comment) { + throw DbException.getUnsupportedException("comment"); + } + + /** {@inheritDoc} */ + @Override public void setTemporary(boolean temporary) { + throw DbException.getUnsupportedException("temporary"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 8d080ae..4d5ea4b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -34,22 +34,14 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.h2.api.TableEngine; import org.h2.command.ddl.CreateTableData; -import org.h2.engine.Database; -import org.h2.engine.DbObject; import org.h2.engine.Session; -import org.h2.index.BaseIndex; -import org.h2.index.Cursor; import org.h2.index.Index; -import org.h2.index.IndexLookupBatch; import org.h2.index.IndexType; import org.h2.message.DbException; import org.h2.result.Row; import org.h2.result.SearchRow; import org.h2.result.SortOrder; -import org.h2.schema.Schema; -import org.h2.table.Column; import org.h2.table.IndexColumn; -import org.h2.table.Table; import org.h2.table.TableBase; import org.h2.table.TableFilter; import org.h2.value.Value; @@ -857,93 +849,15 @@ public class GridH2Table extends TableBase { * Wrapper type for primary key. */ @SuppressWarnings("PackageVisibleInnerClass") - static class ScanIndex extends BaseIndex { + static class ScanIndex extends GridH2ScanIndex { /** */ static final String SCAN_INDEX_NAME_SUFFIX = "__SCAN_"; - /** */ - private static final IndexType TYPE = IndexType.createScan(false); - - /** */ - private final GridH2IndexBase delegate; - /** - * Constructor. - * - * @param delegate Index delegate to. + * @param delegate Delegate. */ - private ScanIndex(GridH2IndexBase delegate) { - this.delegate = delegate; - } - - /** {@inheritDoc} */ - @Override public long getDiskSpaceUsed() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void add(Session ses, Row row) { - delegate.add(ses, row); - } - - /** {@inheritDoc} */ - @Override public boolean canFindNext() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean canGetFirstOrLast() { - return false; - } - - /** {@inheritDoc} */ - @Override public boolean canScan() { - return delegate.canScan(); - } - - /** {@inheritDoc} */ - @Override public final void close(Session ses) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void commit(int operation, Row row) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int compareRows(SearchRow rowData, SearchRow compare) { - return delegate.compareRows(rowData, compare); - } - - /** {@inheritDoc} */ - @Override public Cursor find(TableFilter filter, SearchRow first, SearchRow last) { - return find(filter.getSession(), first, last); - } - - /** {@inheritDoc} */ - @Override public Cursor find(Session ses, SearchRow first, SearchRow last) { - return delegate.find(ses, null, null); - } - - /** {@inheritDoc} */ - @Override public Cursor findFirstOrLast(Session ses, boolean first) { - throw DbException.getUnsupportedException("SCAN"); - } - - /** {@inheritDoc} */ - @Override public Cursor findNext(Session ses, SearchRow higherThan, SearchRow last) { - throw DbException.throwInternalError(); - } - - /** {@inheritDoc} */ - @Override public int getColumnIndex(Column col) { - return -1; - } - - /** {@inheritDoc} */ - @Override public Column[] getColumns() { - return delegate.getColumns(); + public ScanIndex(GridH2IndexBase delegate) { + super(delegate); } /** {@inheritDoc} */ @@ -957,163 +871,13 @@ public class GridH2Table extends TableBase { } /** {@inheritDoc} */ - @Override public IndexColumn[] getIndexColumns() { - return delegate.getIndexColumns(); - } - - /** {@inheritDoc} */ - @Override public IndexType getIndexType() { - return TYPE; - } - - /** {@inheritDoc} */ @Override public String getPlanSQL() { return delegate.getTable().getSQL() + "." + SCAN_INDEX_NAME_SUFFIX; } /** {@inheritDoc} */ - @Override public Row getRow(Session ses, long key) { - return delegate.getRow(ses, key); - } - - /** {@inheritDoc} */ - @Override public long getRowCount(Session ses) { - return delegate.getRowCount(ses); - } - - /** {@inheritDoc} */ - @Override public long getRowCountApproximation() { - return delegate.getRowCountApproximation(); - } - - /** {@inheritDoc} */ - @Override public Table getTable() { - return delegate.getTable(); - } - - /** {@inheritDoc} */ - @Override public boolean isRowIdIndex() { - return delegate.isRowIdIndex(); - } - - /** {@inheritDoc} */ - @Override public boolean needRebuild() { - return false; - } - - /** {@inheritDoc} */ - @Override public void remove(Session ses) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void remove(Session ses, Row row) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void setSortedInsertMode(boolean sortedInsertMode) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IndexLookupBatch createLookupBatch(TableFilter filter) { - return delegate.createLookupBatch(filter); - } - - /** {@inheritDoc} */ - @Override public void truncate(Session ses) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Schema getSchema() { - return delegate.getSchema(); - } - - /** {@inheritDoc} */ - @Override public boolean isHidden() { - return delegate.isHidden(); - } - - /** {@inheritDoc} */ - @Override public void checkRename() { - throw DbException.getUnsupportedException("rename"); - } - - /** {@inheritDoc} */ - @Override public ArrayList getChildren() { - return delegate.getChildren(); - } - - /** {@inheritDoc} */ - @Override public String getComment() { - return delegate.getComment(); - } - - /** {@inheritDoc} */ - @Override public String getCreateSQL() { - return null; // Scan should return null. - } - - /** {@inheritDoc} */ - @Override public String getCreateSQLForCopy(Table tbl, String quotedName) { - return delegate.getCreateSQLForCopy(tbl, quotedName); - } - - /** {@inheritDoc} */ - @Override public Database getDatabase() { - return delegate.getDatabase(); - } - - /** {@inheritDoc} */ - @Override public String getDropSQL() { - return delegate.getDropSQL(); - } - - /** {@inheritDoc} */ - @Override public int getId() { - return delegate.getId(); - } - - /** {@inheritDoc} */ @Override public String getName() { return delegate.getName() + SCAN_INDEX_NAME_SUFFIX; } - - /** {@inheritDoc} */ - @Override public String getSQL() { - return delegate.getSQL(); - } - - /** {@inheritDoc} */ - @Override public int getType() { - return delegate.getType(); - } - - /** {@inheritDoc} */ - @Override public boolean isTemporary() { - return delegate.isTemporary(); - } - - /** {@inheritDoc} */ - @Override public void removeChildrenAndResources(Session ses) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void rename(String newName) { - throw DbException.getUnsupportedException("rename"); - } - - /** {@inheritDoc} */ - @Override public void setComment(String comment) { - throw DbException.getUnsupportedException("comment"); - } - - /** {@inheritDoc} */ - @Override public void setTemporary(boolean temporary) { - throw DbException.getUnsupportedException("temporary"); - } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java index 7d4b7f0..9511866 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuery.java @@ -38,9 +38,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs /** */ private GridSqlAst offset; - /** */ - private boolean distinct; - /** * @return Offset. */ @@ -56,20 +53,6 @@ public abstract class GridSqlQuery extends GridSqlStatement implements GridSqlAs } /** - * @return Distinct. - */ - public boolean distinct() { - return distinct; - } - - /** - * @param distinct New distinct. - */ - public void distinct(boolean distinct) { - this.distinct = distinct; - } - - /** * @return Sort. */ public List sort() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 024529c..16d7105 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -415,7 +415,7 @@ public class GridSqlQueryParser { res.distinct(select.isDistinct()); Expression where = CONDITION.get(select); - res.where(parseExpression(where, false)); + res.where(parseExpression(where, true)); ArrayList tableFilters = new ArrayList<>(); @@ -447,7 +447,7 @@ public class GridSqlQueryParser { GridSqlElement gridFilter = parseTableFilter(f); from = from == null ? gridFilter : new GridSqlJoin(from, gridFilter, f.isJoinOuter(), - parseExpression(f.getJoinCondition(), false)); + parseExpression(f.getJoinCondition(), true)); } res.from(from); http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 277cabc..aec0b36 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -207,7 +207,6 @@ public class GridSqlQuerySplitter { // If we have distributed joins, then we have to optimize all MAP side queries // to have a correct join order with respect to batched joins and check if we need // distributed joins at all. - // TODO Also we need to have a list of table aliases to filter by primary or explicit partitions. if (distributedJoins) { boolean allCollocated = true; @@ -220,7 +219,7 @@ public class GridSqlQuerySplitter { mapSqlQry.query(parse(prepared, true).getSQL()); } - // We do not need distributed joins if all MAP queries are colocated. + // We do not need distributed joins if all MAP queries are collocated. if (allCollocated) distributedJoins = false; } @@ -861,6 +860,7 @@ public class GridSqlQuerySplitter { if (!tblAliases.contains(tblAlias)) return; + GridSqlType resType = col.resultType(); String uniqueColAlias = uniqueColumnAlias(col); GridSqlAlias colAlias = cols.get(uniqueColAlias); @@ -874,6 +874,7 @@ public class GridSqlQuerySplitter { col = column(uniqueColAlias); // col.tableAlias(wrapAlias.alias()); col.expressionInFrom(wrapAlias); + col.resultType(resType); prnt.child(childIdx, col); } @@ -1066,7 +1067,7 @@ public class GridSqlQuerySplitter { else if (qrym.type == Type.UNION) { // If it is not a UNION ALL, then we have to split because otherwise we can produce duplicates or // wrong results for UNION DISTINCT, EXCEPT, INTERSECT queries. - if (!qrym.needSplitChild && !qrym.unionAll) + if (!qrym.needSplitChild && (!qrym.unionAll || hasOffsetLimit(qrym.ast()))) qrym.needSplitChild = true; // If we have to split some child SELECT in this UNION, then we have to enforce split @@ -1151,6 +1152,14 @@ public class GridSqlQuerySplitter { } /** + * @param qry Query. + * @return {@code true} If we have OFFSET LIMIT. + */ + private static boolean hasOffsetLimit(GridSqlQuery qry) { + return qry.limit() != null || qry.offset() != null; + } + + /** * @param select Select to check. * @return {@code true} If we need to split this select. */ @@ -1158,6 +1167,9 @@ public class GridSqlQuerySplitter { if (select.distinct()) return true; + if (hasOffsetLimit(select)) + return true; + if (collocatedGrpBy) return false; @@ -1304,11 +1316,29 @@ public class GridSqlQuerySplitter { setupParameters(map, mapQry, params); map.columns(collectColumns(mapExps)); + map.sortColumns(mapQry.sort()); + map.partitioned(hasPartitionedTables(mapQry)); mapSqlQrys.add(map); } /** + * @param ast Map query AST. + * @return {@code true} If the given AST has partitioned tables. + */ + private static boolean hasPartitionedTables(GridSqlAst ast) { + if (ast instanceof GridSqlTable) + return ((GridSqlTable)ast).dataTable().isPartitioned(); + + for (int i = 0; i < ast.size(); i++) { + if (hasPartitionedTables(ast.child(i))) + return true; + } + + return false; + } + + /** * @param sqlQry Query. * @param qryAst Select AST. * @param params All parameters. @@ -1333,7 +1363,7 @@ public class GridSqlQuerySplitter { GridSqlType t = col.resultType(); if (t == null) - throw new NullPointerException("Column type."); + throw new NullPointerException("Column type: " + col); if (t == GridSqlType.UNKNOWN) throw new IllegalStateException("Unknown type: " + col); http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java index 8e8947f..d870ac5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlSortColumn.java @@ -17,6 +17,13 @@ package org.apache.ignite.internal.processors.query.h2.sql; +import java.util.List; +import org.apache.ignite.internal.util.typedef.F; +import org.h2.result.SortOrder; +import org.h2.table.Column; +import org.h2.table.IndexColumn; +import org.h2.table.Table; + /** * Sort order for ORDER BY clause. */ @@ -47,6 +54,40 @@ public class GridSqlSortColumn { } /** + * @param tbl Table. + * @param sortCols Sort columns. + * @return Index columns. + */ + public static IndexColumn[] toIndexColumns(Table tbl, List sortCols) { + assert !F.isEmpty(sortCols); + + IndexColumn[] res = new IndexColumn[sortCols.size()]; + + for (int i = 0; i < res.length; i++) { + GridSqlSortColumn sc = sortCols.get(i); + + Column col = tbl.getColumn(sc.column()); + + IndexColumn c = new IndexColumn(); + + c.column = col; + c.columnName = col.getName(); + + c.sortType = sc.asc ? SortOrder.ASCENDING : SortOrder.DESCENDING; + + if (sc.nullsFirst) + c.sortType |= SortOrder.NULLS_FIRST; + + if (sc.nullsLast) + c.sortType |= SortOrder.NULLS_LAST; + + res[i] = c; + } + + return res; + } + + /** * @return Column index. */ public int column() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index f002a5e..6416b21 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -459,8 +459,11 @@ public class GridMapQueryExecutor { req.isFlagSet(GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS)); final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER); + final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN); - for (int i = 1; i < mainCctx.config().getQueryParallelism(); i++) { + int segments = explain ? 1 : mainCctx.config().getQueryParallelism(); + + for (int i = 1; i < segments; i++) { final int segment = i; ctx.closure().callLocal( @@ -587,7 +590,6 @@ public class GridMapQueryExecutor { Connection conn = h2.connectionForSpace(mainCctx.name()); - // Here we enforce join order to have the same behavior on all the nodes. setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder); GridH2QueryContext.set(qctx); @@ -610,28 +612,34 @@ public class GridMapQueryExecutor { boolean evt = ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED); for (GridCacheSqlQuery qry : qrys) { - ResultSet rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(), - F.asList(qry.parameters()), true, - timeout, - qr.cancels[qryIdx]); - - if (evt) { - ctx.event().record(new CacheQueryExecutedEvent<>( - node, - "SQL query executed.", - EVT_CACHE_QUERY_EXECUTED, - CacheQueryType.SQL.name(), - mainCctx.namex(), - null, - qry.query(), - null, - null, - qry.parameters(), - node.id(), - null)); - } + ResultSet rs = null; + + // If we are not the target node for this replicated query, just ignore it. + if (qry.node() == null || + (segmentId == 0 && qry.node().equals(ctx.localNodeId()))) { + rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(), + F.asList(qry.parameters()), true, + timeout, + qr.cancels[qryIdx]); + + if (evt) { + ctx.event().record(new CacheQueryExecutedEvent<>( + node, + "SQL query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SQL.name(), + mainCctx.namex(), + null, + qry.query(), + null, + null, + qry.parameters(), + node.id(), + null)); + } - assert rs instanceof JdbcResultSet : rs.getClass(); + assert rs instanceof JdbcResultSet : rs.getClass(); + } qr.addResult(qryIdx, qry, node.id(), rs); @@ -751,6 +759,9 @@ public class GridMapQueryExecutor { assert res != null; + if (res.closed) + return; + int page = res.page; List rows = new ArrayList<>(Math.min(64, pageSize)); @@ -1081,21 +1092,31 @@ public class GridMapQueryExecutor { * @param qry Query. */ private QueryResult(ResultSet rs, GridCacheContext cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) { - this.rs = rs; this.cctx = cctx; this.qry = qry; this.qrySrcNodeId = qrySrcNodeId; this.cpNeeded = cctx.isLocalNode(qrySrcNodeId); - try { - res = (ResultInterface)RESULT_FIELD.get(rs); - } - catch (IllegalAccessException e) { - throw new IllegalStateException(e); // Must not happen. + if (rs != null) { + this.rs = rs; + try { + res = (ResultInterface)RESULT_FIELD.get(rs); + } + catch (IllegalAccessException e) { + throw new IllegalStateException(e); // Must not happen. + } + + rowCnt = res.getRowCount(); + cols = res.getVisibleColumnCount(); } + else { + this.rs = null; + this.res = null; + this.cols = -1; + this.rowCnt = -1; - rowCnt = res.getRowCount(); - cols = res.getVisibleColumnCount(); + closed = true; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 6a6e045..27622bb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -22,20 +22,22 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.RandomAccess; import java.util.Set; import java.util.UUID; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.cache.CacheException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.h2.engine.Session; import org.h2.index.BaseIndex; @@ -44,13 +46,10 @@ import org.h2.index.IndexType; import org.h2.message.DbException; import org.h2.result.Row; import org.h2.result.SearchRow; -import org.h2.result.SortOrder; import org.h2.table.IndexColumn; -import org.h2.table.TableFilter; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; -import static java.util.Collections.emptyIterator; import static java.util.Objects.requireNonNull; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE; @@ -67,6 +66,22 @@ public abstract class GridMergeIndex extends BaseIndex { private static final int PREFETCH_SIZE = getInteger(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE, 1024); /** */ + private static final AtomicReferenceFieldUpdater lastPagesUpdater = + AtomicReferenceFieldUpdater.newUpdater(GridMergeIndex.class, ConcurrentMap.class, "lastPages"); + + static { + if (!U.isPow2(PREFETCH_SIZE)) { + throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE + + ") must be positive and a power of 2."); + } + + if (PREFETCH_SIZE >= MAX_FETCH_SIZE) { + throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE + + ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ")."); + } + } + + /** */ protected final Comparator firstRowCmp = new Comparator() { @Override public int compare(SearchRow rowInList, SearchRow searchRow) { int res = compareRows(rowInList, searchRow); @@ -84,14 +99,11 @@ public abstract class GridMergeIndex extends BaseIndex { } }; - /** All rows number. */ - private final AtomicInteger expRowsCnt = new AtomicInteger(0); - - /** Remaining rows per source node ID. */ - private Map remainingRows; + /** Row source nodes. */ + private Set sources; /** */ - private final AtomicBoolean lastSubmitted = new AtomicBoolean(); + private int pageSize; /** * Will be r/w from query execution thread only, does not need to be threadsafe. @@ -107,6 +119,9 @@ public abstract class GridMergeIndex extends BaseIndex { /** */ private final GridKernalContext ctx; + /** */ + private volatile ConcurrentMap lastPages; + /** * @param ctx Context. * @param tbl Table. @@ -129,16 +144,6 @@ public abstract class GridMergeIndex extends BaseIndex { * @param ctx Context. */ protected GridMergeIndex(GridKernalContext ctx) { - if (!U.isPow2(PREFETCH_SIZE)) { - throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE + - ") must be positive and a power of 2."); - } - - if (PREFETCH_SIZE >= MAX_FETCH_SIZE) { - throw new IllegalArgumentException(IGNITE_SQL_MERGE_TABLE_PREFETCH_SIZE + " (" + PREFETCH_SIZE + - ") must be less than " + IGNITE_SQL_MERGE_TABLE_MAX_SIZE + " (" + MAX_FETCH_SIZE + ")."); - } - this.ctx = ctx; fetched = new BlockList<>(PREFETCH_SIZE); @@ -148,7 +153,7 @@ public abstract class GridMergeIndex extends BaseIndex { * @return Return source nodes for this merge index. */ public Set sources() { - return remainingRows.keySet(); + return sources; } /** @@ -169,17 +174,24 @@ public abstract class GridMergeIndex extends BaseIndex { * @return {@code true} If this index needs data from the given source node. */ public boolean hasSource(UUID nodeId) { - return remainingRows.containsKey(nodeId); + return sources.contains(nodeId); } /** {@inheritDoc} */ @Override public long getRowCount(Session ses) { - return expRowsCnt.get(); + Cursor c = find(ses, null, null); + + long cnt = 0; + + while (c.next()) + cnt++; + + return cnt; } /** {@inheritDoc} */ @Override public long getRowCountApproximation() { - return getRowCount(null); + return 10_000; } /** @@ -189,27 +201,28 @@ public abstract class GridMergeIndex extends BaseIndex { * @param segmentsCnt Index segments per table. */ public void setSources(Collection nodes, int segmentsCnt) { - assert remainingRows == null; + assert sources == null; - remainingRows = U.newHashMap(nodes.size()); + sources = new HashSet<>(); for (ClusterNode node : nodes) { - Counter[] counters = new Counter[segmentsCnt]; - - for (int i = 0; i < segmentsCnt; i++) - counters[i] = new Counter(); - - if (remainingRows.put(node.id(), counters) != null) - throw new IllegalStateException("Duplicate node id: " + node.id()); - + if (!sources.add(node.id())) + throw new IllegalStateException(); } } /** + * @param pageSize Page size. + */ + public void setPageSize(int pageSize) { + this.pageSize = pageSize; + } + + /** * @param queue Queue to poll. * @return Next page. */ - private GridResultPage takeNextPage(BlockingQueue queue) { + private GridResultPage takeNextPage(Pollable queue) { GridResultPage page; for (;;) { @@ -234,16 +247,17 @@ public abstract class GridMergeIndex extends BaseIndex { * @param iter Current iterator. * @return The same or new iterator. */ - protected final Iterator pollNextIterator(BlockingQueue queue, Iterator iter) { - while (!iter.hasNext()) { + protected final Iterator pollNextIterator(Pollable queue, Iterator iter) { + if (!iter.hasNext()) { GridResultPage page = takeNextPage(queue); - if (page.isLast()) - return emptyIterator(); // We are done. - - fetchNextPage(page); + if (!page.isLast()) + page.fetchNextPage(); // Failed will throw an exception here. iter = page.rows(); + + // The received iterator must be empty in the dummy last page or on failure. + assert iter.hasNext() || page.isDummyLast() || page.isFail(); } return iter; @@ -253,23 +267,18 @@ public abstract class GridMergeIndex extends BaseIndex { * @param e Error. */ public void fail(final CacheException e) { - for (UUID nodeId0 : remainingRows.keySet()) { - addPage0(new GridResultPage(null, nodeId0, null) { - @Override public boolean isFail() { - return true; - } - - @Override public void fetchNextPage() { - throw e; - } - }); - } + for (UUID nodeId : sources) + fail(nodeId, e); } /** * @param nodeId Node ID. + * @param e Exception. */ public void fail(UUID nodeId, final CacheException e) { + if (nodeId == null) + nodeId = F.first(sources); + addPage0(new GridResultPage(null, nodeId, null) { @Override public boolean isFail() { return true; @@ -285,91 +294,88 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * @param page Page. + * @param nodeId Node ID. + * @param res Response. */ - public final void addPage(GridResultPage page) { - int pageRowsCnt = page.rowsInPage(); + private void initLastPages(UUID nodeId, GridQueryNextPageResponse res) { + int allRows = res.allRows(); - Counter cnt = remainingRows.get(page.source())[page.res.segmentId()]; + // If the old protocol we send all rows number in the page 0, other pages have -1. + // In the new protocol we do not know it and always have -1, except terminating page, + // which has -2. Thus we have to init page counters only when we receive positive value + // in the first page. + if (allRows < 0 || res.page() != 0) + return; - // RemainingRowsCount should be updated before page adding to avoid race - // in GridMergeIndexUnsorted cursor iterator - int remainingRowsCount; + ConcurrentMap lp = lastPages; - int allRows = page.response().allRows(); + if (lp == null && !lastPagesUpdater.compareAndSet(this, null, lp = new ConcurrentHashMap<>())) + lp = lastPages; - if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter. - assert cnt.state == State.UNINITIALIZED : "Counter is already initialized."; + assert pageSize > 0: pageSize; - remainingRowsCount = cnt.addAndGet(allRows - pageRowsCnt); + int lastPage = allRows == 0 ? 0 : (allRows - 1) / pageSize; - expRowsCnt.addAndGet(allRows); + assert lastPage >= 0: lastPage; - // Add page before setting initialized flag to avoid race condition with adding last page - if (pageRowsCnt > 0) - addPage0(page); + if (lp.put(new SourceKey(nodeId, res.segmentId()), lastPage) != null) + throw new IllegalStateException(); + } - // We need this separate flag to handle case when the first source contains only one page - // and it will signal that all remaining counters are zero and fetch is finished. - cnt.state = State.INITIALIZED; - } - else { - remainingRowsCount = cnt.addAndGet(-pageRowsCnt); + /** + * @param page Page. + */ + private void markLastPage(GridResultPage page) { + GridQueryNextPageResponse res = page.response(); - if (pageRowsCnt > 0) - addPage0(page); - } + if (res.allRows() != -2) { // -2 means the last page. + UUID nodeId = page.source(); - if (remainingRowsCount == 0) { // Result can be negative in case of race between messages, it is ok. - if (cnt.state == State.UNINITIALIZED) - return; + initLastPages(nodeId, res); - // Guarantee that finished state possible only if counter is zero and all pages was added - cnt.state = State.FINISHED; + ConcurrentMap lp = lastPages; - for (Counter[] cntrs : remainingRows.values()) { // Check all the sources. - for(int i = 0; i < cntrs.length; i++) { - if (cntrs[i].state != State.FINISHED) - return; - } - } + if (lp == null) + return; // It was not initialized --> wait for -2. - if (lastSubmitted.compareAndSet(false, true)) { - addPage0(new GridResultPage(null, page.source(), null) { - @Override public boolean isLast() { - return true; - } - }); + Integer lastPage = lp.get(new SourceKey(nodeId, res.segmentId())); + + if (lastPage == null) + return; // This node may use the new protocol --> wait for -2. + + if (lastPage != res.page()) { + assert lastPage > res.page(); + + return; // This is not the last page. } } + + page.setLast(true); } /** * @param page Page. */ - protected abstract void addPage0(GridResultPage page); + public final void addPage(GridResultPage page) { + markLastPage(page); + addPage0(page); + } /** - * @param page Page. + * @param lastPage Real last page. + * @return Created dummy page. */ - protected void fetchNextPage(GridResultPage page) { - assert !page.isLast(); - - if(page.isFail()) - page.fetchNextPage(); //rethrow exceptions - - assert page.res != null; - - Counter[] counters = remainingRows.get(page.source()); + protected final GridResultPage createDummyLastPage(GridResultPage lastPage) { + assert !lastPage.isDummyLast(); // It must be a real last page. - int segId = page.res.segmentId(); - - Counter counter = counters[segId]; - - if (counter.get() != 0) - page.fetchNextPage(); + return new GridResultPage(ctx, lastPage.source(), null).setLast(true); } + /** + * @param page Page. + */ + protected abstract void addPage0(GridResultPage page); + /** {@inheritDoc} */ @Override public final Cursor find(Session ses, SearchRow first, SearchRow last) { checkBounds(lastEvictedRow, first, last); @@ -381,11 +387,9 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * @return {@code true} If we have fetched all the remote rows. + * @return {@code true} If we have fetched all the remote rows into a fetched list. */ - public boolean fetchedAll() { - return fetchedCnt == expRowsCnt.get(); - } + public abstract boolean fetchedAll(); /** * @param lastEvictedRow Last evicted fetched row. @@ -433,11 +437,6 @@ public abstract class GridMergeIndex extends BaseIndex { } /** {@inheritDoc} */ - @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) { - return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true); - } - - /** {@inheritDoc} */ @Override public void remove(Session ses) { throw DbException.getUnsupportedException("remove index"); } @@ -683,14 +682,6 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * Counter with initialization flag. - */ - private static class Counter extends AtomicInteger { - /** */ - volatile State state = State.UNINITIALIZED; - } - - /** */ private static final class BlockList extends AbstractList implements RandomAccess { /** */ @@ -766,4 +757,53 @@ public abstract class GridMergeIndex extends BaseIndex { return res; } } + + /** + * Pollable. + */ + protected static interface Pollable { + /** + * @param timeout Timeout. + * @param unit Time unit. + * @return Polled value or {@code null} if none. + * @throws InterruptedException If interrupted. + */ + E poll(long timeout, TimeUnit unit) throws InterruptedException; + } + + /** + */ + private static class SourceKey { + final UUID nodeId; + + /** */ + final int segment; + + /** + * @param nodeId Node ID. + * @param segment Segment. + */ + SourceKey(UUID nodeId, int segment) { + this.nodeId = nodeId; + this.segment = segment; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SourceKey sourceKey = (SourceKey)o; + + if (segment != sourceKey.segment) return false; + return nodeId.equals(sourceKey.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = nodeId.hashCode(); + result = 31 * result + segment; + return result; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java index 32c676d..361bb2d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexSorted.java @@ -25,18 +25,24 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexType; import org.h2.result.Row; import org.h2.result.SearchRow; +import org.h2.result.SortOrder; import org.h2.table.IndexColumn; +import org.h2.table.TableFilter; import org.h2.value.Value; import org.jetbrains.annotations.Nullable; @@ -48,6 +54,9 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase */ public final class GridMergeIndexSorted extends GridMergeIndex { /** */ + private static final IndexType TYPE = IndexType.createNonUnique(false); + + /** */ private final Comparator streamCmp = new Comparator() { @Override public int compare(RowStream o1, RowStream o2) { // Nulls at the beginning. @@ -62,26 +71,33 @@ public final class GridMergeIndexSorted extends GridMergeIndex { }; /** */ - private Map streamsMap; + private Map streamsMap; /** */ - private RowStream[] streams; + private final Lock lock = new ReentrantLock(); + + /** */ + private final Condition notEmpty = lock.newCondition(); + + /** */ + private GridResultPage failPage; + + /** */ + private MergeStreamIterator it; /** * @param ctx Kernal context. * @param tbl Table. * @param name Index name, - * @param type Index type. * @param cols Columns. */ public GridMergeIndexSorted( GridKernalContext ctx, GridMergeTable tbl, String name, - IndexType type, IndexColumn[] cols ) { - super(ctx, tbl, name, type, cols); + super(ctx, tbl, name, TYPE, cols); } /** {@inheritDoc} */ @@ -89,33 +105,48 @@ public final class GridMergeIndexSorted extends GridMergeIndex { super.setSources(nodes, segmentsCnt); streamsMap = U.newHashMap(nodes.size()); - streams = new RowStream[nodes.size()]; + RowStream[] streams = new RowStream[nodes.size() * segmentsCnt]; int i = 0; for (ClusterNode node : nodes) { - RowStream stream = new RowStream(node.id()); + RowStream[] segments = new RowStream[segmentsCnt]; - streams[i] = stream; + for (int s = 0; s < segmentsCnt; s++) + streams[i++] = segments[s] = new RowStream(); - if (streamsMap.put(stream.src, stream) != null) + if (streamsMap.put(node.id(), segments) != null) throw new IllegalStateException(); } + + it = new MergeStreamIterator(streams); + } + + /** {@inheritDoc} */ + @Override public boolean fetchedAll() { + return it.fetchedAll(); } /** {@inheritDoc} */ @Override protected void addPage0(GridResultPage page) { - if (page.isLast() || page.isFail()) { - // Finish all the streams. - for (RowStream stream : streams) - stream.addPage(page); + if (page.isFail()) { + lock.lock(); + + try { + if (failPage == null) { + failPage = page; + + notEmpty.signalAll(); + } + } + finally { + lock.unlock(); + } } else { - assert page.rowsInPage() > 0; - UUID src = page.source(); - streamsMap.get(src).addPage(page); + streamsMap.get(src)[page.segmentId()].addPage(page); } } @@ -153,8 +184,13 @@ public final class GridMergeIndexSorted extends GridMergeIndex { } /** {@inheritDoc} */ + @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) { + return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, false); + } + + /** {@inheritDoc} */ @Override protected Cursor findInStream(@Nullable SearchRow first, @Nullable SearchRow last) { - return new FetchingCursor(first, last, new MergeStreamIterator()); + return new FetchingCursor(first, last, it); } /** @@ -165,17 +201,42 @@ public final class GridMergeIndexSorted extends GridMergeIndex { private boolean first = true; /** */ - private int off; + private volatile int off; /** */ private boolean hasNext; + /** */ + private final RowStream[] streams; + + /** + * @param streams Streams. + */ + MergeStreamIterator(RowStream[] streams) { + assert !F.isEmpty(streams); + + this.streams = streams; + } + + /** + * @return {@code true} If fetched all. + */ + private boolean fetchedAll() { + return off == streams.length; + } + /** * */ private void goFirst() { + assert first; + + first = false; + for (int i = 0; i < streams.length; i++) { - if (!streams[i].next()) { + RowStream s = streams[i]; + + if (!s.next()) { streams[i] = null; off++; // Move left bound. } @@ -183,8 +244,6 @@ public final class GridMergeIndexSorted extends GridMergeIndex { if (off < streams.length) Arrays.sort(streams, streamCmp); - - first = false; } /** @@ -229,31 +288,68 @@ public final class GridMergeIndexSorted extends GridMergeIndex { /** * Row stream. */ - private final class RowStream { - /** */ - final UUID src; - - /** */ - final BlockingQueue queue = new ArrayBlockingQueue<>(8); - + private final class RowStream implements Pollable { /** */ Iterator iter = emptyIterator(); /** */ Row cur; - /** - * @param src Source. - */ - private RowStream(UUID src) { - this.src = src; - } + /** */ + GridResultPage nextPage; /** * @param page Page. */ private void addPage(GridResultPage page) { - queue.offer(page); + assert !page.isFail(); + + if (page.isLast() && page.rowsInPage() == 0) + page = createDummyLastPage(page); // Terminate. + + lock.lock(); + + try { + // We can fetch the next page only when we have polled the previous one. + assert nextPage == null; + + nextPage = page; + + notEmpty.signalAll(); + } + finally { + lock.unlock(); + } + } + + /** {@inheritDoc} */ + @Override public GridResultPage poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + + lock.lock(); + + try { + for (;;) { + if (failPage != null) + return failPage; + + GridResultPage page = nextPage; + + if (page != null) { + // isLast && !isDummyLast + nextPage = page.isLast() && page.response() != null + ? createDummyLastPage(page) : null; // Terminate with empty iterator. + + return page; + } + + if ((nanos = notEmpty.awaitNanos(nanos)) <= 0) + return null; + } + } + finally { + lock.unlock(); + } } /** @@ -262,7 +358,7 @@ public final class GridMergeIndexSorted extends GridMergeIndex { private boolean next() { cur = null; - iter = pollNextIterator(queue, iter); + iter = pollNextIterator(this, iter); if (!iter.hasNext()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index b69c898..430a687 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -17,19 +17,24 @@ package org.apache.ignite.internal.processors.query.h2.twostep; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowFactory; +import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexType; import org.h2.result.Row; import org.h2.result.SearchRow; +import org.h2.result.SortOrder; import org.h2.table.IndexColumn; +import org.h2.table.TableFilter; import org.h2.value.Value; /** @@ -37,7 +42,16 @@ import org.h2.value.Value; */ public final class GridMergeIndexUnsorted extends GridMergeIndex { /** */ - private final BlockingQueue queue = new LinkedBlockingQueue<>(); + private static final IndexType TYPE = IndexType.createScan(false); + + /** */ + private final PollableQueue queue = new PollableQueue<>(); + + /** */ + private final AtomicInteger activeSources = new AtomicInteger(-1); + + /** */ + private Iterator iter = Collections.emptyIterator(); /** * @param ctx Context. @@ -45,7 +59,7 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex { * @param name Index name. */ public GridMergeIndexUnsorted(GridKernalContext ctx, GridMergeTable tbl, String name) { - super(ctx, tbl, name, IndexType.createScan(false), IndexColumn.wrap(tbl.getColumns())); + super(ctx, tbl, name, TYPE, IndexColumn.wrap(tbl.getColumns())); } /** @@ -64,10 +78,46 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex { } /** {@inheritDoc} */ + @Override public void setSources(Collection nodes, int segmentsCnt) { + super.setSources(nodes, segmentsCnt); + + int x = nodes.size() * segmentsCnt; + + assert x > 0: x; + + activeSources.set(x); + } + + /** {@inheritDoc} */ + @Override public boolean fetchedAll() { + int x = activeSources.get(); + + assert x >= 0: x; // This method must not be called if the sources were not set. + + return x == 0 && queue.isEmpty(); + } + + /** {@inheritDoc} */ @Override protected void addPage0(GridResultPage page) { assert page.rowsInPage() > 0 || page.isLast() || page.isFail(); - queue.add(page); + // Do not add empty page to avoid premature stream termination. + if (page.rowsInPage() != 0 || page.isFail()) + queue.add(page); + + if (page.isLast()) { + int x = activeSources.decrementAndGet(); + + assert x >= 0: x; + + if (x == 0) // Always terminate with empty iterator. + queue.add(createDummyLastPage(page)); + } + } + + /** {@inheritDoc} */ + @Override public double getCost(Session ses, int[] masks, TableFilter[] filters, int filter, SortOrder sortOrder) { + return getCostRangeIndex(masks, getRowCountApproximation(), filters, filter, sortOrder, true); } /** {@inheritDoc} */ @@ -80,9 +130,6 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex { @Override protected Cursor findInStream(SearchRow first, SearchRow last) { // This index is unsorted: have to ignore bounds. return new FetchingCursor(null, null, new Iterator() { - /** */ - Iterator iter = Collections.emptyIterator(); - @Override public boolean hasNext() { iter = pollNextIterator(queue, iter); @@ -98,4 +145,10 @@ public final class GridMergeIndexUnsorted extends GridMergeIndex { } }); } + + /** + */ + private static class PollableQueue extends LinkedBlockingQueue implements Pollable { + // No-op. + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8817190e/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java index 1489021..f7495c0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java @@ -18,35 +18,55 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import java.util.ArrayList; -import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2ScanIndex; +import org.apache.ignite.internal.util.typedef.F; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; import org.h2.index.Index; import org.h2.index.IndexType; import org.h2.message.DbException; import org.h2.result.Row; +import org.h2.result.SortOrder; import org.h2.table.IndexColumn; import org.h2.table.TableBase; +import org.h2.table.TableFilter; /** * Merge table for distributed queries. */ public class GridMergeTable extends TableBase { /** */ - private final GridKernalContext ctx; - - /** */ - private final GridMergeIndex idx; + private ArrayList idxs; /** * @param data Data. - * @param ctx Kernal context. */ - public GridMergeTable(CreateTableData data, GridKernalContext ctx) { + public GridMergeTable(CreateTableData data) { super(data); + } + + /** + * @param idxs Indexes. + */ + public void indexes(ArrayList idxs) { + assert !F.isEmpty(idxs); + + this.idxs = idxs; + } - this.ctx = ctx; - idx = new GridMergeIndexUnsorted(ctx, this, "merge_scan"); + /** + * @return Merge index. + */ + public GridMergeIndex getMergeIndex() { + return (GridMergeIndex)idxs.get(idxs.size() - 1); // Sorted index must be the last. + } + + /** + * @param idx Index. + * @return Scan index. + */ + public static GridH2ScanIndex createScanIndex(GridMergeIndex idx) { + return new ScanIndex(idx); } /** {@inheritDoc} */ @@ -56,7 +76,7 @@ public class GridMergeTable extends TableBase { /** {@inheritDoc} */ @Override public void close(Session ses) { - idx.close(ses); + // No-op. } /** {@inheritDoc} */ @@ -96,8 +116,8 @@ public class GridMergeTable extends TableBase { } /** {@inheritDoc} */ - @Override public GridMergeIndex getScanIndex(Session session) { - return idx; + @Override public Index getScanIndex(Session session) { + return idxs.get(0); // Must be always at 0. } /** {@inheritDoc} */ @@ -107,7 +127,7 @@ public class GridMergeTable extends TableBase { /** {@inheritDoc} */ @Override public ArrayList getIndexes() { - return null; + return idxs; } /** {@inheritDoc} */ @@ -137,12 +157,12 @@ public class GridMergeTable extends TableBase { /** {@inheritDoc} */ @Override public long getRowCount(Session ses) { - return idx.getRowCount(ses); + return getScanIndex(ses).getRowCount(ses); } /** {@inheritDoc} */ @Override public long getRowCountApproximation() { - return idx.getRowCountApproximation(); + return getScanIndex(null).getRowCountApproximation(); } /** {@inheritDoc} */ @@ -154,4 +174,24 @@ public class GridMergeTable extends TableBase { @Override public void checkRename() { throw DbException.getUnsupportedException("rename"); } + + /** + * Scan index wrapper. + */ + private static class ScanIndex extends GridH2ScanIndex { + /** + * @param delegate Delegate. + */ + public ScanIndex(GridMergeIndex delegate) { + super(delegate); + } + + /** {@inheritDoc} */ + @Override public double getCost(Session session, int[] masks, TableFilter[] filters, int filter, + SortOrder sortOrder) { + long rows = getRowCountApproximation(); + + return getCostRangeIndex(masks, rows, filters, filter, sortOrder, true); + } + } } \ No newline at end of file