Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 238A8200D02 for ; Fri, 8 Sep 2017 09:55:50 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 225F71609C8; Fri, 8 Sep 2017 07:55:50 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8438E160BFE for ; Fri, 8 Sep 2017 09:55:47 +0200 (CEST) Received: (qmail 20932 invoked by uid 500); 8 Sep 2017 07:55:46 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 20286 invoked by uid 99); 8 Sep 2017 07:55:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Sep 2017 07:55:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2006FF570E; Fri, 8 Sep 2017 07:55:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 08 Sep 2017 07:55:57 -0000 Message-Id: <850cf0e0fd8243428c87c657dbacdfd0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [15/27] ignite git commit: IGNITE-5572: SQL: ALTER TABLE ADD COLUMN support. This closes #2344. 
archived-at: Fri, 08 Sep 2017 07:55:50 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java new file mode 100644 index 0000000..969c985 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractConcurrentSelfTest.java @@ -0,0 +1,1056 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteClientReconnectAbstractTest; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.QueryField; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; +import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; +import org.apache.ignite.internal.util.typedef.F; +import 
org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; + +import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi; + +/** + * Concurrency tests for dynamic column addition (ALTER TABLE ADD COLUMN). + */ +@SuppressWarnings("unchecked") +public abstract class DynamicColumnsAbstractConcurrentSelfTest extends DynamicColumnsAbstractTest { + /** Test duration. */ + private static final long TEST_DUR = 10_000L; + + /** Large cache size. */ + private static final int LARGE_CACHE_SIZE = 100_000; + + /** Table name. */ + private static final String TBL_NAME = "PERSON"; + + /** Cache name. */ + private static final String CACHE_NAME = QueryUtils.createTableCacheName(QueryUtils.DFLT_SCHEMA, TBL_NAME); + + /** Attribute to filter node out of cache data nodes. */ + private static final String ATTR_FILTERED = "FILTERED"; + + /** SQL statement to create test table accompanied by template specification. */ + private static final String CREATE_SQL_WITH_TEMPLATE = CREATE_SQL + " WITH \"template=TPL\""; + + /** Per-node blockers for dynamic column operations, keyed by node ID. */ + private static final ConcurrentHashMap<UUID, T3<CountDownLatch, AtomicBoolean, CountDownLatch>> BLOCKS = + new ConcurrentHashMap<>(); + + /** Cache mode. */ + private final CacheMode cacheMode; + + /** Atomicity mode. */ + private final CacheAtomicityMode atomicityMode; + + /** + * Constructor. + * + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. 
+ */ + DynamicColumnsAbstractConcurrentSelfTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) { + this.cacheMode = cacheMode; + this.atomicityMode = atomicityMode; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + GridQueryProcessor.idxCls = BlockingIndexing.class; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override protected void afterTest() throws Exception { + GridQueryProcessor.idxCls = null; + + for (T3 block : BLOCKS.values()) + block.get1().countDown(); + + BLOCKS.clear(); + + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception { + return super.commonConfiguration(idx).setDiscoverySpi(new TestTcpDiscoverySpi()); + } + + /** + * Make sure that coordinator migrates correctly between nodes. + * + * @throws Exception If failed. + */ + public void testCoordinatorChange() throws Exception { + CountDownLatch finishLatch = new CountDownLatch(2); + + // Start servers. + Ignite srv1 = ignitionStart(serverConfiguration(1), null); + Ignite srv2 = ignitionStart(serverConfiguration(2), null); + ignitionStart(serverConfiguration(3, true), finishLatch); + + UUID srv1Id = srv1.cluster().localNode().id(); + UUID srv2Id = srv2.cluster().localNode().id(); + + // Start client which will execute operations. + IgniteEx cli = (IgniteEx)ignitionStart(clientConfiguration(4), finishLatch); + + createSqlCache(cli); + + run(cli, CREATE_SQL_WITH_TEMPLATE); + + // Test migration between normal servers. 
+ CountDownLatch idxLatch = blockIndexing(srv1Id); + + IgniteInternalFuture colFut1 = addCols(cli, QueryUtils.DFLT_SCHEMA, c("age", Integer.class.getName())); + + U.await(idxLatch); + + //srv1.close(); + Ignition.stop(srv1.name(), true); + + unblockIndexing(srv1Id); + + colFut1.get(); + + checkNodesState(TBL_NAME, c("age", Integer.class.getName())); + + // Test migration from normal server to non-affinity server. + idxLatch = blockIndexing(srv2Id); + + IgniteInternalFuture colFut2 = addCols(cli, QueryUtils.DFLT_SCHEMA, c("city", String.class.getName())); + + U.await(idxLatch); + + //srv2.close(); + Ignition.stop(srv2.name(), true); + + unblockIndexing(srv2Id); + + colFut2.get(); + + checkNodesState(TBL_NAME, c("city", String.class.getName())); + } + + /** + * Test that chained operations stay pending while the first one is blocked, even as new nodes join. + * + * @throws Exception If failed. + */ + public void testOperationChaining() throws Exception { + // 7 nodes * 2 columns = 14 latch countdowns. + CountDownLatch finishLatch = new CountDownLatch(14); + + Ignite srv1 = ignitionStart(serverConfiguration(1), finishLatch); + + ignitionStart(serverConfiguration(2), finishLatch); + ignitionStart(serverConfiguration(3, true), finishLatch); + ignitionStart(clientConfiguration(4), finishLatch); + + createSqlCache(srv1); + + run(srv1, CREATE_SQL_WITH_TEMPLATE); + + CountDownLatch idxLatch = blockIndexing(srv1); + + QueryField c1 = c("age", Integer.class.getName()); + QueryField c2 = c("city", String.class.getName()); + + IgniteInternalFuture colFut1 = addCols(srv1, QueryUtils.DFLT_SCHEMA, c1); + + IgniteInternalFuture colFut2 = addCols(srv1, QueryUtils.DFLT_SCHEMA, c2); + + U.await(idxLatch); + + // Start even more nodes of different flavors. + ignitionStart(serverConfiguration(5), finishLatch); + ignitionStart(serverConfiguration(6, true), finishLatch); + ignitionStart(clientConfiguration(7), finishLatch); + + assert !colFut1.isDone(); + assert !colFut2.isDone(); + + unblockIndexing(srv1); + + colFut1.get(); + colFut2.get(); + + 
U.await(finishLatch); + + checkNodesState(TBL_NAME, c1, c2); + } + + /** + * Test node join on pending operation. + * + * @throws Exception If failed. + */ + public void testNodeJoinOnPendingOperation() throws Exception { + CountDownLatch finishLatch = new CountDownLatch(4); + + Ignite srv1 = ignitionStart(serverConfiguration(1), finishLatch); + + createSqlCache(srv1); + + run(srv1, CREATE_SQL_WITH_TEMPLATE); + + CountDownLatch idxLatch = blockIndexing(srv1); + + QueryField c = c("age", Integer.class.getName()); + + IgniteInternalFuture idxFut = addCols(srv1, QueryUtils.DFLT_SCHEMA, c); + + U.await(idxLatch); + + ignitionStart(serverConfiguration(2), finishLatch); + ignitionStart(serverConfiguration(3, true), finishLatch); + ignitionStart(clientConfiguration(4), finishLatch); + + assertFalse(idxFut.isDone()); + + unblockIndexing(srv1); + + idxFut.get(); + + U.await(finishLatch); + + checkNodesState(TBL_NAME, c); + } + + /** + * PUT/REMOVE data from cache and add column concurrently. + * + * @throws Exception If failed, + */ + public void testConcurrentPutRemove() throws Exception { + CountDownLatch finishLatch = new CountDownLatch(4); + + // Start several nodes. + Ignite srv1 = ignitionStart(serverConfiguration(1), finishLatch); + ignitionStart(serverConfiguration(2), finishLatch); + ignitionStart(serverConfiguration(3), finishLatch); + ignitionStart(serverConfiguration(4), finishLatch); + + awaitPartitionMapExchange(); + + createSqlCache(srv1); + + run(srv1, CREATE_SQL_WITH_TEMPLATE); + + // Start data change operations from several threads. 
+ final AtomicBoolean stopped = new AtomicBoolean(); + + IgniteInternalFuture updateFut = multithreadedAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + int key = ThreadLocalRandom.current().nextInt(0, LARGE_CACHE_SIZE); + int val = ThreadLocalRandom.current().nextInt(); + + IgniteCache cache = node.cache(CACHE_NAME); + + if (ThreadLocalRandom.current().nextBoolean()) + cache.put(key(node, key), val(node, val)); + else + cache.remove(key(node, key)); + } + + return null; + } + }, 4); + + // Let some to arrive. + Thread.sleep(500L); + + addCols(srv1, QueryUtils.DFLT_SCHEMA, c("v", Integer.class.getName())).get(); + + // Stop updates once index is ready. + stopped.set(true); + + updateFut.get(); + + finishLatch.await(); + + // Make sure new column is there. + checkNodesState(TBL_NAME, c("v", Integer.class.getName())); + + run(srv1, "update person set \"v\" = case when mod(id, 2) <> 0 then substring(name, 7, length(name) - 6) " + + "else null end"); + + // Get expected values. + Set expKeys = new HashSet<>(); + + IgniteCache cache = srv1.cache(CACHE_NAME).withKeepBinary(); + + for (int i = 0; i < LARGE_CACHE_SIZE; i++) { + BinaryObject key = key(srv1, i); + + BinaryObject val = cache.get(key); + + if (val != null) { + int id = key.field("ID"); + + assertEquals(i, id); + + if (id % 2 != 0) + expKeys.add(i); + } + } + + String valTypeName = ((IgniteEx)srv1).context().query().types(CACHE_NAME).iterator().next().valueTypeName(); + + // Validate query result. 
+ for (Ignite node : Ignition.allGrids()) { + IgniteCache nodeCache = node.cache(CACHE_NAME).withKeepBinary(); + + SqlQuery qry = new SqlQuery(valTypeName, "from " + TBL_NAME + " where mod(id, 2) <> 0"); + + List> res = nodeCache.query(qry).getAll(); + + assertEquals("Cache size mismatch [exp=" + expKeys.size() + ", actual=" + res.size() + ']', + expKeys.size(), res.size()); + + for (Cache.Entry entry : res) { + int key = entry.getKey().field("ID"); + int v = entry.getValue().field("v"); + + String name = entry.getValue().field("NAME"); + + assertTrue("Expected key is not in result set: " + key, expKeys.contains(key)); + + assertEquals(Integer.parseInt(name.substring(6)), v); + } + + } + } + + /** + * @param node Node. + * @param val Number to form string field value from. + * @return PERSON cache value. + */ + private BinaryObject val(Ignite node, int val) { + String valTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME).iterator().next().valueTypeName(); + + return node.binary().builder(valTypeName).setField("name", "person" + val).build(); + } + + /** + * @param node Node. + * @param key Value for ID field. + * @return PERSON cache key. + */ + private BinaryObject key(Ignite node, int key) { + String keyTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME).iterator().next().keyTypeName(); + + return node.binary().builder(keyTypeName).setField("ID", key).build(); + } + + /** + * Test index consistency on re-balance. + * + * @throws Exception If failed. + */ + public void testConcurrentRebalance() throws Exception { + // Start cache and populate it with data. + Ignite srv1 = ignitionStart(serverConfiguration(1)); + Ignite srv2 = ignitionStart(serverConfiguration(2)); + + createSqlCache(srv1); + + run(srv1, CREATE_SQL_WITH_TEMPLATE); + + awaitPartitionMapExchange(); + + put(srv1, 0, LARGE_CACHE_SIZE); + + // Start index operation in blocked state. 
+ CountDownLatch idxLatch1 = blockIndexing(srv1); + CountDownLatch idxLatch2 = blockIndexing(srv2); + + QueryField c = c("salary", Float.class.getName()); + + final IgniteInternalFuture idxFut = addCols(srv1, QueryUtils.DFLT_SCHEMA, c); + + U.await(idxLatch1); + U.await(idxLatch2); + + // Start two more nodes and unblock index operation in the middle. + ignitionStart(serverConfiguration(3)); + + unblockIndexing(srv1); + unblockIndexing(srv2); + + ignitionStart(serverConfiguration(4)); + + awaitPartitionMapExchange(); + + // Validate index state. + idxFut.get(); + + checkNodesState(TBL_NAME, c); + } + + /** + * Put to cache keys and values for range from startIdx to endIdx. + * @param node Node. + * @param startIdx Starting index. + * @param endIdx Ending index. + */ + private void put(Ignite node, int startIdx, int endIdx) { + for (int i = startIdx; i < endIdx; i++) + node.cache(CACHE_NAME).put(key(node, i), val(node, i)); + } + + /** + * Check what happens in case cache is destroyed before operation is started. + * + * @throws Exception If failed. + */ + public void testConcurrentCacheDestroy() throws Exception { + // Start complex topology. + Ignite srv1 = ignitionStart(serverConfiguration(1)); + + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + + Ignite cli = ignitionStart(clientConfiguration(4)); + + // Start cache and populate it with data. + createSqlCache(cli); + + run(cli, CREATE_SQL_WITH_TEMPLATE); + + put(cli, 0, LARGE_CACHE_SIZE); + + // Start index operation and block it on coordinator. + CountDownLatch idxLatch = blockIndexing(srv1); + + QueryField c = c("city", String.class.getName()); + + final IgniteInternalFuture idxFut = addCols(srv1, QueryUtils.DFLT_SCHEMA, c); + + idxLatch.await(); + + // Destroy cache (drop table). + run(cli, DROP_SQL); + + // Unblock indexing and see what happens. 
+ unblockIndexing(srv1); + + try { + idxFut.get(); + + fail("Exception has not been thrown."); + } + catch (SchemaOperationException e) { + // No-op. + } + } + + /** + * Make sure that contended operations on the same index from different nodes do not hang when we issue both + * CREATE/DROP and SELECT statements. + * + * @throws Exception If failed. + */ + public void testQueryConsistencyMultithreaded() throws Exception { + // Start complex topology. + ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + + Ignite cli = ignitionStart(clientConfiguration(4)); + + createSqlCache(cli); + + run(cli, CREATE_SQL_WITH_TEMPLATE); + + put(cli, 0, 5000); + + final AtomicBoolean stopped = new AtomicBoolean(); + + final AtomicInteger dynColCnt = new AtomicInteger(); + + IgniteInternalFuture fut = multithreadedAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteInternalFuture fut = addCols(node, QueryUtils.DFLT_SCHEMA, c("newCol" + dynColCnt.getAndIncrement(), + Integer.class.getName())); + + try { + fut.get(); + } + catch (SchemaOperationException e) { + // No-op. 
+ } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + + return null; + } + }, 1); + + IgniteInternalFuture qryFut = multithreadedAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteCache cache = node.cache(CACHE_NAME).withKeepBinary(); + + String valTypeName = ((IgniteEx)node).context().query().types(CACHE_NAME) + .iterator().next().valueTypeName(); + + List> res = cache.query( + new SqlQuery(valTypeName, "from " + TBL_NAME)).getAll(); + + assertEquals(5000, res.size()); + } + + return null; + } + }, 8); + + Thread.sleep(TEST_DUR); + + stopped.set(true); + + // Make sure nothing hanged. + fut.get(); + qryFut.get(); + } + + /** + * Make sure that client receives schema changes made while it was disconnected. + * + * @throws Exception If failed. + */ + public void testClientReconnect() throws Exception { + checkClientReconnect(false, true); + } + + /** + * Make sure that client receives schema changes made while it was disconnected, even with cache recreation. + * + * @throws Exception If failed. + */ + public void testClientReconnectWithCacheRestart() throws Exception { + checkClientReconnect(true, true); + } + + /** + * Make sure that client receives schema changes made while it was disconnected. + * + * @throws Exception If failed. + */ + public void testClientReconnectWithNonDynamicCache() throws Exception { + checkClientReconnect(false, false); + } + + /** + * Make sure that client receives schema changes made while it was disconnected, even with cache recreation. + * + * @throws Exception If failed. + */ + public void testClientReconnectWithNonDynamicCacheRestart() throws Exception { + checkClientReconnect(true, false); + } + + /** + * Make sure that client receives schema changes made while it was disconnected, optionally with cache restart + * in the interim. 
+ * + * @param restartCache Whether cache needs to be recreated during client's absence. + * @param dynamicCache Whether recreate, if needed, should be done on dynamic or static cache. + * @throws Exception If failed. + */ + private void checkClientReconnect(final boolean restartCache, boolean dynamicCache) throws Exception { + // Start complex topology. + final Ignite srv = ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + + final Ignite cli = ignitionStart(clientConfiguration(4)); + + if (dynamicCache) { + createSqlCache(cli); + + run(cli, CREATE_SQL_WITH_TEMPLATE); + } + + final String schemaName = dynamicCache ? QueryUtils.DFLT_SCHEMA : "idx"; + + final QueryField[] cols = + new QueryField[] { c("age", Integer.class.getName()), c("city", String.class.getName()) }; + + // Check index create. + reconnectClientNode(srv, cli, restartCache, dynamicCache, new RunnableX() { + @Override public void run() throws Exception { + addCols(srv, schemaName, cols).get(); + } + }); + + checkNodeState((IgniteEx)cli, schemaName, TBL_NAME, cols); + } + + /** + * Reconnect the client and run specified actions while it's out. + * + * @param srvNode Server node. + * @param cliNode Client node. + * @param restart Whether cache has to be recreated prior to executing required actions. + * @param dynamicCache Whether recreate, if needed, should be done on dynamic or static cache. + * @param clo Closure to run + * @throws Exception If failed. 
+ */ + private void reconnectClientNode(final Ignite srvNode, final Ignite cliNode, final boolean restart, + final boolean dynamicCache, final RunnableX clo) throws Exception { + IgniteClientReconnectAbstractTest.reconnectClientNode(log, cliNode, srvNode, new Runnable() { + @Override public void run() { + if (restart) { + if (dynamicCache) { + DynamicColumnsAbstractConcurrentSelfTest.this.run(srvNode, DROP_SQL); + + DynamicColumnsAbstractConcurrentSelfTest.this.run(srvNode, CREATE_SQL_WITH_TEMPLATE); + } + else { + srvNode.destroyCache("idx"); + + CacheConfiguration ccfg; + + try { + ccfg = clientConfiguration(0).getCacheConfiguration()[0]; + } + catch (Exception e) { + throw new AssertionError(e); + } + + srvNode.createCache(ccfg); + } + } + + try { + clo.run(); + } + catch (Exception e) { + throw new IgniteException("Test reconnect runnable failed.", e); + } + } + }); + + if (restart) + cliNode.cache(CACHE_NAME); + } + + /** + * Test concurrent node start/stop along with index operations. Nothing should hang. + * + * @throws Exception If failed. + */ + @SuppressWarnings("StringConcatenationInLoop") + public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Exception { + // Start several stable nodes. + ignitionStart(serverConfiguration(1)); + ignitionStart(serverConfiguration(2)); + ignitionStart(serverConfiguration(3, true)); + + final Ignite cli = ignitionStart(clientConfiguration(4)); + + createSqlCache(cli); + + run(cli, CREATE_SQL_WITH_TEMPLATE); + + final AtomicBoolean stopped = new AtomicBoolean(); + + // Start node start/stop worker. 
+ final AtomicInteger nodeIdx = new AtomicInteger(4); + + final AtomicInteger dynColCnt = new AtomicInteger(); + + IgniteInternalFuture startStopFut = multithreadedAsync(new Callable() { + @Override public Void call() throws Exception { + boolean exists = false; + + int lastIdx = 0; + + while (!stopped.get()) { + if (exists) { + stopGrid(lastIdx); + + exists = false; + } + else { + lastIdx = nodeIdx.incrementAndGet(); + + IgniteConfiguration cfg; + + switch (ThreadLocalRandom.current().nextInt(0, 3)) { + case 1: + cfg = serverConfiguration(lastIdx, false); + + break; + + case 2: + + cfg = serverConfiguration(lastIdx, true); + + break; + + default: + cfg = clientConfiguration(lastIdx); + } + + ignitionStart(cfg); + + exists = true; + } + + Thread.sleep(ThreadLocalRandom.current().nextLong(500L, 1500L)); + } + + return null; + } + }, 1); + + IgniteInternalFuture idxFut = multithreadedAsync(new Callable() { + @Override public Void call() throws Exception { + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteInternalFuture fut = addCols(node, QueryUtils.DFLT_SCHEMA, c("newCol" + dynColCnt.getAndIncrement(), + Integer.class.getName())); + + try { + fut.get(); + } + catch (SchemaOperationException e) { + // No-op. + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + + return null; + } + }, 1); + + Thread.sleep(TEST_DUR); + + stopped.set(true); + + // Make sure nothing hanged. + startStopFut.get(); + idxFut.get(); + + // Make sure cache is operational at this point. + createSqlCache(cli); + + QueryField[] expCols = new QueryField[dynColCnt.get()]; + + // Too many index columns kills indexing internals, have to limit number of the columns + // to build the index on. 
+ int idxColsCnt = Math.min(300, expCols.length); + + Integer[] args = new Integer[idxColsCnt]; + + String updQry = "UPDATE " + TBL_NAME + " SET "; + + String idxQry = "CREATE INDEX idx ON " + TBL_NAME + '('; + + for (int i = 0; i < expCols.length; i++) { + expCols[i] = c("newCol" + i, Integer.class.getName()); + + if (i >= idxColsCnt) + continue; + + if (i > 0) { + updQry += ", "; + + idxQry += ", "; + } + + updQry += "\"newCol" + i + "\" = id + ?"; + + idxQry += "\"newCol" + i + '"'; + + args[i] = i; + } + + idxQry += ')'; + + checkNodesState(TBL_NAME, expCols); + + put(cli, 0, 500); + + run(cli.cache(CACHE_NAME), updQry, (Object[])args); + + run(cli, idxQry); + + run(cli, "DROP INDEX idx"); + } + + /** + * Register a blocker for the given node and return the latch stored as the blocker's third element. + * + * @param node Node. + */ + @SuppressWarnings("SuspiciousMethodCalls") + private static CountDownLatch blockIndexing(Ignite node) { + UUID nodeId = ((IgniteEx)node).localNode().id(); + + return blockIndexing(nodeId); + } + + /** + * Register a blocker for the given node ID; fails if that node ID is already registered in BLOCKS. + * + * @param nodeId Node ID. + */ + @SuppressWarnings("SuspiciousMethodCalls") + private static CountDownLatch blockIndexing(UUID nodeId) { + assertFalse(BLOCKS.containsKey(nodeId)); + + CountDownLatch idxLatch = new CountDownLatch(1); + + BLOCKS.put(nodeId, new T3<>(new CountDownLatch(1), new AtomicBoolean(), idxLatch)); + + return idxLatch; + } + + /** + * Unblock indexing. + * + * @param node Node. + */ + private static void unblockIndexing(Ignite node) { + UUID nodeId = ((IgniteEx)node).localNode().id(); + + unblockIndexing(nodeId); + } + + /** + * Remove the blocker registered for the node ID and count down its first latch. + * + * @param nodeId Node ID. + */ + @SuppressWarnings("ConstantConditions") + private static void unblockIndexing(UUID nodeId) { + T3<CountDownLatch, AtomicBoolean, CountDownLatch> blocker = BLOCKS.remove(nodeId); + + assertNotNull(blocker); + + blocker.get1().countDown(); + } + + /** + * Await indexing. + * + * @param nodeId Node ID. 
+ */ + @SuppressWarnings("ConstantConditions") + private static void awaitIndexing(UUID nodeId) { + T3<CountDownLatch, AtomicBoolean, CountDownLatch> blocker = BLOCKS.get(nodeId); + + if (blocker != null) { + assertTrue(blocker.get2().compareAndSet(false, true)); + + blocker.get3().countDown(); + + while (true) { + try { + blocker.get1().await(); + + break; + } + catch (InterruptedException e) { + // Deliberately ignored: retry the wait until the first latch is counted down. + } + } + } + } + + /** + * Indexing implementation that calls awaitIndexing for the local node before adding columns. + */ + private static class BlockingIndexing extends IgniteH2Indexing { + /** {@inheritDoc} */ + @Override public void dynamicAddColumn(String schemaName, String tblName, List cols, + boolean ifTblExists, boolean ifColNotExists) + throws IgniteCheckedException { + awaitIndexing(ctx.localNodeId()); + + super.dynamicAddColumn(schemaName, tblName, cols, ifTblExists, ifColNotExists); + } + } + + /** + * Start dynamic column add operation for the test table on the given node. + * @param node Target node. + * @param schemaName Schema name. + * @param flds Columns to add. + * @return DDL operation future. + */ + private static IgniteInternalFuture addCols(Ignite node, String schemaName, QueryField... flds) { + final String cacheName = F.eq(schemaName, QueryUtils.DFLT_SCHEMA) ? CACHE_NAME : "idx"; + + return ((IgniteEx)node).context().query().dynamicColumnAdd(cacheName, schemaName, TBL_NAME, + Arrays.asList(flds), false, false); + } + + /** + * Register the "TPL" cache template and get or create the "idx" cache on given node. + * @param node Node to create cache on. + * @return Created cache. + */ + private IgniteCache createSqlCache(Ignite node) throws IgniteCheckedException { + node.addCacheConfiguration(new CacheConfiguration<>("TPL") + .setCacheMode(cacheMode) + .setAtomicityMode(atomicityMode) + .setNodeFilter(new NodeFilter())); + + return node.getOrCreateCache(new CacheConfiguration<>("idx").setIndexedTypes(Integer.class, Integer.class)); + } + + /** + * Spoof blocking indexing class and start new node. + * @param cfg Node configuration. + * @return New node. 
+ */ + private static Ignite ignitionStart(IgniteConfiguration cfg) { + return ignitionStart(cfg, null); + } + + /** + * Spoof blocking indexing class and start new node. + * @param cfg Node configuration. + * @param latch Latch to await for ultimate completion of DDL operations. + * @return New node. + */ + private static Ignite ignitionStart(IgniteConfiguration cfg, final CountDownLatch latch) { + // Have to do this for each starting node - see GridQueryProcessor ctor, it nulls + // idxCls static field on each call. + GridQueryProcessor.idxCls = BlockingIndexing.class; + + IgniteEx node = (IgniteEx)Ignition.start(cfg); + + if (latch != null) + node.context().discovery().setCustomEventListener(SchemaFinishDiscoveryMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, + SchemaFinishDiscoveryMessage msg) { + latch.countDown(); + } + }); + + return node; + } + + /** + * Create server configuration. + * @param nodeIdx Node index. + * @param filtered Whether this node should not be treated as affinity node. + * @return Configuration. + * @throws Exception if failed. + */ + private IgniteConfiguration serverConfiguration(int nodeIdx, boolean filtered) throws Exception { + IgniteConfiguration cfg = serverConfiguration(nodeIdx); + + if (filtered) + cfg.setUserAttributes(Collections.singletonMap(ATTR_FILTERED, true)); + + return cfg; + } + + /** + * Runnable which can throw checked exceptions. + */ + interface RunnableX { + /** + * Do run. + * + * @throws Exception If failed. + */ + @SuppressWarnings("UnnecessaryInterfaceModifier") + public void run() throws Exception; + } + + /** + * Node filter. 
+ */ + protected static class NodeFilter implements IgnitePredicate, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return node.attribute(ATTR_FILTERED) == null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractTest.java new file mode 100644 index 0000000..75b9a30 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsAbstractTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.configuration.MemoryPolicyConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.QueryField; +import org.apache.ignite.internal.processors.query.QuerySchema; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.h2.table.Column; +import org.h2.value.DataType; + 
+/** + * Common stuff for dynamic columns tests. + */ +public abstract class DynamicColumnsAbstractTest extends GridCommonAbstractTest { + /** SQL to create test table. */ + final static String CREATE_SQL = "CREATE TABLE IF NOT EXISTS Person (id int primary key, name varchar)"; + + /** SQL to drop test table. */ + final static String DROP_SQL = "DROP TABLE Person"; + + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** + * Check that given columns have been added to all related structures on target node exactly where needed + * (namely, schema in cache descriptor, type descriptor on started cache, and H2 state on started cache). + * @param node Target node. + * @param schemaName Schema name to look for the table in; unless it is the default schema, it is also used as + * the name of the cache. + * @param tblName Table name to check. + * @param cols Columns whose presence must be checked; they are expected to be the last columns, in given order. + */ + static void checkNodeState(IgniteEx node, String schemaName, String tblName, QueryField... cols) { + String cacheName = F.eq(schemaName, QueryUtils.DFLT_SCHEMA) ? + QueryUtils.createTableCacheName(schemaName, tblName) : schemaName; + + // Schema state check - should pass regardless of cache state. 
+ { + DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName); + + assertNotNull("Cache descriptor not found", desc); + + assertTrue(desc.sql() == F.eq(schemaName, QueryUtils.DFLT_SCHEMA)); + + QuerySchema schema = desc.schema(); + + assertNotNull(schema); + + QueryEntity entity = null; + + for (QueryEntity e : schema.entities()) { + if (F.eq(tblName, e.getTableName())) { + entity = e; + + break; + } + } + + assertNotNull("Query entity not found", entity); + + Iterator<Map.Entry<String, String>> it = entity.getFields().entrySet().iterator(); + + for (int i = entity.getFields().size() - cols.length; i > 0 && it.hasNext(); i--) + it.next(); + + for (QueryField col : cols) { + assertTrue("New column not found in query entity: " + col.name(), it.hasNext()); + + Map.Entry<String, String> e = it.next(); + + assertEquals(col.name(), e.getKey()); + + assertEquals(col.typeName(), e.getValue()); + } + } + + // Start cache on this node if we haven't yet. + node.cache(cacheName); + + // Type descriptor state check. + { + Collection<GridQueryTypeDescriptor> descs = node.context().query().types(cacheName); + + GridQueryTypeDescriptor desc = null; + + for (GridQueryTypeDescriptor d : descs) { + if (F.eq(tblName, d.tableName())) { + desc = d; + + break; + } + } + + assertNotNull("Type descriptor not found", desc); + + Iterator<Map.Entry<String, Class<?>>> it = desc.fields().entrySet().iterator(); + + for (int i = desc.fields().size() - cols.length; i > 0 && it.hasNext(); i--) + it.next(); + + for (QueryField col : cols) { + assertTrue("New column not found in type descriptor: " + col.name(), it.hasNext()); + + Map.Entry<String, Class<?>> e = it.next(); + + assertEquals(col.name(), e.getKey()); + + assertEquals(col.typeName(), e.getValue().getName()); + } + } + + // H2 table state check. 
+ { + GridH2Table tbl = ((IgniteH2Indexing)node.context().query().getIndexing()).dataTable(schemaName, + tblName); + + assertNotNull("Table not found", tbl); + + Iterator<Column> colIt = Arrays.asList(tbl.getColumns()).iterator(); + + GridH2RowDescriptor rowDesc = tbl.rowDescriptor(); + + int i = 0; + + for (; i < tbl.getColumns().length - cols.length && colIt.hasNext(); i++) + colIt.next(); + + for (QueryField col : cols) { + assertTrue("New column not found in H2 table: " + col.name(), colIt.hasNext()); + + Column c = colIt.next(); + + assertEquals(col.name(), c.getName()); + + assertEquals(col.typeName(), DataType.getTypeClassName(c.getType())); + + assertFalse(rowDesc.isKeyValueOrVersionColumn(i)); + + try { + assertEquals(DataType.getTypeFromClass(Class.forName(col.typeName())), + rowDesc.fieldType(i - GridH2AbstractKeyValueRow.DEFAULT_COLUMNS_COUNT)); + } + catch (ClassNotFoundException e) { + throw new AssertionError(e); + } + + i++; + } + } + } + + /** + * Check that given columns have been added to all related structures on all started nodes (namely, schema + * in cache descriptor, type descriptor on started cache, and H2 state on started cache). + * @param tblName Table name to check. + * @param cols Columns whose presence must be checked. + */ + static void checkNodesState(String tblName, QueryField... cols) { + for (Ignite node : Ignition.allGrids()) + checkNodeState((IgniteEx)node, QueryUtils.DFLT_SCHEMA, tblName, cols); + } + + /** + * @param name New column name. + * @param typeName Class name for this new column's data type. + * @return New column with given name and type. + */ + protected static QueryField c(String name, String typeName) { + return new QueryField(name, typeName); + } + + /** + * @param idx Node index. + * @return Client configuration. + * @throws Exception if failed. 
+ */ + protected IgniteConfiguration clientConfiguration(int idx) throws Exception { + QueryEntity e = new QueryEntity(Integer.class.getName(), "Person"); + + LinkedHashMap<String, String> flds = new LinkedHashMap<>(); + + flds.put("name", String.class.getName()); + + e.setFields(flds); + + return commonConfiguration(idx).setClientMode(true).setCacheConfiguration( + new CacheConfiguration<>("idx").setQueryEntities(Collections.singletonList(e)) + ); + } + + /** + * Create common node configuration. + * + * @param idx Index. + * @return Configuration. + * @throws Exception If failed. + */ + protected IgniteConfiguration commonConfiguration(int idx) throws Exception { + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx)); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + MemoryConfiguration memCfg = new MemoryConfiguration() + .setDefaultMemoryPolicyName("default") + .setMemoryPolicies( + new MemoryPolicyConfiguration() + .setName("default") + .setMaxSize(32 * 1024 * 1024L) + .setInitialSize(32 * 1024 * 1024L) + ); + + cfg.setMemoryConfiguration(memCfg); + + return optimize(cfg); + } + + /** + * Create server node configuration. + * + * @param idx Index. + * @return Configuration. + * @throws Exception If failed. + */ + IgniteConfiguration serverConfiguration(int idx) throws Exception { + return commonConfiguration(idx); + } + + /** + * Execute SQL command in the default schema and return resulting dataset. + * @param node Node to run query from. + * @param sql Statement. + * @return result. + */ + protected List<List<?>> run(Ignite node, String sql) { + return ((IgniteEx)node).context().query() + .querySqlFieldsNoCache(new SqlFieldsQuery(sql).setSchema(QueryUtils.DFLT_SCHEMA), true).getAll(); + } + + /** + * Execute SQL command in the default schema and return resulting dataset. + * @param cache Cache to initiate query from. + * @param sql Statement. + * @param args Query arguments. + * @return result. + */ + protected List<List<?>> run(IgniteCache cache, String sql, Object... 
args) { + return cache.query(new SqlFieldsQuery(sql).setSchema(QueryUtils.DFLT_SCHEMA).setArgs(args)).getAll(); + } + + /** + * Run specified statement on given node, expecting it to throw {@code IgniteSQLException} with the specified + * message. + * @param node Node to run the statement on. + * @param sql Statement. + * @param msg Expected message. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + protected void assertThrows(final Ignite node, final String sql, String msg) { + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + run(node, sql); + + return null; + } + }, IgniteSQLException.class, msg); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicPartitionedSelfTest.java new file mode 100644 index 0000000..ca68903 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicPartitionedSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check concurrent operations on dynamic columns on ATOMIC PARTITIONED cache. + */ +public class DynamicColumnsConcurrentAtomicPartitionedSelfTest extends DynamicColumnsAbstractConcurrentSelfTest { + /** + * Constructor. + */ + public DynamicColumnsConcurrentAtomicPartitionedSelfTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicReplicatedSelfTest.java new file mode 100644 index 0000000..9a3a32c --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentAtomicReplicatedSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check concurrent operations on dynamic columns on ATOMIC REPLICATED cache. + */ +public class DynamicColumnsConcurrentAtomicReplicatedSelfTest extends DynamicColumnsAbstractConcurrentSelfTest { + /** + * Constructor. + */ + public DynamicColumnsConcurrentAtomicReplicatedSelfTest() { + super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalPartitionedSelfTest.java new file mode 100644 index 0000000..f42a447 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalPartitionedSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check concurrent operations on dynamic columns on TRANSACTIONAL PARTITIONED cache. + */ +public class DynamicColumnsConcurrentTransactionalPartitionedSelfTest extends DynamicColumnsAbstractConcurrentSelfTest { + /** + * Constructor. 
+ */ + public DynamicColumnsConcurrentTransactionalPartitionedSelfTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalReplicatedSelfTest.java new file mode 100644 index 0000000..2b53e42 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicColumnsConcurrentTransactionalReplicatedSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Test to check concurrent operations on dynamic columns on TRANSACTIONAL REPLICATED cache. + */ +public class DynamicColumnsConcurrentTransactionalReplicatedSelfTest extends DynamicColumnsAbstractConcurrentSelfTest { + /** + * Constructor. + */ + public DynamicColumnsConcurrentTransactionalReplicatedSelfTest() { + super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsAbstractBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsAbstractBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsAbstractBasicSelfTest.java new file mode 100644 index 0000000..651a3f7 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsAbstractBasicSelfTest.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.QueryField; +import org.apache.ignite.internal.processors.query.QueryUtils; + +/** + * Test to check dynamic columns related features. + */ +public abstract class H2DynamicColumnsAbstractBasicSelfTest extends DynamicColumnsAbstractTest { + /** + * Index of coordinator node. + */ + final static int SRV_CRD_IDX = 0; + + /** + * Index of non coordinator server node. + */ + final static int SRV_IDX = 1; + + /** + * Index of client. + */ + final static int CLI_IDX = 2; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (IgniteConfiguration cfg : configurations()) + Ignition.start(cfg); + } + + /** + * @return Grid configurations to start. + * @throws Exception if failed. 
+ */ + private IgniteConfiguration[] configurations() throws Exception { + return new IgniteConfiguration[] { + commonConfiguration(0), + commonConfiguration(1), + clientConfiguration(2) + }; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + run(CREATE_SQL); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + run(DROP_SQL); + + super.afterTest(); + } + + /** + * Test column addition to the end of the columns list. + */ + public void testAddColumnSimple() { + run("ALTER TABLE Person ADD COLUMN age int"); + + doSleep(500); + + QueryField c = c("AGE", Integer.class.getName()); + + for (Ignite node : Ignition.allGrids()) + checkNodeState((IgniteEx)node, QueryUtils.DFLT_SCHEMA, "PERSON", c); + } + + /** + * Test addition of several columns by a single statement to the end of the columns list. + */ + public void testAddFewColumnsSimple() { + run("ALTER TABLE Person ADD COLUMN (age int, \"city\" varchar)"); + + doSleep(500); + + for (Ignite node : Ignition.allGrids()) + checkNodeState((IgniteEx)node, QueryUtils.DFLT_SCHEMA, "PERSON", + c("AGE", Integer.class.getName()), + c("city", String.class.getName())); + } + + /** + * Test {@code IF EXISTS} handling: the statement is expected to complete without an error. + */ + public void testIfTableExists() { + run("ALTER TABLE if exists City ADD COLUMN population int"); + } + + /** + * Test {@code IF NOT EXISTS} handling: the statement is expected to complete without an error. + */ + public void testIfColumnNotExists() { + run("ALTER TABLE Person ADD COLUMN if not exists name varchar"); + } + + /** + * Test that adding a column with a duplicate name without {@code IF NOT EXISTS} fails. + */ + public void testDuplicateColumnName() { + assertThrows("ALTER TABLE Person ADD COLUMN name varchar", "Column already exists: NAME"); + } + + /** + * Test behavior in case of missing table. 
+ */ + public void testMissingTable() { + assertThrows("ALTER TABLE City ADD COLUMN name varchar", "Table doesn't exist: CITY"); + } + + /** + * Test a mixed sequence of DDL and DML statements: columns are added both to the {@code Person} table and to + * the table of the {@code City} cache created from a cache configuration, and the new columns are then used + * in inserts, indexes, joins and updates. + * <p> + * NOTE(review): the type arguments of {@code cache} look lost in this archived copy - confirm against the + * repository. + */ + @SuppressWarnings("unchecked") + public void testComplexOperations() { + IgniteCache cache = ignite(nodeIndex()) + .cache(QueryUtils.createTableCacheName(QueryUtils.DFLT_SCHEMA, "PERSON")); + + run(cache, "ALTER TABLE Person ADD COLUMN city varchar"); + + run(cache, "INSERT INTO Person (id, name, city) values (1, 'John Doe', 'New York')"); + + run(cache, "INSERT INTO Person (id, name, city) values (2, 'Mike Watts', 'Denver')"); + + run(cache, "INSERT INTO Person (id, name, city) values (3, 'Ann Pierce', 'New York')"); + + run(cache, "CREATE INDEX pidx1 ON Person(name, city desc)"); + + CacheConfiguration<Integer, City> ccfg = defaultCacheConfiguration().setName("City") + .setIndexedTypes(Integer.class, City.class).setSqlSchema(QueryUtils.DFLT_SCHEMA); + + ccfg.getQueryEntities().iterator().next().setKeyFieldName("id"); + + ignite(nodeIndex()).getOrCreateCache(ccfg); + + run(cache, "ALTER TABLE City ADD COLUMN population int"); + + run(cache, "CREATE INDEX cidx1 ON City(population)"); + + run(cache, "CREATE INDEX cidx2 ON City(name)"); + + run(cache, "INSERT INTO City(id, name, population, state) values (5, 'New York', 15000000, 'New York')," + + "(7, 'Denver', 3000000, 'Colorado')"); + + List<List<?>> res = run(cache, "SELECT p.name from Person p join City c on p.city = c.name where " + + "c.population > 5000000 order by p.name"); + + assertEquals(2, res.size()); + + assertEquals(Collections.singletonList("Ann Pierce"), res.get(0)); + + assertEquals(Collections.singletonList("John Doe"), res.get(1)); + + run(cache, "ALTER TABLE Person ADD COLUMN age int"); + + run(cache, "UPDATE Person SET age = (5 - id) * 10"); + + res = run(cache, "SELECT p.name from Person p join City c on p.city = c.name where " + + "c.population > 5000000 and age < 40"); + + assertEquals(1, res.size()); + + assertEquals(Collections.singletonList("Ann Pierce"), res.get(0)); 
+ + run(cache, "CREATE INDEX pidx2 on Person(age desc)"); + + run(cache, "DROP INDEX pidx2"); + + run(cache, "DROP INDEX pidx1"); + + run(cache, "DROP INDEX cidx2"); + + run(cache, "DROP INDEX cidx1"); + + run(cache, "DELETE FROM Person where age > 10"); + + assertEquals(0, cache.size()); + + ignite(nodeIndex()).destroyCache("City"); + } + + /** + * Test that we can add columns dynamically to tables associated with non dynamic caches as well. + */ + public void testAddColumnToNonDynamicCache() { + run("ALTER TABLE \"idx\".PERSON ADD COLUMN CITY varchar"); + + doSleep(500); + + QueryField c = c("CITY", String.class.getName()); + + for (Ignite node : Ignition.allGrids()) + checkNodeState((IgniteEx)node, "idx", "PERSON", c); + } + + /** + * Test that we can add columns dynamically to tables associated with non dynamic caches storing user types as well. + * The added column is then written and read back through SQL, and the stored value is read through cache API. + */ + @SuppressWarnings("unchecked") + public void testAddColumnToNonDynamicCacheWithRealValueType() { + CacheConfiguration<Integer, City> ccfg = defaultCacheConfiguration().setName("City") + .setIndexedTypes(Integer.class, City.class); + + IgniteCache<Integer, City> cache = ignite(nodeIndex()).getOrCreateCache(ccfg); + + run(cache, "ALTER TABLE \"City\".City ADD COLUMN population int"); + + doSleep(500); + + QueryField c = c("POPULATION", Integer.class.getName()); + + for (Ignite node : Ignition.allGrids()) + checkNodeState((IgniteEx)node, "City", "CITY", c); + + run(cache, "INSERT INTO \"City\".City (_key, id, name, state, population) values " + + "(1, 1, 'Washington', 'DC', 2500000)"); + + List<List<?>> res = run(cache, "select _key, id, name, state, population from \"City\".City"); + + assertEquals(Collections.singletonList(Arrays.asList(1, 1, "Washington", "DC", 2500000)), res); + + City city = cache.get(1); + + assertEquals(1, city.id()); + assertEquals("Washington", city.name()); + assertEquals("DC", city.state()); + + cache.destroy(); + } + + /** + * @return Node index to run queries on. 
+ */ + protected abstract int nodeIndex(); + + /** + * Run specified statement on the node with index {@link #nodeIndex()}, expecting it to throw + * {@code IgniteSQLException} with the specified message. + * @param sql Statement. + * @param msg Expected message. + */ + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + protected void assertThrows(final String sql, String msg) { + assertThrows(grid(nodeIndex()), sql, msg); + } + + /** + * Execute SQL command on the node with index {@link #nodeIndex()} and return resulting dataset. + * @param sql Statement. + * @return result. + */ + protected List<List<?>> run(String sql) { + return run(grid(nodeIndex()), sql); + } + + /** City class. */ + private final static class City { + /** City id. */ + @QuerySqlField + private int id; + + /** City name. */ + @QuerySqlField + private String name; + + /** City state. */ + @QuerySqlField + private String state; + + /** + * @return City id. + */ + public int id() { + return id; + } + + /** + * @param id City id. + */ + public void id(int id) { + this.id = id; + } + + /** + * @return City name. + */ + public String name() { + return name; + } + + /** + * @param name City name. + */ + public void name(String name) { + this.name = name; + } + + /** + * @return City state. + */ + public String state() { + return state; + } + + /** + * @param state City state. 
+ */ + public void state(String state) { + this.state = state; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsClientBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsClientBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsClientBasicSelfTest.java new file mode 100644 index 0000000..1972f2c --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsClientBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test to check {@code ALTER TABLE} operations started from client node. 
+ */ +public class H2DynamicColumnsClientBasicSelfTest extends H2DynamicColumnsAbstractBasicSelfTest { + /** {@inheritDoc} */ + @Override protected int nodeIndex() { + return CLI_IDX; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerBasicSelfTest.java new file mode 100644 index 0000000..fa15752 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test to check {@code ALTER TABLE} operations started from server node. 
+ */ +public class H2DynamicColumnsServerBasicSelfTest extends H2DynamicColumnsAbstractBasicSelfTest { + /** {@inheritDoc} */ + @Override protected int nodeIndex() { + return SRV_IDX; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerCoordinatorBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerCoordinatorBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerCoordinatorBasicSelfTest.java new file mode 100644 index 0000000..db7f75c --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicColumnsServerCoordinatorBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test to check {@code ALTER TABLE} operations started from coordinator node. 
+ */ +public class H2DynamicColumnsServerCoordinatorBasicSelfTest extends H2DynamicColumnsAbstractBasicSelfTest { + /** {@inheritDoc} Returns {@code SRV_CRD_IDX}; presumably the index of the coordinator node that runs the statements (confirm against the base class). */ + @Override protected int nodeIndex() { + return SRV_CRD_IDX; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java index 329e580..770a375 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java @@ -599,7 +599,7 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { IgniteSQLException.class, "No cache value related columns found"); assertParseThrows("create table Person (id int primary key, age int not null) WITH \"cacheTemplate=cache\"", - IgniteSQLException.class, "Non nullable columns are forbidden"); + IgniteSQLException.class, "Non nullable columns are not supported [colName=AGE]"); assertParseThrows("create table Person (id int primary key, age int unique) WITH \"template=cache\"", IgniteSQLException.class, "Too many constraints - only PRIMARY KEY is supported for CREATE TABLE"); @@ -620,6 +620,44 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { IgniteSQLException.class, "Direct specification of _KEY and _VAL columns is forbidden"); } + /** Checks that {@code ALTER TABLE ... ADD} statements parse into the expected {@link GridSqlAlterTableAddColumn} and that the invalid statements listed at the end fail to parse. */ + public void testParseAlterTableAddColumn() throws Exception { + assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "Person", false, false, + c("COMPANY", Value.STRING)), "ALTER TABLE SCH2.Person ADD company varchar"); + + 
assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "Person", true, true, + c("COMPANY", Value.STRING)), "ALTER TABLE IF EXISTS SCH2.Person ADD if not exists company varchar"); + + assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "Person", false, true, + c("COMPANY", Value.STRING), c("city", Value.STRING)), + "ALTER TABLE IF EXISTS SCH2.Person ADD (company varchar, \"city\" varchar)"); + + assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "City", false, true, + c("POPULATION", Value.INT)), "ALTER TABLE IF EXISTS SCH2.\"City\" ADD (population int)"); + + // There's no table with such name, but H2 parsing does not fail just yet. + assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "City", false, false, + c("POPULATION", Value.INT)), "ALTER TABLE SCH2.\"City\" ADD (population int)"); + + assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "Person", true, false, + c("NAME", Value.STRING)), "ALTER TABLE SCH2.Person ADD if not exists name varchar"); + + // There's a column with such name, but H2 parsing does not fail just yet. + assertAlterTableAddColumnEquals(buildAlterTableAddColumn("SCH2", "Person", false, false, + c("NAME", Value.STRING)), "ALTER TABLE SCH2.Person ADD name varchar"); + + // IF NOT EXISTS with a multiple-column list: DbException expected. + assertParseThrows("ALTER TABLE IF EXISTS SCH2.Person ADD if not exists (company varchar, city varchar)", + DbException.class, null); + + // BEFORE/AFTER column positioning is rejected (only BEFORE is exercised here). + assertParseThrows("ALTER TABLE IF EXISTS SCH2.Person ADD if not exists company varchar before addrid", + IgniteSQLException.class, "ALTER TABLE ADD COLUMN BEFORE/AFTER is not supported"); + + // No such schema. + assertParseThrows("ALTER TABLE SCH3.\"Person\" ADD (city varchar)", DbException.class, null); + } + /** * @param sql Statement. * @param exCls Exception class. 
@@ -747,6 +785,58 @@ public class GridQueryParsingTest extends GridCommonAbstractTest { } /** + * Parses the given SQL and compares the resulting statement to the expected ALTER TABLE ADD COLUMN instance; fails if the parsed statement is not a {@link GridSqlAlterTableAddColumn}. + */ + private void assertAlterTableAddColumnEquals(GridSqlAlterTableAddColumn exp, String sql) throws Exception { + Prepared prepared = parse(sql); + + GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared); + + assertTrue(stmt instanceof GridSqlAlterTableAddColumn); + + assertAlterTableAddColumnEquals(exp, (GridSqlAlterTableAddColumn)stmt); + } + + /** Builds a {@link GridSqlAlterTableAddColumn} populated with the given schema, table, column IF NOT EXISTS flag, table IF EXISTS flag and columns. */ + private static GridSqlAlterTableAddColumn buildAlterTableAddColumn(String schema, String tbl, + boolean ifNotExists, boolean ifTblExists, GridSqlColumn... cols) { + GridSqlAlterTableAddColumn res = new GridSqlAlterTableAddColumn(); + + res.schemaName(schema); + + res.tableName(tbl); + + res.ifNotExists(ifNotExists); + + res.ifTableExists(ifTblExists); + + res.columns(cols); + + return res; + } + + /** + * Tests two instances of {@link GridSqlAlterTableAddColumn} for equality: schema and table names are checked with {@code assertEqualsIgnoreCase}; column count, column names, column types and both flags with {@code assertEquals}. + */ + private static void assertAlterTableAddColumnEquals(GridSqlAlterTableAddColumn exp, + GridSqlAlterTableAddColumn actual) { + assertEqualsIgnoreCase(exp.schemaName(), actual.schemaName()); + assertEqualsIgnoreCase(exp.tableName(), actual.tableName()); + assertEquals(exp.columns().length, actual.columns().length); + + for (int i = 0; i < exp.columns().length; i++) { + GridSqlColumn expCol = exp.columns()[i]; + GridSqlColumn col = actual.columns()[i]; + + assertEquals(expCol.columnName(), col.columnName()); + assertEquals(expCol.column().getType(), col.column().getType()); + } + + assertEquals(exp.ifNotExists(), actual.ifNotExists()); + assertEquals(exp.ifTableExists(), actual.ifTableExists()); + } + + /** * @param name Column name. * @param type Column data type. * @return {@link GridSqlColumn} with given name and type. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/adec3e7e/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 5ac0655f..947bb4d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -89,11 +89,18 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQueryP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest; import org.apache.ignite.internal.processors.cache.index.DuplicateKeyValueClassesSelfTest; +import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicPartitionedSelfTest; +import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentAtomicReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentTransactionalPartitionedSelfTest; +import org.apache.ignite.internal.processors.cache.index.DynamicColumnsConcurrentTransactionalReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicIndexClientBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerCoordinatorBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.DynamicIndexServerNodeFIlterBasicSelfTest; import 
org.apache.ignite.internal.processors.cache.index.DynamicIndexServerNodeFilterCoordinatorBasicSelfTest; +import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsClientBasicSelfTest; +import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsServerBasicSelfTest; +import org.apache.ignite.internal.processors.cache.index.H2DynamicColumnsServerCoordinatorBasicSelfTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexAtomicPartitionedNearSelfTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexAtomicPartitionedSelfTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexAtomicReplicatedSelfTest; @@ -250,6 +257,13 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(H2DynamicIndexAtomicPartitionedSelfTest.class); suite.addTestSuite(H2DynamicIndexAtomicPartitionedNearSelfTest.class); suite.addTestSuite(H2DynamicTableSelfTest.class); + suite.addTestSuite(H2DynamicColumnsClientBasicSelfTest.class); + suite.addTestSuite(H2DynamicColumnsServerBasicSelfTest.class); + suite.addTestSuite(H2DynamicColumnsServerCoordinatorBasicSelfTest.class); + suite.addTestSuite(DynamicColumnsConcurrentAtomicPartitionedSelfTest.class); + suite.addTestSuite(DynamicColumnsConcurrentTransactionalPartitionedSelfTest.class); + suite.addTestSuite(DynamicColumnsConcurrentAtomicReplicatedSelfTest.class); + suite.addTestSuite(DynamicColumnsConcurrentTransactionalReplicatedSelfTest.class); // DML+DDL. suite.addTestSuite(H2DynamicIndexingComplexClientAtomicPartitionedTest.class);