ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [34/70] [abbrv] ignite git commit: IGNITE-4565: Implemented CREATE INDEX and DROP INDEX. This closes #1773. This closes #1804.
Date Thu, 20 Apr 2017 07:49:07 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
index 7677d0d..119a389 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryTypeDescriptorImpl.java
@@ -23,7 +23,10 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -33,6 +36,9 @@ import java.util.Map;
  * Descriptor of type.
  */
 public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
+    /** Space. */
+    private final String space;
+
     /** */
     private String name;
 
@@ -50,9 +56,15 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
     /** Map with upper cased property names to help find properties based on SQL INSERT and MERGE queries. */
     private final Map<String, GridQueryProperty> uppercaseProps = new HashMap<>();
 
+    /** Mutex for operations on indexes. */
+    private final Object idxMux = new Object();
+
     /** */
     @GridToStringInclude
-    private final Map<String, QueryIndexDescriptorImpl> indexes = new HashMap<>();
+    private final Map<String, QueryIndexDescriptorImpl> idxs = new HashMap<>();
+
+    /** Aliases. */
+    private Map<String, String> aliases;
 
     /** */
     private QueryIndexDescriptorImpl fullTextIdx;
@@ -78,6 +90,25 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
     /** */
     private String affKey;
 
+    /** Obsolete. */
+    private volatile boolean obsolete;
+
+    /**
+     * Constructor.
+     *
+     * @param space Cache name.
+     */
+    public QueryTypeDescriptorImpl(String space) {
+        this.space = space;
+    }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return space;
+    }
+
     /** {@inheritDoc} */
     @Override public String name() {
         return name;
@@ -97,7 +128,7 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
      * @return Table name.
      */
     @Override public String tableName() {
-        return tblName;
+        return tblName != null ? tblName : name;
     }
 
     /**
@@ -160,7 +191,9 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
 
     /** {@inheritDoc} */
     @Override public Map<String, GridQueryIndexDescriptor> indexes() {
-        return Collections.<String, GridQueryIndexDescriptor>unmodifiableMap(indexes);
+        synchronized (idxMux) {
+            return Collections.<String, GridQueryIndexDescriptor>unmodifiableMap(idxs);
+        }
     }
 
     /** {@inheritDoc} */
@@ -176,59 +209,74 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
     }
 
     /**
-     * Adds index.
+     * Get index by name.
      *
-     * @param idxName Index name.
-     * @param type Index type.
-     * @param inlineSize Inline size.
-     * @return Index descriptor.
-     * @throws IgniteCheckedException In case of error.
+     * @param name Name.
+     * @return Index.
      */
-    public QueryIndexDescriptorImpl addIndex(String idxName, QueryIndexType type, int inlineSize) throws IgniteCheckedException {
-        QueryIndexDescriptorImpl idx = new QueryIndexDescriptorImpl(type, inlineSize);
+    @Nullable public QueryIndexDescriptorImpl index(String name) {
+        synchronized (idxMux) {
+            return idxs.get(name);
+        }
+    }
 
-        if (indexes.put(idxName, idx) != null)
-            throw new IgniteCheckedException("Index with name '" + idxName + "' already exists.");
+    /**
+     * @return Raw index descriptors.
+     */
+    public Collection<QueryIndexDescriptorImpl> indexes0() {
+        synchronized (idxMux) {
+            return new ArrayList<>(idxs.values());
+        }
+    }
 
-        return idx;
+    /** {@inheritDoc} */
+    @Override public GridQueryIndexDescriptor textIndex() {
+        return fullTextIdx;
     }
 
     /**
-     * Adds field to index.
+     * Add index.
      *
-     * @param idxName Index name.
-     * @param field Field name.
-     * @param orderNum Fields order number in index.
-     * @param inlineSize Inline size.
-     * @param descending Sorting order.
+     * @param idx Index.
      * @throws IgniteCheckedException If failed.
      */
-    public void addFieldToIndex(
-        String idxName,
-        String field,
-        int orderNum,
-        int inlineSize,
-        boolean descending
-    ) throws IgniteCheckedException {
-        QueryIndexDescriptorImpl desc = indexes.get(idxName);
+    public void addIndex(QueryIndexDescriptorImpl idx) throws IgniteCheckedException {
+        synchronized (idxMux) {
+            if (idxs.put(idx.name(), idx) != null)
+                throw new IgniteCheckedException("Index with name '" + idx.name() + "' already exists.");
+        }
+    }
 
-        if (desc == null)
-            desc = addIndex(idxName, QueryIndexType.SORTED, inlineSize);
+    /**
+     * Drop index.
+     *
+     * @param idxName Index name.
+     */
+    public void dropIndex(String idxName) {
+        synchronized (idxMux) {
+            idxs.remove(idxName);
+        }
+    }
 
-        desc.addField(field, orderNum, descending);
+    /**
+     * Check if particular field exists.
+     *
+     * @param field Field.
+     * @return {@code True} if exists.
+     */
+    public boolean hasField(String field) {
+        return props.containsKey(field) || QueryUtils._VAL.equalsIgnoreCase(field);
     }
 
     /**
      * Adds field to text index.
      *
      * @param field Field name.
+     * @throws IgniteCheckedException If failed.
      */
-    public void addFieldToTextIndex(String field) {
-        if (fullTextIdx == null) {
-            fullTextIdx = new QueryIndexDescriptorImpl(QueryIndexType.FULLTEXT, 0);
-
-            indexes.put(null, fullTextIdx);
-        }
+    public void addFieldToTextIndex(String field) throws IgniteCheckedException {
+        if (fullTextIdx == null)
+            fullTextIdx = new QueryIndexDescriptorImpl(this, null, QueryIndexType.FULLTEXT, 0);
 
         fullTextIdx.addField(field, 0, false);
     }
@@ -335,6 +383,34 @@ public class QueryTypeDescriptorImpl implements GridQueryTypeDescriptor {
         this.affKey = affKey;
     }
 
+    /**
+     * @return Aliases.
+     */
+    public Map<String, String> aliases() {
+        return aliases != null ? aliases : Collections.<String, String>emptyMap();
+    }
+
+    /**
+     * @param aliases Aliases.
+     */
+    public void aliases(Map<String, String> aliases) {
+        this.aliases = aliases;
+    }
+
+    /**
+     * @return {@code True} if obsolete.
+     */
+    public boolean obsolete() {
+        return obsolete;
+    }
+
+    /**
+     * Mark type descriptor as obsolete.
+     */
+    public void markObsolete() {
+        obsolete = true;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(QueryTypeDescriptorImpl.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index f00cbd6..3a7437b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
 import org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
 import org.apache.ignite.internal.processors.query.property.QueryClassProperty;
 import org.apache.ignite.internal.processors.query.property.QueryFieldAccessor;
@@ -44,7 +45,6 @@ import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.sql.Time;
 import java.sql.Timestamp;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +52,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
+
 /**
  * Utility methods for queries.
  */
@@ -59,6 +62,9 @@ public class QueryUtils {
     /** */
     public static final String _VAL = "_val";
 
+    /** Discovery history size. */
+    private static final int DISCO_HIST_SIZE = getInteger(IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE, 1000);
+
     /** */
     private static final Class<?> GEOMETRY_CLASS = U.classForName("com.vividsolutions.jts.geom.Geometry", null);
 
@@ -82,6 +88,69 @@ public class QueryUtils {
     ));
 
     /**
+     * Get table name for entity.
+     *
+     * @param entity Entity.
+     * @return Table name.
+     */
+    public static String tableName(QueryEntity entity) {
+        String res = entity.getTableName();
+
+        if (res == null)
+            res = typeName(entity.getValueType());
+
+        return res;
+    }
+
+    /**
+     * Get index name.
+     *
+     * @param entity Query entity.
+     * @param idx Index.
+     * @return Index name.
+     */
+    public static String indexName(QueryEntity entity, QueryIndex idx) {
+        return indexName(tableName(entity), idx);
+    }
+
+    /**
+     * Get index name.
+     *
+     * @param tblName Table name.
+     * @param idx Index.
+     * @return Index name.
+     */
+    public static String indexName(String tblName, QueryIndex idx) {
+        String res = idx.getName();
+
+        if (res == null) {
+            StringBuilder idxName = new StringBuilder(tblName + "_");
+
+            for (Map.Entry<String, Boolean> field : idx.getFields().entrySet()) {
+                idxName.append(field.getKey());
+
+                idxName.append('_');
+                idxName.append(field.getValue() ? "asc_" : "desc_");
+            }
+
+            for (int i = 0; i < idxName.length(); i++) {
+                char ch = idxName.charAt(i);
+
+                if (Character.isWhitespace(ch))
+                    idxName.setCharAt(i, '_');
+                else
+                    idxName.setCharAt(i, Character.toLowerCase(ch));
+            }
+
+            idxName.append("idx");
+
+            return idxName.toString();
+        }
+
+        return res;
+    }
+
+    /**
      * Create type candidate for query entity.
      *
      * @param space Space.
@@ -103,7 +172,9 @@ public class QueryUtils {
 
         CacheObjectContext coCtx = binaryEnabled ? ctx.cacheObjects().contextForCache(ccfg) : null;
 
-        QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl();
+        QueryTypeDescriptorImpl desc = new QueryTypeDescriptorImpl(space);
+
+        desc.aliases(qryEntity.getAliases());
 
         // Key and value classes still can be available if they are primitive or JDK part.
         // We need that to set correct types for _key and _val columns.
@@ -206,11 +277,6 @@ public class QueryUtils {
      */
     public static void processBinaryMeta(GridKernalContext ctx, QueryEntity qryEntity, QueryTypeDescriptorImpl d)
         throws IgniteCheckedException {
-        Map<String,String> aliases = qryEntity.getAliases();
-
-        if (aliases == null)
-            aliases = Collections.emptyMap();
-
         Set<String> keyFields = qryEntity.getKeyFields();
 
         // We have to distinguish between empty and null keyFields when the key is not of SQL type -
@@ -239,7 +305,7 @@ public class QueryUtils {
                 isKeyField = (hasKeyFields ? keyFields.contains(entry.getKey()) : null);
 
             QueryBinaryProperty prop = buildBinaryProperty(ctx, entry.getKey(),
-                U.classForName(entry.getValue(), Object.class, true), aliases, isKeyField);
+                U.classForName(entry.getValue(), Object.class, true), d.aliases(), isKeyField);
 
             d.addProperty(prop, false);
         }
@@ -256,18 +322,13 @@ public class QueryUtils {
      */
     public static void processClassMeta(QueryEntity qryEntity, QueryTypeDescriptorImpl d, CacheObjectContext coCtx)
         throws IgniteCheckedException {
-        Map<String,String> aliases = qryEntity.getAliases();
-
-        if (aliases == null)
-            aliases = Collections.emptyMap();
-
         for (Map.Entry<String, String> entry : qryEntity.getFields().entrySet()) {
             QueryClassProperty prop = buildClassProperty(
                 d.keyClass(),
                 d.valueClass(),
                 entry.getKey(),
                 U.classForName(entry.getValue(), Object.class),
-                aliases,
+                d.aliases(),
                 coCtx);
 
             d.addProperty(prop, false);
@@ -275,7 +336,7 @@ public class QueryUtils {
 
         processIndexes(qryEntity, d);
     }
-    
+
     /**
      * Processes indexes based on query entity.
      *
@@ -285,53 +346,90 @@ public class QueryUtils {
      */
     private static void processIndexes(QueryEntity qryEntity, QueryTypeDescriptorImpl d) throws IgniteCheckedException {
         if (!F.isEmpty(qryEntity.getIndexes())) {
-            Map<String, String> aliases = qryEntity.getAliases();
+            for (QueryIndex idx : qryEntity.getIndexes())
+                processIndex(idx, d);
+        }
+    }
 
-            if (aliases == null)
-                aliases = Collections.emptyMap();
+    /**
+     * Process dynamic index change.
+     *
+     * @param idx Index.
+     * @param d Type descriptor to populate.
+     * @throws IgniteCheckedException If failed to build index information.
+     */
+    public static void processDynamicIndexChange(String idxName, @Nullable QueryIndex idx, QueryTypeDescriptorImpl d)
+        throws IgniteCheckedException {
+        d.dropIndex(idxName);
 
-            for (QueryIndex idx : qryEntity.getIndexes()) {
-                String idxName = idx.getName();
+        if (idx != null)
+            processIndex(idx, d);
+    }
 
-                if (idxName == null)
-                    idxName = QueryEntity.defaultIndexName(idx);
+    /**
+     * Create index descriptor.
+     *
+     * @param typeDesc Type descriptor.
+     * @param idx Index.
+     * @return Index descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static QueryIndexDescriptorImpl createIndexDescriptor(QueryTypeDescriptorImpl typeDesc, QueryIndex idx)
+        throws IgniteCheckedException {
+        String idxName = indexName(typeDesc.tableName(), idx);
+        QueryIndexType idxTyp = idx.getIndexType();
 
-                QueryIndexType idxTyp = idx.getIndexType();
+        assert idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL;
 
-                if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) {
-                    d.addIndex(idxName, idxTyp, idx.getInlineSize());
+        QueryIndexDescriptorImpl res = new QueryIndexDescriptorImpl(typeDesc, idxName, idxTyp, idx.getInlineSize());
 
-                    int i = 0;
+        int i = 0;
 
-                    for (Map.Entry<String, Boolean> entry : idx.getFields().entrySet()) {
-                        String field = entry.getKey();
-                        boolean asc = entry.getValue();
+        for (Map.Entry<String, Boolean> entry : idx.getFields().entrySet()) {
+            String field = entry.getKey();
+            boolean asc = entry.getValue();
 
-                        String alias = aliases.get(field);
+            String alias = typeDesc.aliases().get(field);
 
-                        if (alias != null)
-                            field = alias;
+            if (alias != null)
+                field = alias;
 
-                        d.addFieldToIndex(idxName, field, i++, idx.getInlineSize(), !asc);
-                    }
-                }
-                else if (idxTyp == QueryIndexType.FULLTEXT){
-                    for (String field : idx.getFields().keySet()) {
-                        String alias = aliases.get(field);
+            res.addField(field, i++, !asc);
+        }
 
-                        if (alias != null)
-                            field = alias;
+        return res;
+    }
 
-                        d.addFieldToTextIndex(field);
-                    }
-                }
-                else if (idxTyp != null)
-                    throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() +
-                        ", typ=" + idxTyp + ']');
-                else
-                    throw new IllegalArgumentException("Index type is not set: " + idx.getName());
+    /**
+     * Process single index.
+     *
+     * @param idx Index.
+     * @param d Type descriptor to populate.
+     * @throws IgniteCheckedException If failed to build index information.
+     */
+    private static void processIndex(QueryIndex idx, QueryTypeDescriptorImpl d) throws IgniteCheckedException {
+        QueryIndexType idxTyp = idx.getIndexType();
+
+        if (idxTyp == QueryIndexType.SORTED || idxTyp == QueryIndexType.GEOSPATIAL) {
+            QueryIndexDescriptorImpl idxDesc = createIndexDescriptor(d, idx);
+
+            d.addIndex(idxDesc);
+        }
+        else if (idxTyp == QueryIndexType.FULLTEXT){
+            for (String field : idx.getFields().keySet()) {
+                String alias = d.aliases().get(field);
+
+                if (alias != null)
+                    field = alias;
+
+                d.addFieldToTextIndex(field);
             }
         }
+        else if (idxTyp != null)
+            throw new IllegalArgumentException("Unsupported index type [idx=" + idx.getName() +
+                ", typ=" + idxTyp + ']');
+        else
+            throw new IllegalArgumentException("Index type is not set: " + idx.getName());
     }
     
     /**
@@ -674,6 +772,31 @@ public class QueryUtils {
     }
 
     /**
+     * Discovery history size.
+     *
+     * @return Discovery history size.
+     */
+    public static int discoveryHistorySize() {
+        return DISCO_HIST_SIZE;
+    }
+
+    /**
+     * Wrap schema exception if needed.
+     *
+     * @param e Original exception.
+     * @return Schema exception.
+     */
+    @Nullable public static SchemaOperationException wrapIfNeeded(@Nullable Exception e) {
+        if (e == null)
+            return null;
+
+        if (e instanceof SchemaOperationException)
+            return (SchemaOperationException)e;
+
+        return new SchemaOperationException("Unexpected exception.", e);
+    }
+
+    /**
      * Private constructor.
      */
     private QueryUtils() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
new file mode 100644
index 0000000..f97f931
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaExchangeWorkerTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaAbstractDiscoveryMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Cache schema change task for exchange worker.
+ */
+public class SchemaExchangeWorkerTask implements CachePartitionExchangeWorkerTask {
+    /** Message. */
+    private final SchemaAbstractDiscoveryMessage msg;
+
+    /**
+     * Constructor.
+     *
+     * @param msg Message.
+     */
+    public SchemaExchangeWorkerTask(SchemaAbstractDiscoveryMessage msg) {
+        assert msg != null;
+
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public SchemaAbstractDiscoveryMessage message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaExchangeWorkerTask.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
new file mode 100644
index 0000000..3321e66
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Visitor that internally applies given {@link SchemaIndexCacheVisitorClosure} to some set of cache entries.
+ */
+public interface SchemaIndexCacheVisitor {
+    /**
+     * Visit cache entries and pass them to closure.
+     *
+     * @param clo Closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java
new file mode 100644
index 0000000..7f50089
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorClosure.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Index closure accepting current entry state.
+ */
+public interface SchemaIndexCacheVisitorClosure {
+    /**
+     * Apply closure.
+     *
+     * @param key Key.
+     * @param part Partition.
+     * @param val Value.
+     * @param ver Version.
+     * @param expiration Expiration.
+     * @param link Link.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void apply(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver, long expiration, long link)
+        throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
new file mode 100644
index 0000000..58c909d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.Collection;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
+
+/**
+ * Traverser that visits all primary and backup partitions of the given cache.
+ */
+public class SchemaIndexCacheVisitorImpl implements SchemaIndexCacheVisitor {
+    /** Query processor. */
+    private final GridQueryProcessor qryProc;
+
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Space name. */
+    private final String spaceName;
+
+    /** Table name. */
+    private final String tblName;
+
+    /** Cancellation token. */
+    private final SchemaIndexOperationCancellationToken cancel;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache context.
+     * @param spaceName Space name.
+     * @param tblName Table name.
+     * @param cancel Cancellation token.
+     */
+    public SchemaIndexCacheVisitorImpl(GridQueryProcessor qryProc, GridCacheContext cctx, String spaceName,
+        String tblName, SchemaIndexOperationCancellationToken cancel) {
+        this.qryProc = qryProc;
+        this.spaceName = spaceName;
+        this.tblName = tblName;
+        this.cancel = cancel;
+
+        if (cctx.isNear())
+            cctx = ((GridNearCacheAdapter)cctx.cache()).dht().context();
+
+        this.cctx = cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void visit(SchemaIndexCacheVisitorClosure clo) throws IgniteCheckedException {
+        assert clo != null;
+
+        FilteringVisitorClosure filterClo = new FilteringVisitorClosure(clo);
+
+        Collection<GridDhtLocalPartition> parts = cctx.topology().localPartitions();
+
+        for (GridDhtLocalPartition part : parts)
+            processPartition(part, filterClo);
+    }
+
+    /**
+     * Process partition.
+     *
+     * @param part Partition.
+     * @param clo Index closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processPartition(GridDhtLocalPartition part, FilteringVisitorClosure clo)
+        throws IgniteCheckedException {
+        checkCancelled();
+
+        boolean reserved = false;
+
+        if (part != null && part.state() != EVICTED)
+            reserved = (part.state() == OWNING || part.state() == RENTING) && part.reserve();
+
+        if (!reserved)
+            return;
+
+        try {
+            GridCursor<? extends CacheDataRow> cursor = part.dataStore().cursor();
+
+            while (cursor.next()) {
+                CacheDataRow row = cursor.get();
+
+                KeyCacheObject key = row.key();
+
+                processKey(key, row.link(), clo);
+            }
+        }
+        finally {
+            part.release();
+        }
+    }
+
+    /**
+     * Process single key.
+     *
+     * @param key Key.
+     * @param link Link.
+     * @param clo Closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void processKey(KeyCacheObject key, long link, FilteringVisitorClosure clo) throws IgniteCheckedException {
+        while (true) {
+            try {
+                checkCancelled();
+
+                GridCacheEntryEx entry = cctx.cache().entryEx(key);
+
+                try {
+                    entry.updateIndex(clo, link);
+                }
+                finally {
+                    cctx.evicts().touch(entry, AffinityTopologyVersion.NONE);
+                }
+
+                break;
+            }
+            catch (GridCacheEntryRemovedException ignored) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Check if visit process is not cancelled.
+     *
+     * @throws IgniteCheckedException If cancelled.
+     */
+    private void checkCancelled() throws IgniteCheckedException {
+        if (cancel.isCancelled())
+            throw new IgniteCheckedException("Index creation was cancelled.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaIndexCacheVisitorImpl.class, this);
+    }
+
+    /**
+     * Filtering visitor closure.
+     */
+    private class FilteringVisitorClosure implements SchemaIndexCacheVisitorClosure {
+
+        /** Target closure. */
+        private final SchemaIndexCacheVisitorClosure target;
+
+        /**
+         * Constructor.
+         *
+         * @param target Target.
+         */
+        public FilteringVisitorClosure(SchemaIndexCacheVisitorClosure target) {
+            this.target = target;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver,
+            long expiration, long link) throws IgniteCheckedException {
+            if (qryProc.belongsToTable(cctx, spaceName, tblName, key, val))
+                target.apply(key, part, val, ver, expiration, link);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java
new file mode 100644
index 0000000..1bc3434
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexOperationCancellationToken.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Index operation cancellation token.
+ */
+public class SchemaIndexOperationCancellationToken {
+    /** Cancel flag. */
+    private final AtomicBoolean flag = new AtomicBoolean();
+
+    /**
+     * Get cancel state.
+     *
+     * @return {@code True} if cancelled.
+     */
+    public boolean isCancelled() {
+        return flag.get();
+    }
+
+    /**
+     * Do cancel.
+     *
+     * @return {@code True} if cancel flag was set by this call.
+     */
+    public boolean cancel() {
+        return flag.compareAndSet(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaIndexOperationCancellationToken.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java
new file mode 100644
index 0000000..3f12b77
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaKey.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Schema key.
+ */
+public class SchemaKey {
+    /** Space. */
+    private final String space;
+
+    /** Deployment ID. */
+    private final IgniteUuid depId;
+
+    /**
+     * Constructor.
+     *
+     * @param space Space.
+     * @param depId Deployment ID.
+     */
+    public SchemaKey(String space, IgniteUuid depId) {
+        this.space = space;
+        this.depId = depId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * (space != null ? space.hashCode() : 0) + depId.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj instanceof SchemaKey) {
+            SchemaKey other = (SchemaKey)obj;
+
+            return F.eq(space, other.space) && F.eq(depId, other.depId);
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
new file mode 100644
index 0000000..79fbfcd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaNodeLeaveExchangeWorkerTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Node leave exchange worker task.
+ */
+public class SchemaNodeLeaveExchangeWorkerTask implements CachePartitionExchangeWorkerTask {
+    /** Node. */
+    @GridToStringInclude
+    private final ClusterNode node;
+
+    /**
+     * Constructor.
+     *
+     * @param node Node.
+     */
+    public SchemaNodeLeaveExchangeWorkerTask(ClusterNode node) {
+        this.node = node;
+    }
+
+    /**
+     * @return Node.
+     */
+    public ClusterNode node() {
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaNodeLeaveExchangeWorkerTask.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java
new file mode 100644
index 0000000..6c47aff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationClientFuture.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Schema operation client future.
+ */
+public class SchemaOperationClientFuture extends GridFutureAdapter<Object> {
+    /** Operation ID. */
+    private final UUID opId;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     */
+    public SchemaOperationClientFuture(UUID opId) {
+        this.opId = opId;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public UUID operationId() {
+        return opId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaOperationClientFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
new file mode 100644
index 0000000..f0db026
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationException.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Schema operation exception.
+ */
+public class SchemaOperationException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Code: generic error. */
+    public static final int CODE_GENERIC = 0;
+
+    /** Code: cache not found. */
+    public static final int CODE_CACHE_NOT_FOUND = 1;
+
+    /** Code: table not found. */
+    public static final int CODE_TABLE_NOT_FOUND = 2;
+
+    /** Code: table already exists. */
+    public static final int CODE_TABLE_EXISTS = 3;
+
+    /** Code: column not found. */
+    public static final int CODE_COLUMN_NOT_FOUND = 4;
+
+    /** Code: column already exists. */
+    public static final int CODE_COLUMN_EXISTS = 5;
+
+    /** Code: index not found. */
+    public static final int CODE_INDEX_NOT_FOUND = 6;
+
+    /** Code: index already exists. */
+    public static final int CODE_INDEX_EXISTS = 7;
+
+    /** Error code. */
+    private final int code;
+
+    /**
+     * Constructor for specific error type.
+     *
+     * @param code Code.
+     * @param objName Object name.
+     */
+    public SchemaOperationException(int code, String objName) {
+        super(message(code, objName));
+
+        this.code = code;
+    }
+
+    /**
+     * Constructor for generic error.
+     *
+     * @param msg Message.
+     */
+    public SchemaOperationException(String msg) {
+        this(msg, null);
+    }
+
+    /**
+     * Constructor for generic error.
+     *
+     * @param msg Message.
+     * @param cause Cause.
+     */
+    public SchemaOperationException(String msg, Throwable cause) {
+        super(msg, cause);
+
+        code = CODE_GENERIC;
+    }
+
+    /**
+     * @return Code.
+     */
+    public int code() {
+        return code;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaOperationException.class, this, "msg", getMessage());
+    }
+
+    /**
+     * Create message for specific code and object name.
+     *
+     * @param code Code.
+     * @param objName Object name.
+     * @return Message.
+     */
+    private static String message(int code, String objName) {
+        switch (code) {
+            case CODE_CACHE_NOT_FOUND:
+                return "Cache doesn't exist: " + objName;
+
+            case CODE_TABLE_NOT_FOUND:
+                return "Table doesn't exist: " + objName;
+
+            case CODE_TABLE_EXISTS:
+                return "Table already exists: " + objName;
+
+            case CODE_COLUMN_NOT_FOUND:
+                return "Column doesn't exist: " + objName;
+
+            case CODE_COLUMN_EXISTS:
+                return "Column already exists: " + objName;
+
+            case CODE_INDEX_NOT_FOUND:
+                return "Index doesn't exist: " + objName;
+
+            case CODE_INDEX_EXISTS:
+                return "Index already exists: " + objName;
+
+            default:
+                assert false;
+
+                return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
new file mode 100644
index 0000000..eb0f3cd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationManager.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Schema operation manager.
+ */
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+public class SchemaOperationManager {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Query processor. */
+    private final GridQueryProcessor qryProc;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Operation handler. */
+    private final SchemaOperationWorker worker;
+
+    /** Mutex for concurrency control. */
+    private final Object mux = new Object();
+
+    /** Participants. */
+    private Collection<UUID> nodeIds;
+
+    /** Node results. */
+    private Map<UUID, SchemaOperationException> nodeRess;
+
+    /** Current coordinator node. */
+    private ClusterNode crd;
+
+    /** Whether coordinator state is mapped. */
+    private boolean crdMapped;
+
+    /** Coordinator finished flag. */
+    private boolean crdFinished;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param qryProc Query processor.
+     * @param worker Operation handler.
+     * @param crd Coordinator node.
+     */
+    public SchemaOperationManager(GridKernalContext ctx, GridQueryProcessor qryProc, SchemaOperationWorker worker,
+        @Nullable ClusterNode crd) {
+        assert !ctx.clientNode() || crd == null;
+
+        this.ctx = ctx;
+
+        log = ctx.log(SchemaOperationManager.class);
+
+        this.qryProc = qryProc;
+        this.worker = worker;
+
+        synchronized (mux) {
+            this.crd = crd;
+
+            prepareCoordinator();
+        }
+    }
+
+    /**
+     * Start the operation worker and subscribe to its completion to handle the local node result.
+     */
+    @SuppressWarnings("unchecked")
+    public void start() {
+        worker.start();
+
+        synchronized (mux) {
+            worker.future().listen(new IgniteInClosure<IgniteInternalFuture>() {
+                @Override public void apply(IgniteInternalFuture fut) {
+                    onLocalNodeFinished(fut);
+                }
+            });
+        }
+    }
+
+    /**
+     * Handle local node finish.
+     *
+     * @param fut Future.
+     */
+    private void onLocalNodeFinished(IgniteInternalFuture fut) {
+        assert fut.isDone();
+
+        if (ctx.clientNode())
+            return;
+
+        SchemaOperationException err;
+
+        try {
+            fut.get();
+
+            err = null;
+        }
+        catch (Exception e) {
+            err = QueryUtils.wrapIfNeeded(e);
+        }
+
+        synchronized (mux) {
+            if (isLocalCoordinator())
+                onNodeFinished(ctx.localNodeId(), err);
+            else
+                qryProc.sendStatusMessage(crd.id(), operationId(), err);
+        }
+    }
+
+    /**
+     * Handle node finish.
+     *
+     * @param nodeId Node ID.
+     * @param err Error.
+     */
+    public void onNodeFinished(UUID nodeId, @Nullable SchemaOperationException err) {
+        synchronized (mux) {
+            assert isLocalCoordinator();
+
+            if (nodeRess.containsKey(nodeId)) {
+                if (log.isDebugEnabled())
+                    log.debug("Received duplicate result [opId=" + operationId() + ", nodeId=" + nodeId +
+                        ", err=" + err + ']');
+
+                return;
+            }
+
+            if (nodeIds.contains(nodeId)) {
+                if (log.isDebugEnabled())
+                    log.debug("Received result [opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
+
+                nodeRess.put(nodeId, err);
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("Received result from non-tracked node (joined after operation started, will ignore) " +
+                        "[opId=" + operationId() + ", nodeId=" + nodeId + ", err=" + err + ']');
+            }
+
+            checkFinished();
+        }
+    }
+
+    /**
+     * Handle node leave event.
+     *
+     * @param nodeId ID of the node that has left the grid.
+     * @param curCrd Current coordinator node.
+     */
+    public void onNodeLeave(UUID nodeId, ClusterNode curCrd) {
+        synchronized (mux) {
+            assert crd != null;
+
+            if (F.eq(nodeId, crd.id())) {
+                // Coordinator has left!
+                crd = curCrd;
+
+                prepareCoordinator();
+            }
+            else if (isLocalCoordinator()) {
+                // Other node has left, remove it from the coordinator's wait set.
+                // Handle this as success.
+                if (nodeIds.remove(nodeId))
+                    nodeRess.remove(nodeId);
+            }
+
+            IgniteInternalFuture fut = worker().future();
+
+            if (fut.isDone())
+                onLocalNodeFinished(fut);
+
+            checkFinished();
+        }
+    }
+
+    /**
+     * Check if operation finished.
+     */
+    private void checkFinished() {
+        assert Thread.holdsLock(mux);
+
+        if (isLocalCoordinator()) {
+            if (crdFinished)
+                return;
+
+            if (nodeIds.size() == nodeRess.size()) {
+                // Initiate finish request.
+                SchemaOperationException err = null;
+
+                for (Map.Entry<UUID, SchemaOperationException> nodeRes : nodeRess.entrySet()) {
+                    if (nodeRes.getValue() != null) {
+                        err = nodeRes.getValue();
+
+                        break;
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Collected all results, about to send finish message [opId=" + operationId() +
+                        ", err=" + err + ']');
+
+                crdFinished = true;
+
+                qryProc.onCoordinatorFinished(worker.operation(), err);
+            }
+        }
+    }
+
+    /**
+     * Prepare topology state in case local node is coordinator.
+     *
+     * @return {@code True} if state was changed by this call.
+     */
+    private boolean prepareCoordinator() {
+        if (isLocalCoordinator() && !crdMapped) {
+            // Initialize local structures.
+            nodeIds = new HashSet<>();
+            nodeRess = new HashMap<>();
+
+            for (ClusterNode alive : ctx.discovery().aliveServerNodes())
+                nodeIds.add(alive.id());
+
+            if (log.isDebugEnabled())
+                log.debug("Mapped participating nodes on coordinator [opId=" + operationId() +
+                    ", crdNodeId=" + ctx.localNodeId() + ", nodes=" + nodeIds + ']');
+
+            crdMapped = true;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Check if current node is local coordinator.
+     *
+     * @return {@code True} if coordinator.
+     */
+    private boolean isLocalCoordinator() {
+        assert Thread.holdsLock(mux);
+
+        return crd != null && crd.isLocal();
+    }
+
+    /**
+     * @return Worker.
+     */
+    public SchemaOperationWorker worker() {
+        return worker;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    private UUID operationId() {
+        return worker.operation().id();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
new file mode 100644
index 0000000..06feecb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaOperationWorker.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Schema operation executor.
+ */
+public class SchemaOperationWorker extends GridWorker {
+    /** Query processor. */
+    private final GridQueryProcessor qryProc;
+
+    /** Deployment ID. */
+    private final IgniteUuid depId;
+
+    /** Target operation. */
+    private final SchemaAbstractOperation op;
+
+    /** No-op flag. */
+    private final boolean nop;
+
+    /** Whether cache is registered in indexing. */
+    private final boolean cacheRegistered;
+
+    /** Type descriptor (if available). */
+    private final QueryTypeDescriptorImpl type;
+
+    /** Operation future. */
+    private final GridFutureAdapter fut;
+
+    /** Public operation future. */
+    private final GridFutureAdapter pubFut;
+
+    /** Start guard. */
+    private final AtomicBoolean startGuard = new AtomicBoolean();
+
+    /** Cancellation token. */
+    private final SchemaIndexOperationCancellationToken cancelToken = new SchemaIndexOperationCancellationToken();
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param qryProc Query processor.
+     * @param depId Deployment ID.
+     * @param op Target operation.
+     * @param nop No-op flag.
+     * @param err Predefined error.
+     * @param cacheRegistered Whether cache is registered in indexing at this point.
+     * @param type Type descriptor (if available).
+     */
+    public SchemaOperationWorker(GridKernalContext ctx, GridQueryProcessor qryProc, IgniteUuid depId,
+        SchemaAbstractOperation op, boolean nop, @Nullable SchemaOperationException err, boolean cacheRegistered,
+        @Nullable QueryTypeDescriptorImpl type) {
+        super(ctx.igniteInstanceName(), workerName(op), ctx.log(SchemaOperationWorker.class));
+
+        this.qryProc = qryProc;
+        this.depId = depId;
+        this.op = op;
+        this.nop = nop;
+        this.cacheRegistered = cacheRegistered;
+        this.type = type;
+
+        fut = new GridFutureAdapter();
+
+        if (err != null)
+            fut.onDone(err);
+        else if (nop || !cacheRegistered)
+            fut.onDone();
+
+        pubFut = publicFuture(fut);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        try {
+            // Execute; any exception completes the future with an error instead of escaping the worker.
+            qryProc.processIndexOperationLocal(op, type, depId, cancelToken);
+
+            fut.onDone();
+        }
+        catch (Exception e) {
+            fut.onDone(QueryUtils.wrapIfNeeded(e));
+        }
+    }
+
+    /**
+     * Start worker thread on the first call unless the operation future is already completed.
+     *
+     * @return This instance.
+     */
+    public SchemaOperationWorker start() {
+        if (startGuard.compareAndSet(false, true)) {
+            if (!fut.isDone())
+                new IgniteThread(this).start();
+        }
+
+        return this;
+    }
+
+    /**
+     * Chain the future making sure that operation is completed after local schema is updated.
+     *
+     * @param fut Current future.
+     * @return Chained future.
+     */
+    @SuppressWarnings("unchecked")
+    private GridFutureAdapter<?> publicFuture(GridFutureAdapter fut) {
+        final GridFutureAdapter<?> chainedFut = new GridFutureAdapter<>();
+
+        fut.listen(new IgniteInClosure<IgniteInternalFuture>() {
+            @Override public void apply(IgniteInternalFuture fut) {
+                Exception err = null;
+
+                try {
+                    fut.get();
+
+                    if (cacheRegistered && !nop)
+                        qryProc.onLocalOperationFinished(op, type);
+                }
+                catch (Exception e) {
+                    err = e;
+                }
+                finally {
+                    chainedFut.onDone(null, err);
+                }
+            }
+        });
+
+        return chainedFut;
+    }
+
+    /**
+     * @return No-op flag.
+     */
+    public boolean nop() {
+        return nop;
+    }
+
+    /**
+     * @return Whether cache is registered.
+     */
+    public boolean cacheRegistered() {
+        return cacheRegistered;
+    }
+
+    /**
+     * Cancel operation.
+     */
+    @Override public void cancel() {
+        if (cancelToken.cancel())
+            super.cancel();
+    }
+
+    /**
+     * @return Operation.
+     */
+    public SchemaAbstractOperation operation() {
+        return op;
+    }
+
+    /**
+     * @return Future completed when operation is ready.
+     */
+    public IgniteInternalFuture future() {
+        return pubFut;
+    }
+
+    /**
+     * @return Worker name.
+     */
+    private static String workerName(SchemaAbstractOperation op) {
+        return "schema-op-worker-" + op.id();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
new file mode 100644
index 0000000..9fdc6c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+/**
+ * Abstract discovery message for schema operations.
+ */
+public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID, generated randomly when the message is created. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Operation. */
+    @GridToStringInclude
+    protected final SchemaAbstractOperation op;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     */
+    protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
+        this.op = op;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * @return Operation.
+     */
+    public SchemaAbstractOperation operation() {
+        return op;
+    }
+
+    /**
+     * @return Whether request must be propagated to exchange thread.
+     */
+    public abstract boolean exchange();
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaAbstractDiscoveryMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
new file mode 100644
index 0000000..2245b24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Schema change finish discovery message.
+ */
+public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Error (if any). */
+    private final SchemaOperationException err;
+
+    /** Original propose message (transient; assigned through the setter). */
+    private transient SchemaProposeDiscoveryMessage proposeMsg;
+
+    /**
+     * Constructor.
+     *
+     * @param op Original operation.
+     * @param err Error (if any).
+     */
+    public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationException err) {
+        super(op);
+
+        this.err = err;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exchange() {
+        return false;
+    }
+
+    /**
+     * @return {@code True} if this message carries an error.
+     */
+    public boolean hasError() {
+        return err != null;
+    }
+
+    /**
+     * @return Error (if any).
+     */
+    @Nullable public SchemaOperationException error() {
+        return err;
+    }
+
+    /**
+     * @return Propose message.
+     */
+    public SchemaProposeDiscoveryMessage proposeMessage() {
+        return proposeMsg;
+    }
+
+    /**
+     * @param proposeMsg Propose message.
+     */
+    public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
+        this.proposeMsg = proposeMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
new file mode 100644
index 0000000..5f75e60
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaOperationStatusMessage.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Schema operation status message.
+ */
+public class SchemaOperationStatusMessage implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Operation ID. */
+    private UUID opId;
+
+    /** Error bytes (if any). */
+    private byte[] errBytes;
+
+    /** Sender node ID (transient: neither written by writeTo nor read by readFrom). */
+    @GridDirectTransient
+    private UUID sndNodeId;
+
+    /**
+     * Default constructor.
+     */
+    public SchemaOperationStatusMessage() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param errBytes Error bytes (if any).
+     */
+    public SchemaOperationStatusMessage(UUID opId, byte[] errBytes) {
+        this.opId = opId;
+        this.errBytes = errBytes;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public UUID operationId() {
+        return opId;
+    }
+
+    /**
+     * @return Error bytes (if any).
+     */
+    @Nullable public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return Sender node ID.
+     */
+    public UUID senderNodeId() {
+        return sndNodeId;
+    }
+
+    /**
+     * @param sndNodeId Sender node ID.
+     */
+    public void senderNodeId(UUID sndNodeId) {
+        this.sndNodeId = sndNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeUuid("opId", opId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1: // Also reached by deliberate fall-through from case 0.
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                opId = reader.readUuid("opId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1: // Also reached by deliberate fall-through from case 0.
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(SchemaOperationStatusMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -53;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2; // Matches the two fields handled in writeTo/readFrom: opId and errBytes.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaOperationStatusMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
new file mode 100644
index 0000000..664ee03
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.message;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.query.schema.SchemaKey;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
+import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Schema change propose discovery message.
+ */
+public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Cache deployment ID. */
+    private IgniteUuid depId;
+
+    /** Error (if any). */
+    private SchemaOperationException err;
+
+    /** Whether to perform exchange. */
+    private transient boolean exchange;
+
+    /**
+     * Constructor.
+     *
+     * @param op Operation.
+     */
+    public SchemaProposeDiscoveryMessage(SchemaAbstractOperation op) {
+        super(op);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean exchange() {
+        return exchange;
+    }
+
+    /**
+     * @param exchange Whether to perform exchange.
+     */
+    public void exchange(boolean exchange) {
+        this.exchange = exchange;
+    }
+
+    /**
+     * @return Deployment ID.
+     */
+    @Nullable public IgniteUuid deploymentId() {
+        return depId;
+    }
+
+    /**
+     * @param depId Deployment ID.
+     */
+    public void deploymentId(IgniteUuid depId) {
+        this.depId = depId;
+    }
+
+    /**
+     * Message is considered initialized once either deployment ID or error has been set.
+     * @return {@code True} if message is initialized.
+     */
+    public boolean initialized() {
+        return deploymentId() != null || hasError();
+    }
+
+    /**
+     * Set error. Only the first reported error is kept; subsequent calls are ignored.
+     *
+     * @param err Error.
+     */
+    public void onError(SchemaOperationException err) {
+        if (!hasError()) {
+            this.err = err;
+        }
+    }
+
+    /**
+     * @return {@code True} if error was reported during init.
+     */
+    public boolean hasError() {
+        return err != null;
+    }
+
+    /**
+     * @return Error (if any).
+     */
+    @Nullable public SchemaOperationException error() {
+        return err;
+    }
+
+    /**
+     * @return Schema key built from operation space and deployment ID.
+     */
+    public SchemaKey schemaKey() {
+        return new SchemaKey(operation().space(), depId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java
new file mode 100644
index 0000000..8418ece
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaAbstractOperation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Abstract operation on schema.
+ */
+public abstract class SchemaAbstractOperation implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Operation ID. */
+    private final UUID opId;
+
+    /** Space. */
+    private final String space;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param space Space.
+     */
+    public SchemaAbstractOperation(UUID opId, String space) {
+        this.opId = opId;
+        this.space = space;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public UUID id() {
+        return opId;
+    }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return space;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaAbstractOperation.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java
new file mode 100644
index 0000000..fc4a9ff
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexAbstractOperation.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import java.util.UUID;
+
+/**
+ * Base class for schema operations that target a named index (see {@link #indexName()}).
+ */
+public abstract class SchemaIndexAbstractOperation extends SchemaAbstractOperation {
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param space Space.
+     */
+    public SchemaIndexAbstractOperation(UUID opId, String space) {
+        super(opId, space);
+    }
+
+    /**
+     * @return Name of the index this operation applies to.
+     */
+    public abstract String indexName();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java
new file mode 100644
index 0000000..9281f2a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/operation/SchemaIndexCreateOperation.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.schema.operation;
+
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Schema index create operation.
+ */
+public class SchemaIndexCreateOperation extends SchemaIndexAbstractOperation {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Table name. */
+    private final String tblName;
+
+    /** Index params. */
+    @GridToStringInclude
+    private final QueryIndex idx;
+
+    /** Ignore operation if index exists. */
+    private final boolean ifNotExists;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param space Space.
+     * @param tblName Table name.
+     * @param idx Index params.
+     * @param ifNotExists Ignore operation if index exists.
+     */
+    public SchemaIndexCreateOperation(UUID opId, String space, String tblName, QueryIndex idx, boolean ifNotExists) {
+        super(opId, space);
+
+        this.tblName = tblName;
+        this.idx = idx;
+        this.ifNotExists = ifNotExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String indexName() {
+        return QueryUtils.indexName(tblName, idx);
+    }
+
+    /**
+     * @return Table name.
+     */
+    public String tableName() {
+        return tblName;
+    }
+
+    /**
+     * @return Index params.
+     */
+    public QueryIndex index() {
+        return idx;
+    }
+
+    /**
+     * @return {@code True} if operation should be ignored when index already exists.
+     */
+    public boolean ifNotExists() {
+        return ifNotExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SchemaIndexCreateOperation.class, this, "parent", super.toString());
+    }
+}


Mime
View raw message