ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-4997: DDL: Fixed schema state replay on client reconnect. This closes #1845.
Date Fri, 21 Apr 2017 08:16:02 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 8ad5a945d -> 3eb52a8a4


IGNITE-4997: DDL: Fixed schema state replay on client reconnect. This closes #1845.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3eb52a8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3eb52a8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3eb52a8a

Branch: refs/heads/ignite-2.0
Commit: 3eb52a8a4d42add524051a9611b1b7d1a1d17398
Parents: 8ad5a94
Author: Alexander Paschenko <alexander.a.paschenko@gmail.com>
Authored: Fri Apr 21 11:15:55 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Apr 21 11:15:55 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  12 ++
 .../processors/query/GridQueryProcessor.java    | 173 ++++++++++---------
 .../cache/index/AbstractSchemaSelfTest.java     |  44 +++--
 .../DynamicIndexAbstractConcurrentSelfTest.java | 122 +++++++++++++
 .../cache/index/SchemaExchangeSelfTest.java     |  57 +++++-
 5 files changed, 307 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index da6ebc1..28ef22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1185,6 +1185,18 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 cache.onReconnected();
 
                 reconnected.add(cache);
+
+                if (!sysCache) {
+                    // Re-create cache structures inside indexing in order to apply recent schema changes.
+                    GridCacheContext cctx = cache.context();
+
+                    DynamicCacheDescriptor desc = cacheDescriptor(name);
+
+                    assert desc != null;
+
+                    ctx.query().onCacheStop0(cctx.name());
+                    ctx.query().onCacheStart0(cctx, desc.schema());
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 8381882..e0dc387 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -652,113 +652,122 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Create type descriptors from schema and initialize indexing for given cache.<p>
+     * Use with {@link #busyLock} where appropriate.
      * @param cctx Cache context.
      * @param schema Initial schema.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"deprecation", "ThrowableResultOfMethodCallIgnored"})
-    private void initializeCache(GridCacheContext<?, ?> cctx, QuerySchema schema) throws IgniteCheckedException {
-        String space = cctx.name();
+    public void onCacheStart0(GridCacheContext<?, ?> cctx, QuerySchema schema)
+        throws IgniteCheckedException {
 
-        // Prepare candidates.
-        List<Class<?>> mustDeserializeClss = new ArrayList<>();
+        cctx.shared().database().checkpointReadLock();
 
-        Collection<QueryTypeCandidate> cands = new ArrayList<>();
+        try {
+            synchronized (stateMux) {
+                String space = cctx.name();
 
-        Collection<QueryEntity> qryEntities = schema.entities();
+                // Prepare candidates.
+                List<Class<?>> mustDeserializeClss = new ArrayList<>();
 
-        if (!F.isEmpty(qryEntities)) {
-            for (QueryEntity qryEntity : qryEntities) {
-                QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity, mustDeserializeClss);
+                Collection<QueryTypeCandidate> cands = new ArrayList<>();
 
-                cands.add(cand);
-            }
-        }
+                Collection<QueryEntity> qryEntities = schema.entities();
 
-        // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations.
-        Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
-        Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
+                if (!F.isEmpty(qryEntities)) {
+                    for (QueryEntity qryEntity : qryEntities) {
+                        QueryTypeCandidate cand = QueryUtils.typeForQueryEntity(space, cctx, qryEntity,
+                            mustDeserializeClss);
 
-        for (QueryTypeCandidate cand : cands) {
-            QueryTypeDescriptorImpl desc = cand.descriptor();
+                        cands.add(cand);
+                    }
+                }
 
-            QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
+                // Ensure that candidates has unique index names. Otherwise we will not be able to apply pending operations.
+                Map<String, QueryTypeDescriptorImpl> tblTypMap = new HashMap<>();
+                Map<String, QueryTypeDescriptorImpl> idxTypMap = new HashMap<>();
 
-            if (oldDesc != null)
-                throw new IgniteException("Duplicate table name [cache=" + space +
-                    ", tblName=" + desc.tableName() + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
+                for (QueryTypeCandidate cand : cands) {
+                    QueryTypeDescriptorImpl desc = cand.descriptor();
 
-            for (String idxName : desc.indexes().keySet()) {
-                oldDesc = idxTypMap.put(idxName, desc);
+                    QueryTypeDescriptorImpl oldDesc = tblTypMap.put(desc.tableName(), desc);
 
-                if (oldDesc != null)
-                    throw new IgniteException("Duplicate index name [cache=" + space +
-                        ", idxName=" + idxName + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
-            }
-        }
+                    if (oldDesc != null)
+                        throw new IgniteException("Duplicate table name [cache=" + space +
+                            ", tblName=" + desc.tableName() + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
 
-        // Apply pending operation which could have been completed as no-op at this point. There could be only one
-        // in-flight operation for a cache.
-        synchronized (stateMux) {
-            if (disconnected)
-                return;
+                    for (String idxName : desc.indexes().keySet()) {
+                        oldDesc = idxTypMap.put(idxName, desc);
 
-            for (SchemaOperation op : schemaOps.values()) {
-                if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) {
-                    if (op.started()) {
-                        SchemaOperationWorker worker = op.manager().worker();
+                        if (oldDesc != null)
+                            throw new IgniteException("Duplicate index name [cache=" + space +
+                                ", idxName=" + idxName + ", type1=" + desc.name() + ", type2=" + oldDesc.name() + ']');
+                    }
+                }
 
-                        assert !worker.cacheRegistered();
+                // Apply pending operation which could have been completed as no-op at this point.
+                // There could be only one in-flight operation for a cache.
+                for (SchemaOperation op : schemaOps.values()) {
+                    if (F.eq(op.proposeMessage().deploymentId(), cctx.dynamicDeploymentId())) {
+                        if (op.started()) {
+                            SchemaOperationWorker worker = op.manager().worker();
 
-                        if (!worker.nop()) {
-                            IgniteInternalFuture fut = worker.future();
+                            assert !worker.cacheRegistered();
 
-                            assert fut.isDone();
+                            if (!worker.nop()) {
+                                IgniteInternalFuture fut = worker.future();
 
-                            if (fut.error() == null) {
-                                SchemaAbstractOperation op0 = op.proposeMessage().operation();
+                                assert fut.isDone();
 
-                                if (op0 instanceof SchemaIndexCreateOperation) {
-                                    SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0;
+                                if (fut.error() == null) {
+                                    SchemaAbstractOperation op0 = op.proposeMessage().operation();
 
-                                    QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName());
+                                    if (op0 instanceof SchemaIndexCreateOperation) {
+                                        SchemaIndexCreateOperation opCreate = (SchemaIndexCreateOperation)op0;
 
-                                    assert typeDesc != null;
+                                        QueryTypeDescriptorImpl typeDesc = tblTypMap.get(opCreate.tableName());
 
-                                    QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(),
-                                        typeDesc);
-                                }
-                                else if (op0 instanceof SchemaIndexDropOperation) {
-                                    SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0;
+                                        assert typeDesc != null;
+
+                                        QueryUtils.processDynamicIndexChange(opCreate.indexName(), opCreate.index(),
+                                            typeDesc);
+                                    }
+                                    else if (op0 instanceof SchemaIndexDropOperation) {
+                                        SchemaIndexDropOperation opDrop = (SchemaIndexDropOperation)op0;
 
-                                    QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName());
+                                        QueryTypeDescriptorImpl typeDesc = idxTypMap.get(opDrop.indexName());
 
-                                    assert typeDesc != null;
+                                        assert typeDesc != null;
 
-                                    QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
+                                        QueryUtils.processDynamicIndexChange(opDrop.indexName(), null, typeDesc);
+                                    }
+                                    else
+                                        assert false;
                                 }
-                                else
-                                    assert false;
                             }
                         }
+
+                        break;
                     }
+                }
 
-                    break;
+                // Ready to register at this point.
+                registerCache0(space, cctx, cands);
+
+                // Warn about possible implicit deserialization.
+                if (!mustDeserializeClss.isEmpty()) {
+                    U.warn(log, "Some classes in query configuration cannot be written in binary format " +
+                        "because they either implement Externalizable interface or have writeObject/readObject " +
+                        "methods. Instances of these classes will be deserialized in order to build indexes. Please " +
+                        "ensure that all nodes have these classes in classpath. To enable binary serialization " +
+                        "either implement " + Binarylizable.class.getSimpleName() + " interface or set explicit " +
+                        "serializer using BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss);
                 }
             }
         }
-
-        // Ready to register at this point.
-        registerCache0(space, cctx, cands);
-
-        // Warn about possible implicit deserialization.
-        if (!mustDeserializeClss.isEmpty()) {
-            U.warn(log, "Some classes in query configuration cannot be written in binary format " +
-                "because they either implement Externalizable interface or have writeObject/readObject methods. " +
-                "Instances of these classes will be deserialized in order to build indexes. Please ensure that " +
-                "all nodes have these classes in classpath. To enable binary serialization either implement " +
-                Binarylizable.class.getSimpleName() + " interface or set explicit serializer using " +
-                "BinaryTypeConfiguration.setSerializer() method: " + mustDeserializeClss);
+        finally {
+            cctx.shared().database().checkpointReadUnlock();
         }
     }
 
@@ -780,7 +789,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             schemaOps.clear();
         }
 
-        // Complete client futures outside of synchonized block because they may have listeners/chains.
+        // Complete client futures outside of synchronized block because they may have listeners/chains.
         for (SchemaOperationClientFuture fut : futs)
             fut.onDone(new SchemaOperationException("Client node is disconnected (operation result is unknown)."));
 
@@ -804,14 +813,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (!busyLock.enterBusy())
             return;
 
-        cctx.shared().database().checkpointReadLock();
-
         try {
-            initializeCache(cctx, schema);
+            onCacheStart0(cctx, schema);
         }
         finally {
-            cctx.shared().database().checkpointReadUnlock();
-
             busyLock.leaveBusy();
         }
     }
@@ -827,7 +832,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             return;
 
         try {
-            unregisterCache0(cctx.name());
+            onCacheStop0(cctx.name());
         }
         finally {
             busyLock.leaveBusy();
@@ -1302,7 +1307,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 spaces.add(CU.mask(space));
             }
             catch (IgniteCheckedException | RuntimeException e) {
-                unregisterCache0(space);
+                onCacheStop0(space);
 
                 throw e;
             }
@@ -1310,12 +1315,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Unregister cache.
+     * Unregister cache.<p>
+     * Use with {@link #busyLock} where appropriate.
      *
      * @param space Space.
      */
-    private void unregisterCache0(String space) {
-        assert idx != null;
+    public void onCacheStop0(String space) {
+        if (idx == null)
+            return;
 
         synchronized (stateMux) {
             // Clear types.
@@ -1464,8 +1471,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
                     fut.onDone(e);
 
-                    if (e instanceof Error)
-                        throw e;
+                    throw e;
                 }
             }
         };
@@ -1547,6 +1553,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Type descriptor if found.
      * @throws IgniteCheckedException If type check failed.
      */
+    @SuppressWarnings("ConstantConditions")
     @Nullable private QueryTypeDescriptorImpl typeByValue(CacheObjectContext coctx,
         KeyCacheObject key,
         CacheObject val,

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
index e228026..19d6f54 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/AbstractSchemaSelfTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl;
@@ -131,7 +132,7 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Assert index state on all nodes.
+     * Assert index state on all <b>affinity</b> nodes.
      *
      * @param cacheName Cache name.
      * @param tblName Table name.
@@ -140,25 +141,44 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest {
      */
     protected static void assertIndex(String cacheName, String tblName, String idxName,
         IgniteBiTuple<String, Boolean>... fields) {
+        assertIndex(cacheName, false, tblName, idxName, fields);
+    }
+
+    /**
+     * Assert index state on all nodes.
+     *
+     * @param cacheName Cache name.
+     * @param checkNonAffinityNodes Whether existence of {@link GridQueryIndexDescriptor} must be checked on non
+     *     affinity nodes as well.
+     * @param tblName Table name.
+     * @param idxName Index name.
+     * @param fields Fields.
+     */
+    protected static void assertIndex(String cacheName, boolean checkNonAffinityNodes, String tblName, String idxName,
+        IgniteBiTuple<String, Boolean>... fields) {
         for (Ignite node : Ignition.allGrids())
-            assertIndex((IgniteEx)node, cacheName, tblName, idxName, fields);
+            assertIndex(node, checkNonAffinityNodes, cacheName, tblName, idxName, fields);
     }
 
     /**
      * Assert index state on particular node.
      *
      * @param node Node.
+     * @param checkNonAffinityNode Whether existence of {@link GridQueryIndexDescriptor} must be checked regardless of
+     * whether this node is affinity node or not.
      * @param cacheName Cache name.
      * @param tblName Table name.
      * @param idxName Index name.
      * @param fields Fields.
      */
-    protected static void assertIndex(IgniteEx node, String cacheName, String tblName, String idxName,
-        IgniteBiTuple<String, Boolean>... fields) {
-        assertIndexDescriptor(node, cacheName, tblName, idxName, fields);
+    protected static void assertIndex(Ignite node, boolean checkNonAffinityNode, String cacheName, String tblName,
+        String idxName, IgniteBiTuple<String, Boolean>... fields) {
+        IgniteEx node0 = (IgniteEx)node;
+
+        assertIndexDescriptor(node0, cacheName, tblName, idxName, fields);
 
-        if (affinityNode(node, cacheName)) {
-            QueryTypeDescriptorImpl typeDesc = typeExisting(node, cacheName, tblName);
+        if (checkNonAffinityNode || affinityNode(node0, cacheName)) {
+            QueryTypeDescriptorImpl typeDesc = typeExisting(node0, cacheName, tblName);
 
             assertIndex(typeDesc, idxName, fields);
         }
@@ -263,11 +283,13 @@ public class AbstractSchemaSelfTest extends GridCommonAbstractTest {
      * @param tblName Table name.
      * @param idxName Index name.
      */
-    protected static void assertNoIndex(IgniteEx node, String cacheName, String tblName, String idxName) {
-        assertNoIndexDescriptor(node, cacheName, idxName);
+    protected static void assertNoIndex(Ignite node, String cacheName, String tblName, String idxName) {
+        IgniteEx node0 = (IgniteEx)node;
+
+        assertNoIndexDescriptor(node0, cacheName, idxName);
 
-        if (affinityNode(node, cacheName)) {
-            QueryTypeDescriptorImpl typeDesc = typeExisting(node, cacheName, tblName);
+        if (affinityNode(node0, cacheName)) {
+            QueryTypeDescriptorImpl typeDesc = typeExisting(node0, cacheName, tblName);
 
             assertNoIndex(typeDesc, idxName);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
index d2a2f49..5976615 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.index;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -28,6 +29,7 @@ import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteClientReconnectAbstractTest;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -47,6 +49,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
+
 /**
  * Concurrency tests for dynamic index create/drop.
  */
@@ -111,6 +115,11 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde
         return ccfg.setCacheMode(cacheMode).setAtomicityMode(atomicityMode);
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration commonConfiguration(int idx) throws Exception {
+        return super.commonConfiguration(idx).setDiscoverySpi(new TestTcpDiscoverySpi());
+    }
+
     /**
      * Make sure that coordinator migrates correctly between nodes.
      *
@@ -600,6 +609,119 @@ public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicInde
     }
 
     /**
+     * Make sure that client receives schema changes made while it was disconnected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnect() throws Exception {
+        checkClientReconnect(false);
+    }
+
+    /**
+     * Make sure that client receives schema changes made while it was disconnected, even with cache recreation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientReconnectWithCacheRestart() throws Exception {
+        checkClientReconnect(true);
+    }
+
+    /**
+     * Make sure that client receives schema changes made while it was disconnected, optionally with cache restart
+     * in the interim.
+     *
+     * @param restartCache Whether cache needs to be recreated during client's absence.
+     * @throws Exception If failed.
+     */
+    private void checkClientReconnect(final boolean restartCache) throws Exception {
+        // Start complex topology.
+        final Ignite srv = Ignition.start(serverConfiguration(1));
+        Ignition.start(serverConfiguration(2));
+        Ignition.start(serverConfiguration(3, true));
+
+        final Ignite cli = Ignition.start(clientConfiguration(4));
+
+        cli.createCache(cacheConfiguration());
+
+        // Check index create.
+        reconnectClientNode(srv, cli, restartCache, new RunnableX() {
+            @Override public void run() throws Exception {
+                final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+                queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+            }
+        });
+
+        assertIndex(cli, true, CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+        assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+
+        // Check index drop.
+        reconnectClientNode(srv, cli, restartCache, new RunnableX() {
+            @Override public void run() throws Exception {
+                if (!restartCache)
+                    queryProcessor(srv).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, false).get();
+            }
+        });
+
+        assertNoIndex(cli, CACHE_NAME, TBL_NAME, IDX_NAME_1);
+        assertIndexNotUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1);
+
+        // Update existing index.
+        QueryIndex idx = index(IDX_NAME_2, field(alias(FIELD_NAME_2)));
+
+        queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+
+        assertIndex(cli, true, CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2)));
+        assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_2);
+
+        reconnectClientNode(srv, cli, restartCache, new RunnableX() {
+            @Override public void run() throws Exception {
+                if (!restartCache)
+                    queryProcessor(srv).dynamicIndexDrop(CACHE_NAME, IDX_NAME_2, false).get();
+
+                final QueryIndex idx = index(IDX_NAME_2, field(FIELD_NAME_1), field(alias(FIELD_NAME_2)));
+
+                queryProcessor(srv).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false);
+            }
+        });
+
+        assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(FIELD_NAME_1), field(alias(FIELD_NAME_2)));
+        assertIndexUsed(IDX_NAME_2, SQL_COMPOSITE, SQL_ARG_1, SQL_ARG_2);
+    }
+
+    /**
+     * Reconnect the client and run specified actions while it's out.
+     *
+     * @param srvNode Server node.
+     * @param cliNode Client node.
+     * @param restart Whether cache has to be recreated prior to executing required actions.
+     * @param clo Closure to run
+     * @throws Exception If failed.
+     */
+    private void reconnectClientNode(final Ignite srvNode, final Ignite cliNode, final boolean restart,
+        final RunnableX clo) throws Exception {
+        IgniteClientReconnectAbstractTest.reconnectClientNode(log, cliNode, srvNode, new Runnable() {
+            @Override public void run() {
+                if (restart) {
+                    srvNode.destroyCache(CACHE_NAME);
+
+                    srvNode.getOrCreateCache(cacheConfiguration().setName(CACHE_NAME));
+                }
+
+                try {
+                    clo.run();
+                }
+                catch (Exception e) {
+                    throw new IgniteException("Test reconnect runnable failed.", e);
+                }
+            }
+        });
+
+        if (restart)
+            cliNode.cache(CACHE_NAME);
+    }
+
+    /**
      * Test concurrent node start/stop along with index operations. Nothing should hang.
      *
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3eb52a8a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
index 95ad2f1..5a00345 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/SchemaExchangeSelfTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.index;
 
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -34,6 +37,9 @@ import org.apache.ignite.testframework.GridTestUtils;
 import java.util.Collections;
 import java.util.Map;
 
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.TestTcpDiscoverySpi;
+import static org.apache.ignite.internal.IgniteClientReconnectAbstractTest.reconnectClientNode;
+
 /**
  * Tests for schema exchange between nodes.
  */
@@ -396,11 +402,11 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
     }
 
     /**
-     * Test client reconnect.
+     * Test client reconnect after server restart accompanied by schema change.
      *
      * @throws Exception If failed.
      */
-    public void testClientReconnect() throws Exception {
+    public void testServerRestartWithNewTypes() throws Exception {
         IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
         assertTypes(node1, ValueClass.class);
 
@@ -412,11 +418,11 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
 
         stopGrid(1);
 
-        assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 return grid(2).context().clientDisconnected();
             }
-        }, 10_000L);
+        }, 10_000L));
 
         IgniteFuture reconnFut = null;
 
@@ -441,6 +447,40 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
     }
 
     /**
+     * Test client reconnect.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testClientReconnect() throws Exception {
+        final IgniteEx node1 = start(1, KeyClass.class, ValueClass.class);
+        assertTypes(node1, ValueClass.class);
+
+        final IgniteEx node2 = startClientNoCache(2);
+        assertTypes(node2);
+
+        node2.cache(CACHE_NAME);
+        assertTypes(node2, ValueClass.class);
+
+        reconnectClientNode(log, node2, node1, new Runnable() {
+            @Override public void run() {
+                assertTrue(node2.context().clientDisconnected());
+
+                final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1));
+
+                try {
+                    queryProcessor(node1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+        assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1));
+    }
+
+    /**
      * Ensure that only provided types exists for the given cache.
      *
      * @param node Node.
@@ -449,15 +489,15 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
     private static void assertTypes(IgniteEx node, Class... clss) {
         Map<String, QueryTypeDescriptorImpl> types = types(node, CACHE_NAME);
 
-        if (clss == null || clss.length == 0)
-            assert types.isEmpty();
+        if (F.isEmpty(clss))
+            assertTrue(types.isEmpty());
         else {
             assertEquals(clss.length, types.size());
 
             for (Class cls : clss) {
                 String tblName = tableName(cls);
 
-                assert types.containsKey(tblName);
+                assertTrue(types.containsKey(tblName));
             }
         }
     }
@@ -498,6 +538,7 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
         cfg.setClientMode(client);
         cfg.setLocalHost("127.0.0.1");
         cfg.setCacheConfiguration(cacheConfiguration(clss));
+        cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
 
         if (filterNodeName != null && F.eq(name, filterNodeName))
             cfg.setUserAttributes(Collections.singletonMap("AFF_NODE", true));
@@ -543,6 +584,8 @@ public class SchemaExchangeSelfTest extends AbstractSchemaSelfTest {
         cfg.setClientMode(client);
         cfg.setLocalHost("127.0.0.1");
 
+        cfg.setDiscoverySpi(new TestTcpDiscoverySpi());
+
         return (IgniteEx)Ignition.start(cfg);
     }
 


Mime
View raw message