Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EF0EB200C80 for ; Wed, 19 Apr 2017 10:45:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id EDB9C160B86; Wed, 19 Apr 2017 08:45:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 96975160BBE for ; Wed, 19 Apr 2017 10:45:52 +0200 (CEST) Received: (qmail 56181 invoked by uid 500); 19 Apr 2017 08:45:51 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 54110 invoked by uid 99); 19 Apr 2017 08:45:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Apr 2017 08:45:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 53E81E180B; Wed, 19 Apr 2017 08:45:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Wed, 19 Apr 2017 08:46:26 -0000 Message-Id: <609772c47a434acbb05a821553190221@git.apache.org> In-Reply-To: <083dac2d42b24ab28afe70316095cf17@git.apache.org> References: <083dac2d42b24ab28afe70316095cf17@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [45/50] [abbrv] ignite git commit: IGNITE-4565: Implemented CREATE INDEX and DROP INDEX. This closes #1773. This closes #1804. archived-at: Wed, 19 Apr 2017 08:45:55 -0000 IGNITE-4565: Implemented CREATE INDEX and DROP INDEX. This closes #1773. This closes #1804. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2edb935c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2edb935c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2edb935c Branch: refs/heads/ignite-4929 Commit: 2edb935cbf87198993c403724e26efc655710c25 Parents: 9e7421f Author: devozerov Authored: Tue Apr 18 17:11:34 2017 +0300 Committer: devozerov Committed: Tue Apr 18 17:11:36 2017 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcAbstractDmlStatementSelfTest.java | 92 +- ...BinaryMarshallerInsertStatementSelfTest.java | 9 +- ...cBinaryMarshallerMergeStatementSelfTest.java | 9 +- .../jdbc2/JdbcDynamicIndexAbstractSelfTest.java | 367 ++++ ...namicIndexAtomicPartitionedNearSelfTest.java | 26 + ...bcDynamicIndexAtomicPartitionedSelfTest.java | 39 + ...dbcDynamicIndexAtomicReplicatedSelfTest.java | 39 + ...dexTransactionalPartitionedNearSelfTest.java | 26 + ...icIndexTransactionalPartitionedSelfTest.java | 39 + ...micIndexTransactionalReplicatedSelfTest.java | 39 + .../jdbc2/JdbcInsertStatementSelfTest.java | 4 +- .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 8 + .../apache/ignite/IgniteSystemProperties.java | 8 + .../org/apache/ignite/cache/QueryEntity.java | 82 +- .../org/apache/ignite/cache/QueryIndex.java | 9 + .../configuration/CacheConfiguration.java | 7 +- .../apache/ignite/internal/GridComponent.java | 5 +- .../ignite/internal/GridKernalContext.java | 7 + .../ignite/internal/GridKernalContextImpl.java | 13 +- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../apache/ignite/internal/IgniteKernal.java | 27 +- .../org/apache/ignite/internal/IgnitionEx.java | 18 + .../managers/communication/GridIoManager.java | 2 + .../communication/GridIoMessageFactory.java | 6 + .../managers/communication/GridIoPolicy.java | 3 + .../cache/CacheAffinitySharedManager.java | 3 +- .../cache/CachePartitionExchangeWorkerTask.java | 6 +- 
.../cache/DynamicCacheChangeRequest.java | 23 +- .../cache/DynamicCacheDescriptor.java | 50 +- .../processors/cache/GridCacheEntryEx.java | 13 +- .../processors/cache/GridCacheMapEntry.java | 17 + .../GridCachePartitionExchangeManager.java | 36 +- .../processors/cache/GridCacheProcessor.java | 135 +- .../cache/GridCacheSharedContext.java | 2 + .../GridDhtPartitionsExchangeFuture.java | 5 - .../cache/query/GridCacheQueryManager.java | 7 +- .../cache/query/IgniteQueryErrorCode.java | 27 +- .../internal/processors/pool/PoolProcessor.java | 5 + .../query/GridQueryIndexDescriptor.java | 5 + .../processors/query/GridQueryIndexing.java | 43 +- .../processors/query/GridQueryProcessor.java | 1707 ++++++++++++++++-- .../query/GridQueryTypeDescriptor.java | 7 + .../processors/query/IgniteSQLException.java | 7 + .../query/QueryIndexDescriptorImpl.java | 42 +- .../processors/query/QueryIndexKey.java | 85 + .../internal/processors/query/QuerySchema.java | 168 ++ .../query/QueryTypeDescriptorImpl.java | 150 +- .../internal/processors/query/QueryUtils.java | 219 ++- .../query/schema/SchemaExchangeWorkerTask.java | 53 + .../query/schema/SchemaIndexCacheVisitor.java | 33 + .../schema/SchemaIndexCacheVisitorClosure.java | 42 + .../schema/SchemaIndexCacheVisitorImpl.java | 197 ++ .../SchemaIndexOperationCancellationToken.java | 53 + .../processors/query/schema/SchemaKey.java | 59 + .../SchemaNodeLeaveExchangeWorkerTask.java | 53 + .../schema/SchemaOperationClientFuture.java | 52 + .../query/schema/SchemaOperationException.java | 138 ++ .../query/schema/SchemaOperationManager.java | 292 +++ .../query/schema/SchemaOperationWorker.java | 205 +++ .../message/SchemaAbstractDiscoveryMessage.java | 70 + .../message/SchemaFinishDiscoveryMessage.java | 98 + .../message/SchemaOperationStatusMessage.java | 168 ++ .../message/SchemaProposeDiscoveryMessage.java | 133 ++ .../operation/SchemaAbstractOperation.java | 67 + .../operation/SchemaIndexAbstractOperation.java | 40 + 
.../operation/SchemaIndexCreateOperation.java | 91 + .../operation/SchemaIndexDropOperation.java | 68 + .../processors/cache/GridCacheTestEntryEx.java | 7 + .../junits/GridTestKernalContext.java | 1 + .../query/h2/GridH2IndexingGeoSelfTest.java | 4 +- .../query/h2/DmlStatementsProcessor.java | 49 +- .../processors/query/h2/IgniteH2Indexing.java | 678 +++++-- .../query/h2/ddl/DdlStatementsProcessor.java | 208 +++ .../query/h2/opt/GridH2IndexBase.java | 2 +- .../query/h2/opt/GridH2PrimaryScanIndex.java | 87 + .../query/h2/opt/GridH2SystemIndexFactory.java | 38 + .../processors/query/h2/opt/GridH2Table.java | 382 ++-- .../query/h2/opt/GridLuceneIndex.java | 10 +- .../query/h2/sql/GridSqlCreateIndex.java | 121 ++ .../query/h2/sql/GridSqlDropIndex.java | 82 + .../query/h2/sql/GridSqlQueryParser.java | 123 ++ .../cache/index/AbstractSchemaSelfTest.java | 512 ++++++ .../DynamicIndexAbstractBasicSelfTest.java | 950 ++++++++++ .../DynamicIndexAbstractConcurrentSelfTest.java | 921 ++++++++++ .../index/DynamicIndexAbstractSelfTest.java | 467 +++++ .../index/DynamicIndexClientBasicSelfTest.java | 28 + ...ndexPartitionedAtomicConcurrentSelfTest.java | 33 + ...titionedTransactionalConcurrentSelfTest.java | 33 + ...IndexReplicatedAtomicConcurrentSelfTest.java | 33 + ...plicatedTransactionalConcurrentSelfTest.java | 33 + .../index/DynamicIndexServerBasicSelfTest.java | 28 + ...amicIndexServerCoordinatorBasicSelfTest.java | 28 + ...namicIndexServerNodeFIlterBasicSelfTest.java | 28 + ...erverNodeFilterCoordinatorBasicSelfTest.java | 30 + .../index/H2DynamicIndexAbstractSelfTest.java | 400 ++++ ...namicIndexAtomicPartitionedNearSelfTest.java | 26 + ...H2DynamicIndexAtomicPartitionedSelfTest.java | 39 + .../H2DynamicIndexAtomicReplicatedSelfTest.java | 39 + ...dexTransactionalPartitionedNearSelfTest.java | 26 + ...icIndexTransactionalPartitionedSelfTest.java | 39 + ...micIndexTransactionalReplicatedSelfTest.java | 39 + .../cache/index/SchemaExchangeSelfTest.java | 589 ++++++ 
.../local/IgniteCacheLocalQuerySelfTest.java | 2 +- .../query/IgniteQueryDedicatedPoolTest.java | 1 - .../query/IgniteSqlSplitterSelfTest.java | 2 +- .../h2/GridIndexingSpiAbstractSelfTest.java | 109 +- .../query/h2/IgniteSqlQueryMinMaxTest.java | 16 +- .../query/h2/opt/GridH2TableSelfTest.java | 171 +- .../query/h2/sql/GridQueryParsingTest.java | 212 ++- .../IgniteCacheQuerySelfTestSuite.java | 37 +- .../IgniteCacheQuerySelfTestSuite2.java | 11 + 111 files changed, 11221 insertions(+), 1016 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java index 440f6d0..f23dde7 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcAbstractDmlStatementSelfTest.java @@ -21,17 +21,10 @@ import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; import java.util.Collections; -import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.ConnectorConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; 
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX; @@ -42,9 +35,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; * Statement test. */ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - /** JDBC URL. */ private static final String BASE_URL = CFG_URL_PREFIX + "modules/clients/src/test/config/jdbc-config.xml"; @@ -58,18 +48,28 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac protected Connection conn; /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return getConfiguration0(igniteInstanceName); + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + Class.forName("org.apache.ignite.IgniteJdbcDriver"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + conn = DriverManager.getConnection(getCfgUrl()); + + ignite(0).getOrCreateCache(cacheConfig()); } /** - * @param igniteInstanceName Ignite instance name. - * @return Grid configuration used for starting the grid. - * @throws Exception If failed. + * @return Cache configuration for non binary marshaller tests. 
*/ - private IgniteConfiguration getConfiguration0(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - + private CacheConfiguration nonBinCacheConfig() { CacheConfiguration cache = defaultCacheConfiguration(); cache.setCacheMode(PARTITIONED); @@ -79,32 +79,18 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac String.class, Person.class ); - cfg.setCacheConfiguration(cache); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); - - cfg.setConnectorConfiguration(new ConnectorConfiguration()); - - return cfg; + return cache; } /** - * @param igniteInstanceName Ignite instance name. - * @return Grid configuration used for starting the grid ready for manipulating binary objects. - * @throws Exception If failed. + * @return Cache configuration for binary marshaller tests. */ - IgniteConfiguration getBinaryConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = getConfiguration0(igniteInstanceName); - - cfg.setMarshaller(new BinaryMarshaller()); - - CacheConfiguration ccfg = cfg.getCacheConfiguration()[0]; + final CacheConfiguration binaryCacheConfig() { + CacheConfiguration cache = defaultCacheConfiguration(); - ccfg.getQueryEntities().clear(); + cache.setCacheMode(PARTITIONED); + cache.setBackups(1); + cache.setWriteSynchronizationMode(FULL_SYNC); QueryEntity e = new QueryEntity(); @@ -116,26 +102,16 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac e.addQueryField("firstName", String.class.getName(), null); e.addQueryField("lastName", String.class.getName(), null); - ccfg.setQueryEntities(Collections.singletonList(e)); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(3); + cache.setQueryEntities(Collections.singletonList(e)); - 
Class.forName("org.apache.ignite.IgniteJdbcDriver"); + return cache; } - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - conn = DriverManager.getConnection(getCfgUrl()); + /** + * @return Configuration of cache to create. + */ + CacheConfiguration cacheConfig() { + return nonBinCacheConfig(); } /** @@ -147,9 +123,7 @@ public abstract class JdbcAbstractDmlStatementSelfTest extends GridCommonAbstrac /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - grid(0).cache(null).clear(); - - assertEquals(0, grid(0).cache(null).size(CachePeekMode.ALL)); + grid(0).destroyCache(null); conn.close(); assertTrue(conn.isClosed()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerInsertStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerInsertStatementSelfTest.java index e8a09d9..878e4de 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerInsertStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerInsertStatementSelfTest.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.jdbc2; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; /** * JDBC test of INSERT statement w/binary marshaller - no nodes know about classes. 
@@ -30,6 +32,11 @@ public class JdbcBinaryMarshallerInsertStatementSelfTest extends JdbcInsertState /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return getBinaryConfiguration(igniteInstanceName); + return super.getConfiguration(igniteInstanceName).setMarshaller(new BinaryMarshaller()); + } + + /** {@inheritDoc} */ + @Override CacheConfiguration cacheConfig() { + return binaryCacheConfig(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerMergeStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerMergeStatementSelfTest.java index 5e4b559..8b4d3c7 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerMergeStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBinaryMarshallerMergeStatementSelfTest.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.jdbc2; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; /** * JDBC test of MERGE statement w/binary marshaller - no nodes know about classes. 
@@ -30,6 +32,11 @@ public class JdbcBinaryMarshallerMergeStatementSelfTest extends JdbcMergeStateme /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - return getBinaryConfiguration(igniteInstanceName); + return super.getConfiguration(igniteInstanceName).setMarshaller(new BinaryMarshaller()); + } + + /** {@inheritDoc} */ + @Override CacheConfiguration cacheConfig() { + return binaryCacheConfig(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java new file mode 100644 index 0000000..84ffc28 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAbstractSelfTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.jdbc2; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.List; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Test that checks indexes handling with JDBC. + */ +public abstract class JdbcDynamicIndexAbstractSelfTest extends JdbcAbstractDmlStatementSelfTest { + /** */ + private final static String CREATE_INDEX = "create index idx on Person (id desc)"; + + /** */ + private final static String DROP_INDEX = "drop index idx"; + + /** */ + private final static String CREATE_INDEX_IF_NOT_EXISTS = "create index if not exists idx on Person (id desc)"; + + /** */ + private final static String DROP_INDEX_IF_EXISTS = "drop index idx if exists"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + try (PreparedStatement ps = + conn.prepareStatement("INSERT INTO Person (_key, id, age, firstName, lastName) values (?, ?, ?, ?, ?)")) { + + ps.setString(1, "j"); + ps.setInt(2, 1); + ps.setInt(3, 10); + ps.setString(4, "John"); + ps.setString(5, "Smith"); + ps.executeUpdate(); + + ps.setString(1, "m"); + ps.setInt(2, 2); + ps.setInt(3, 20); + ps.setString(4, "Mark"); + ps.setString(5, "Stone"); + ps.executeUpdate(); + + ps.setString(1, "s"); + ps.setInt(2, 3); + ps.setInt(3, 30); + ps.setString(4, 
"Sarah"); + ps.setString(5, "Pazzi"); + ps.executeUpdate(); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override CacheConfiguration cacheConfig() { + CacheConfiguration ccfg = super.cacheConfig(); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + ccfg.setCacheMode(cacheMode()); + ccfg.setAtomicityMode(atomicityMode()); + + if (nearCache()) + ccfg.setNearConfiguration(new NearCacheConfiguration()); + + return ccfg; + } + + /** + * @return Cache mode to use. + */ + protected abstract CacheMode cacheMode(); + + /** + * @return Cache atomicity mode to use. + */ + protected abstract CacheAtomicityMode atomicityMode(); + + /** + * @return Whether to use near cache. + */ + protected abstract boolean nearCache(); + + /** + * Execute given SQL statement. + * @param sql Statement. + * @throws SQLException if failed. + */ + private void jdbcRun(String sql) throws SQLException { + try (Statement stmt = conn.createStatement()) { + stmt.execute(sql); + } + } + + /** */ + private Object getSingleValue(ResultSet rs) throws SQLException { + assertEquals(1, rs.getMetaData().getColumnCount()); + + assertTrue(rs.next()); + + Object res = rs.getObject(1); + + assertTrue(rs.isLast()); + + return res; + } + + /** + * Test that after index creation index is used by queries. + */ + public void testCreateIndex() throws SQLException { + assertSize(3); + + assertColumnValues(30, 20, 10); + + jdbcRun(CREATE_INDEX); + + // Test that local queries on all server nodes use new index. 
+ for (int i = 0 ; i < 3; i++) { + List<List<?>> locRes = ignite(i).cache(null).query(new SqlFieldsQuery("explain select id from " + + "Person where id = 5").setLocal(true)).getAll(); + + assertEquals(F.asList( + Collections.singletonList("SELECT\n" + + " ID\n" + + "FROM \"\".PERSON\n" + + " /* \"\".IDX: ID = 5 */\n" + + "WHERE ID = 5") + ), locRes); + } + + assertSize(3); + + assertColumnValues(30, 20, 10); + } + + /** + * Test that creating an index with duplicate name yields an error. + */ + public void testCreateIndexWithDuplicateName() throws SQLException { + jdbcRun(CREATE_INDEX); + + assertSqlException(new RunnableX() { + /** {@inheritDoc} */ + @Override public void run() throws Exception { + jdbcRun(CREATE_INDEX); + } + }, IgniteQueryErrorCode.INDEX_ALREADY_EXISTS); + } + + /** + * Test that creating an index with duplicate name does not yield an error with {@code IF NOT EXISTS}. + */ + public void testCreateIndexIfNotExists() throws SQLException { + jdbcRun(CREATE_INDEX); + + // Despite duplicate name, this does not yield an error. + jdbcRun(CREATE_INDEX_IF_NOT_EXISTS); + } + + /** + * Test that after index drop there are no attempts to use it, and data state remains intact. + */ + public void testDropIndex() throws SQLException { + assertSize(3); + + jdbcRun(CREATE_INDEX); + + assertSize(3); + + jdbcRun(DROP_INDEX); + + // Test that no local queries on server nodes use new index. + for (int i = 0 ; i < 3; i++) { + List<List<?>> locRes = ignite(i).cache(null).query(new SqlFieldsQuery("explain select id from " + + "Person where id = 5").setLocal(true)).getAll(); + + assertEquals(F.asList( + Collections.singletonList("SELECT\n" + + " ID\n" + + "FROM \"\".PERSON\n" + + " /* \"\".PERSON.__SCAN_ */\n" + + "WHERE ID = 5") + ), locRes); + } + + assertSize(3); + } + + /** + * Test that dropping a non-existent index yields an error. 
+ */ + public void testDropMissingIndex() { + assertSqlException(new RunnableX() { + /** {@inheritDoc} */ + @Override public void run() throws Exception { + jdbcRun(DROP_INDEX); + } + }, IgniteQueryErrorCode.INDEX_NOT_FOUND); + } + + /** + * Test that dropping a non-existent index does not yield an error with {@code IF EXISTS}. + */ + public void testDropMissingIndexIfExists() throws SQLException { + // Despite index missing, this does not yield an error. + jdbcRun(DROP_INDEX_IF_EXISTS); + } + + /** + * Test that changes in cache affect index, and vice versa. + */ + public void testIndexState() throws SQLException { + IgniteCache cache = cache(); + + assertSize(3); + + assertColumnValues(30, 20, 10); + + jdbcRun(CREATE_INDEX); + + assertSize(3); + + assertColumnValues(30, 20, 10); + + cache.remove("m"); + + assertColumnValues(30, 10); + + cache.put("a", new Person(4, "someVal", "a", 5)); + + assertColumnValues(5, 30, 10); + + jdbcRun(DROP_INDEX); + + assertColumnValues(5, 30, 10); + } + + /** + * Check that values of {@code field1} match what we expect. + * @param vals Expected values. + */ + private void assertColumnValues(int... vals) throws SQLException { + try (Statement stmt = conn.createStatement()) { + try (ResultSet rs = stmt.executeQuery("SELECT age FROM Person ORDER BY id desc")) { + assertEquals(1, rs.getMetaData().getColumnCount()); + + for (int i = 0; i < vals.length; i++) { + assertTrue("Result set must have " + vals.length + " rows, got " + i, rs.next()); + + assertEquals(vals[i], rs.getInt(1)); + } + + assertFalse("Result set must have exactly " + vals.length + " rows", rs.next()); + } + } + } + + /** + * Do a {@code SELECT COUNT(*)} query to check index state correctness. + * @param expSize Expected number of items in table. 
+ */ + private void assertSize(long expSize) throws SQLException { + assertEquals(expSize, cache().size()); + + try (Statement stmt = conn.createStatement()) { + try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) from Person")) { + assertEquals(expSize, getSingleValue(rs)); + } + } + } + + /** + * @return Cache. + */ + private IgniteCache cache() { + return grid(0).cache(null); + } + + /** + * Ensure that SQL exception is thrown. + * + * @param r Runnable. + * @param expCode Error code. + */ + private static void assertSqlException(RunnableX r, int expCode) { + // We expect IgniteSQLException with given code inside CacheException inside JDBC SQLException. + + try { + r.run(); + } + catch (SQLException ex) { + if (ex.getCause() != null) { + try { + throw ex.getCause(); + } + catch (CacheException ex1) { + if (ex1.getCause() != null) { + try { + throw ex1.getCause(); + } + catch (IgniteSQLException e) { + assertEquals("Unexpected error code [expected=" + expCode + ", actual=" + e.statusCode() + ']', + expCode, e.statusCode()); + + return; + } + catch (Throwable t) { + fail("Unexpected exception: " + t); + } + } + } + catch (Throwable t) { + fail("Unexpected exception: " + t); + } + } + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + + fail(IgniteSQLException.class.getSimpleName() + " is not thrown."); + } + + /** + * Runnable which can throw checked exceptions. + */ + private interface RunnableX { + /** + * Do run. + * + * @throws Exception If failed. 
+ */ + public void run() throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedNearSelfTest.java new file mode 100644 index 0000000..c2b5011 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedNearSelfTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.jdbc2; + +/** */ +public class JdbcDynamicIndexAtomicPartitionedNearSelfTest extends JdbcDynamicIndexAtomicPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedSelfTest.java new file mode 100644 index 0000000..41e07e7 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicPartitionedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.jdbc2; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** */ +public class JdbcDynamicIndexAtomicPartitionedSelfTest extends JdbcDynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicReplicatedSelfTest.java new file mode 100644 index 0000000..7a5b015 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexAtomicReplicatedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** */ +public class JdbcDynamicIndexAtomicReplicatedSelfTest extends JdbcDynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedNearSelfTest.java new file mode 100644 index 0000000..2815dff --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedNearSelfTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +/** */ +public class JdbcDynamicIndexTransactionalPartitionedNearSelfTest extends JdbcDynamicIndexTransactionalPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedSelfTest.java new file mode 100644 index 0000000..47b257f --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalPartitionedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** */ +public class JdbcDynamicIndexTransactionalPartitionedSelfTest extends JdbcDynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalReplicatedSelfTest.java new file mode 100644 index 0000000..9b135d8 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDynamicIndexTransactionalReplicatedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** */ +public class JdbcDynamicIndexTransactionalReplicatedSelfTest extends JdbcDynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java index 1bd6d34..9e01bc7 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java @@ -24,8 +24,8 @@ import java.sql.Statement; import java.util.Arrays; import java.util.HashSet; import java.util.concurrent.Callable; -import 
org.apache.ignite.IgniteException; import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.testframework.GridTestUtils; /** @@ -164,7 +164,7 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes assertNotNull(reason); - assertEquals(IgniteException.class, reason.getClass()); + assertEquals(IgniteSQLException.class, reason.getClass()); assertEquals("Failed to INSERT some keys because they are already in cache [keys=[p2]]", reason.getMessage()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index 2489de9..75671de 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -73,6 +73,14 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class)); + // DDL tests. 
+ suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicReplicatedSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalPartitionedNearSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalPartitionedSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalReplicatedSelfTest.class)); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 1216db8..37e8c6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -575,6 +575,14 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_MAX_INDEX_PAYLOAD_SIZE = "IGNITE_MAX_INDEX_PAYLOAD_SIZE"; + /** + * Indexing discovery history size. Protects from duplicate messages maintaining the list of IDs of recently + * arrived discovery messages. + *

+ * Defaults to {@code 1000}. + */ + public static final String IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE = "IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE"; + /** Returns true for system properties only avoiding sending sensitive information. */ private static final IgnitePredicate> PROPS_FILTER = new IgnitePredicate>() { @Override public boolean apply(final Map.Entry entry) { http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java index 9f4313e..31fe264 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryEntity.java @@ -20,9 +20,12 @@ package org.apache.ignite.cache; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; + +import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; @@ -63,6 +66,24 @@ public class QueryEntity implements Serializable { } /** + * Copy constructor. + * + * @param other Other entity. + */ + public QueryEntity(QueryEntity other) { + keyType = other.keyType; + valType = other.valType; + + fields = new LinkedHashMap<>(other.fields); + keyFields = other.keyFields != null ? new HashSet<>(other.keyFields) : null; + + aliases = new HashMap<>(other.aliases); + idxs = new HashMap<>(other.idxs); + + tableName = other.tableName; + } + + /** * Creates a query entity with the given key and value types. * * @param keyType Key type. 
@@ -204,7 +225,7 @@ public class QueryEntity implements Serializable { for (QueryIndex idx : idxs) { if (!F.isEmpty(idx.getFields())) { if (idx.getName() == null) - idx.setName(defaultIndexName(idx)); + idx.setName(QueryUtils.indexName(this, idx)); if (idx.getIndexType() == null) throw new IllegalArgumentException("Index type is not set " + idx.getName()); @@ -220,6 +241,13 @@ public class QueryEntity implements Serializable { } /** + * Clear indexes. + */ + public void clearIndexes() { + this.idxs.clear(); + } + + /** * Gets table name for this query entity. * * @return table name @@ -254,56 +282,4 @@ public class QueryEntity implements Serializable { return this; } - - /** - * Ensures that index with the given name exists. - * - * @param idxName Index name. - * @param idxType Index type. - */ - public void ensureIndex(String idxName, QueryIndexType idxType) { - QueryIndex idx = idxs.get(idxName); - - if (idx == null) { - idx = new QueryIndex(); - - idx.setName(idxName); - idx.setIndexType(idxType); - - idxs.put(idxName, idx); - } - else - throw new IllegalArgumentException("An index with the same name and of a different type already exists " + - "[idxName=" + idxName + ", existingIdxType=" + idx.getIndexType() + ", newIdxType=" + idxType + ']'); - } - - /** - * Generates default index name by concatenating all index field names. - * - * @param idx Index to build name for. - * @return Index name. - */ - public static String defaultIndexName(QueryIndex idx) { - StringBuilder idxName = new StringBuilder(); - - for (Map.Entry field : idx.getFields().entrySet()) { - idxName.append(field.getKey()); - - idxName.append('_'); - idxName.append(field.getValue() ? 
"asc_" : "desc_"); - } - - for (int i = 0; i < idxName.length(); i++) { - char ch = idxName.charAt(i); - - if (Character.isWhitespace(ch)) - idxName.setCharAt(i, '_'); - else - idxName.setCharAt(i, Character.toLowerCase(ch)); - } - - idxName.append("idx"); - - return idxName.toString(); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java index 750d3e1..555a006 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/QueryIndex.java @@ -16,6 +16,9 @@ */ package org.apache.ignite.cache; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -38,6 +41,7 @@ public class QueryIndex implements Serializable { private String name; /** */ + @GridToStringInclude private LinkedHashMap fields; /** */ @@ -260,4 +264,9 @@ public class QueryIndex implements Serializable { public void setInlineSize(int inlineSize) { this.inlineSize = inlineSize; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryIndex.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 2308a10..a2f7cc8 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -2096,7 +2096,7 @@ public class CacheConfiguration extends MutableConfiguration { @Nullable ClassProperty parent) { if (U.isJdk(cls) || QueryUtils.isGeometryClass(cls)) { if (parent == null && !key && QueryUtils.isSqlType(cls)) { // We have to index primitive _val. - String idxName = QueryUtils._VAL + "_idx"; + String idxName = cls.getSimpleName() + "_" + QueryUtils._VAL + "_idx"; type.addIndex(idxName, QueryUtils.isGeometryClass(cls) ? QueryIndexType.GEOSPATIAL : QueryIndexType.SORTED); @@ -2527,6 +2527,11 @@ public class CacheConfiguration extends MutableConfiguration { } /** {@inheritDoc} */ + @Override public String name() { + return null; + } + + /** {@inheritDoc} */ @Override public Collection fields() { Collection res = new ArrayList<>(fields.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index be9a1d6..98edf0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -58,7 +58,10 @@ public interface GridComponent { MARSHALLER_PROC, /** */ - BINARY_PROC + BINARY_PROC, + + /** Query processor. 
*/ + QUERY_PROC } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 6fefb68..8462e5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -564,6 +564,13 @@ public interface GridKernalContext extends Iterable { public ExecutorService getQueryExecutorService(); /** + * Executor service that is in charge of processing schema change messages. + * + * @return Executor service that is in charge of processing schema change messages. + */ + public ExecutorService getSchemaExecutorService(); + + /** * Gets exception registry. * * @return Exception registry. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 664e47c..213cf86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -340,6 +340,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected ExecutorService schemaExecSvc; + + /** */ + @GridToStringExclude private Map attrs = new HashMap<>(); /** */ @@ -396,8 +400,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param idxExecSvc Indexing executor service. * @param callbackExecSvc Callback executor service. * @param qryExecSvc Query executor service. + * @param schemaExecSvc Schema executor service. * @param plugins Plugin providers. - * @throws IgniteCheckedException In case of error. 
*/ @SuppressWarnings("TypeMayBeWeakened") protected GridKernalContextImpl( @@ -419,6 +423,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @Nullable ExecutorService idxExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, ExecutorService qryExecSvc, + ExecutorService schemaExecSvc, List plugins ) { assert grid != null; @@ -442,6 +447,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.idxExecSvc = idxExecSvc; this.callbackExecSvc = callbackExecSvc; this.qryExecSvc = qryExecSvc; + this.schemaExecSvc = schemaExecSvc; marshCtx = new MarshallerContextImpl(plugins); @@ -987,6 +993,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public ExecutorService getSchemaExecutorService() { + return schemaExecSvc; + } + + /** {@inheritDoc} */ @Override public IgniteExceptionRegistry exceptionRegistry() { return IgniteExceptionRegistry.get(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 7acc070..c382999 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -108,7 +108,10 @@ public enum GridTopic { TOPIC_HADOOP_MSG, /** */ - TOPIC_METADATA_REQ; + TOPIC_METADATA_REQ, + + /** */ + TOPIC_SCHEMA; /** Enum values. 
*/ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 9b41b58..922dd55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -311,6 +311,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** */ @GridToStringExclude + private ObjectName schemaExecSvcMBean; + + /** */ + @GridToStringExclude private ObjectName stripedExecSvcMBean; /** Kernal start timestamp. */ @@ -694,6 +698,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param idxExecSvc Indexing executor service. * @param callbackExecSvc Callback executor service. * @param qryExecSvc Query executor service. + * @param schemaExecSvc Schema executor service. * @param errHnd Error handler to use for notification about startup problems. * @throws IgniteCheckedException Thrown in case of any errors. */ @@ -714,6 +719,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @Nullable ExecutorService idxExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, ExecutorService qryExecSvc, + ExecutorService schemaExecSvc, GridAbsClosure errHnd ) throws IgniteCheckedException @@ -828,6 +834,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { idxExecSvc, callbackExecSvc, qryExecSvc, + schemaExecSvc, plugins ); @@ -1019,7 +1026,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Register MBeans. 
registerKernalMBean(); registerLocalNodeMBean(); - registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc, qryExecSvc); + registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc, qryExecSvc, + schemaExecSvc); + registerStripedExecutorMBean(stripedExecSvc); // Lifecycle bean notifications. @@ -1611,11 +1620,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * @param execSvc - * @param sysExecSvc - * @param p2pExecSvc - * @param mgmtExecSvc - * @param restExecSvc + * @param execSvc Public executor service. + * @param sysExecSvc System executor service. + * @param p2pExecSvc P2P executor service. + * @param mgmtExecSvc Management executor service. + * @param restExecSvc Query executor service. + * @param schemaExecSvc Schema executor service. * @throws IgniteCheckedException If failed. */ private void registerExecutorMBeans(ExecutorService execSvc, @@ -1623,12 +1633,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService restExecSvc, - ExecutorService qryExecSvc) throws IgniteCheckedException { + ExecutorService qryExecSvc, + ExecutorService schemaExecSvc) throws IgniteCheckedException { pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor"); sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor"); mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor"); p2PExecSvcMBean = registerExecutorMBean(p2pExecSvc, "GridClassLoadingExecutor"); qryExecSvcMBean = registerExecutorMBean(qryExecSvc, "GridQueryExecutor"); + schemaExecSvcMBean = registerExecutorMBean(schemaExecSvc, "GridSchemaExecutor"); ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration(); @@ -2151,6 +2163,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { unregisterMBean(locNodeMBean) & unregisterMBean(restExecSvcMBean) & 
unregisterMBean(qryExecSvcMBean) & + unregisterMBean(schemaExecSvcMBean) & unregisterMBean(stripedExecSvcMBean) )) errOnStop = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 61e93cf..2eda01c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1527,6 +1527,9 @@ public class IgnitionEx { /** Query executor service. */ private ThreadPoolExecutor qryExecSvc; + /** Query executor service. */ + private ThreadPoolExecutor schemaExecSvc; + /** Grid state. */ private volatile IgniteState state = STOPPED; @@ -1845,6 +1848,16 @@ public class IgnitionEx { qryExecSvc.allowCoreThreadTimeOut(true); + schemaExecSvc = new IgniteThreadPoolExecutor( + "schema", + cfg.getIgniteInstanceName(), + 2, + 2, + DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue()); + + schemaExecSvc.allowCoreThreadTimeOut(true); + // Register Ignite MBean for current grid instance. 
registerFactoryMbean(myCfg.getMBeanServer()); @@ -1872,6 +1885,7 @@ public class IgnitionEx { idxExecSvc, callbackExecSvc, qryExecSvc, + schemaExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); @@ -2464,6 +2478,10 @@ public class IgnitionEx { qryExecSvc = null; + U.shutdownNow(getClass(), schemaExecSvc, log); + + schemaExecSvc = null; + U.shutdownNow(getClass(), stripedExecSvc, log); stripedExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b615c35..83fc3b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -97,6 +97,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGF import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SCHEMA_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SERVICE_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL; @@ -700,6 +701,7 @@ public class GridIoManager extends GridManagerAdapter extends GridCacheSharedManagerAdap req.startCacheConfiguration(), req.cacheType(), 
false, - req.deploymentId()); + req.deploymentId(), + req.schema()); DynamicCacheDescriptor old = registeredCaches.put(cacheId, desc); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java index ca99511..ad0dcc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java @@ -21,9 +21,5 @@ package org.apache.ignite.internal.processors.cache; * Cache partition exchange worker task marker interface. */ public interface CachePartitionExchangeWorkerTask { - /** - * @return {@code True} if task denotes standard exchange task, {@code false} if this is a custom task which - * must be executed from within exchange thread. - */ - boolean isExchange(); + // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 1e5ab88..9d2563d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -17,16 +17,18 @@ package org.apache.ignite.internal.processors.cache; -import java.io.Serializable; -import java.util.UUID; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.QuerySchema; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import java.io.Serializable; +import java.util.UUID; + /** * Cache start/stop request. */ @@ -83,6 +85,9 @@ public class DynamicCacheChangeRequest implements Serializable { /** Reset lost partitions flag. */ private boolean resetLostPartitions; + /** Dynamic schema. */ + private QuerySchema schema; + /** */ private transient boolean exchangeNeeded; @@ -353,6 +358,20 @@ public class DynamicCacheChangeRequest implements Serializable { return rcvdFrom; } + /** + * @return Dynamic schema. + */ + public QuerySchema schema() { + return schema; + } + + /** + * @param schema Dynamic schema. + */ + public void schema(QuerySchema schema) { + this.schema = schema != null ? 
schema.copy() : null; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeRequest.class, this, "cacheName", cacheName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java index 5938785..92a7af3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java @@ -24,6 +24,8 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.plugin.CachePluginManager; +import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -84,6 +86,12 @@ public class DynamicCacheDescriptor { /** */ private transient AffinityTopologyVersion clientCacheStartVer; + /** Mutex to control schema. */ + private final Object schemaMux = new Object(); + + /** Current schema. */ + private QuerySchema schema; + /** * @param ctx Context. * @param cacheCfg Cache configuration. @@ -91,12 +99,15 @@ public class DynamicCacheDescriptor { * @param template {@code True} if this is template configuration. * @param deploymentId Deployment ID. 
*/ + @SuppressWarnings("unchecked") public DynamicCacheDescriptor(GridKernalContext ctx, CacheConfiguration cacheCfg, CacheType cacheType, boolean template, - IgniteUuid deploymentId) { + IgniteUuid deploymentId, + QuerySchema schema) { assert cacheCfg != null; + assert schema != null; this.cacheCfg = cacheCfg; this.cacheType = cacheType; @@ -106,6 +117,10 @@ public class DynamicCacheDescriptor { pluginMgr = new CachePluginManager(ctx, cacheCfg); cacheId = CU.cacheId(cacheCfg.getName()); + + synchronized (schemaMux) { + this.schema = schema.copy(); + } } /** @@ -319,6 +334,39 @@ public class DynamicCacheDescriptor { this.clientCacheStartVer = clientCacheStartVer; } + /** + * @return Schema. + */ + public QuerySchema schema() { + synchronized (schemaMux) { + return schema.copy(); + } + } + + /** + * Set schema + * + * @param schema Schema. + */ + public void schema(QuerySchema schema) { + assert schema != null; + + synchronized (schemaMux) { + this.schema = schema.copy(); + } + } + + /** + * Try applying finish message. + * + * @param msg Message. 
+ */ + public void schemaChangeFinish(SchemaFinishDiscoveryMessage msg) { + synchronized (schemaMux) { + schema.finish(msg); + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName())); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 2066342..1eab04e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -33,8 +33,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.util.lang.GridTuple3; -import org.apache.ignite.internal.util.typedef.T2; import org.jetbrains.annotations.Nullable; /** @@ -902,6 +902,17 @@ public interface GridCacheEntryEx { throws IgniteCheckedException, GridCacheEntryRemovedException; /** + * Update index from within entry lock, passing key, value, and expiration time to provided closure. + * + * @param clo Closure to apply to key, value, and expiration time. + * @param link Link. + * @throws IgniteCheckedException If failed. + * @throws GridCacheEntryRemovedException If entry was removed. 
+ */ + public void updateIndex(SchemaIndexCacheVisitorClosure clo, long link) throws IgniteCheckedException, + GridCacheEntryRemovedException; + + /** * @return Expire time, without accounting for transactions or removals. */ public long rawExpireTime(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f4d4258..ddec684 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.util.IgniteTree; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -3374,6 +3375,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ + @Override public void updateIndex(SchemaIndexCacheVisitorClosure clo, long link) throws IgniteCheckedException, + GridCacheEntryRemovedException { + synchronized (this) { + if (isInternal()) + return; + + checkObsolete(); + + unswap(false); + + if (val != null) + clo.apply(key, partition(), val, ver, expireTimeUnlocked(), link); + } + } + + /** {@inheritDoc} */ @Override public 
EvictableEntry wrapEviction() { return new CacheEvictableEntryImpl<>(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 2775aa7..4775ea1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -78,6 +78,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.schema.SchemaNodeLeaveExchangeWorkerTask; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -309,6 +310,10 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Do not start exchange for discovery event: " + evt); } + + // Notify indexing engine about node leave so that we can re-map coordinator accordingly. 
+ if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) + exchWorker.addCustomTask(new SchemaNodeLeaveExchangeWorkerTask(evt.eventNode())); } finally { leaveBusy(); @@ -752,17 +757,6 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** - * Add custom task. - * - * @param task Task. - */ - public void addCustomTask(CachePartitionExchangeWorkerTask task) { - assert !task.isExchange(); - - exchWorker.addCustomTask(task); - } - - /** * @param evt Discovery event. * @return Affinity topology version. */ @@ -1536,6 +1530,16 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana } /** + * Check if provided task from exchange queue is exchange task. + * + * @param task Task. + * @return {@code True} if this is exchange task. + */ + private static boolean isExchangeTask(CachePartitionExchangeWorkerTask task) { + return task instanceof GridDhtPartitionsExchangeFuture; + } + + /** * @param exchTopVer Exchange topology version. */ private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) { @@ -1682,7 +1686,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana void addCustomTask(CachePartitionExchangeWorkerTask task) { assert task != null; - assert !task.isExchange(); + assert !isExchangeTask(task); futQ.offer(task); } @@ -1693,6 +1697,8 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana * @param task Task. 
*/ void processCustomTask(CachePartitionExchangeWorkerTask task) { + assert !isExchangeTask(task); + try { cctx.cache().processCustomExchangeTask(task); } @@ -1707,7 +1713,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana boolean hasPendingExchange() { if (!futQ.isEmpty()) { for (CachePartitionExchangeWorkerTask task : futQ) { - if (task.isExchange()) + if (isExchangeTask(task)) return true; } } @@ -1722,7 +1728,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana U.warn(log, "Pending exchange futures:"); for (CachePartitionExchangeWorkerTask task: futQ) { - if (task.isExchange()) + if (isExchangeTask(task)) U.warn(log, ">>> " + task); } } @@ -1773,7 +1779,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana if (task == null) continue; // Main while loop. - if (!task.isExchange()) { + if (!isExchangeTask(task)) { processCustomTask(task); continue;