ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [19/23] ignite git commit: Wired things up.
Date Wed, 22 Mar 2017 11:13:05 GMT
Wired things up.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/edaf29af
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/edaf29af
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/edaf29af

Branch: refs/heads/ignite-4565-ddl
Commit: edaf29af95e1503d95001d1bede5a3d52d120ca5
Parents: 013faff
Author: devozerov <vozerov@gridgain.com>
Authored: Wed Mar 22 11:30:55 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Wed Mar 22 11:30:55 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    | 223 +++++++++++++------
 .../query/ddl/IndexOperationState.java          |  12 +-
 2 files changed, 153 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/edaf29af/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 92bc7c1..2e5ee50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -47,9 +47,12 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.DropIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationHandler;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationState;
 import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusRequest;
 import org.apache.ignite.internal.processors.query.ddl.IndexOperationStatusResponse;
 import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
@@ -89,6 +92,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.internal.GridTopic.TOPIC_DYNAMIC_SCHEMA;
 import static org.apache.ignite.internal.IgniteComponentType.INDEXING;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
 
 /**
  * Indexing processor.
@@ -127,6 +131,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** Index create/drop client futures. */
     private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new ConcurrentHashMap<>();
 
+    /** Index operation states. */
+    private final ConcurrentHashMap<UUID, IndexOperationState> idxOpStates = new ConcurrentHashMap<>();
+
     /** IO message listener. */
     private final GridMessageListener ioLsnr;
 
@@ -425,73 +432,90 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
         idxLock.writeLock().lock();
 
-        // TODO
-
         try {
-//            // Validate.
-//            if (op instanceof CreateIndexOperation) {
-//                CreateIndexOperation op0 = (CreateIndexOperation)op;
-//
-//                QueryIndex idx = op0.index();
-//
-//                // Check conflict with other indexes.
-//                String idxName = op0.index().getName();
-//
-//                QueryIndexKey idxKey = new QueryIndexKey(space, idxName);
-//
-//                QueryIndexDescriptorImpl oldIdx = idxs.get(idxKey);
-//
-//                if (oldIdx != null) {
-//                    if (!op0.ifNotExists())
-//                        msg.onError(ctx.localNodeId(), "Index already exists [space=" + space + ", index=" + idxName);
-//
-//                    return;
-//                }
-//
-//                // Make sure table exists.
-//                String tblName = op0.tableName();
-//
-//                QueryTypeDescriptorImpl typeDesc = null;
-//
-//                for (QueryTypeDescriptorImpl type : types.values()) {
-//                    if (F.eq(tblName, type.tableName())) {
-//                        typeDesc = type;
-//
-//                        break;
-//                    }
-//                }
-//
-//                if (typeDesc == null) {
-//                    msg.onError(ctx.localNodeId(), "Table doesn't exist: " + tblName);
-//
-//                    return;
-//                }
-//
-//                // Make sure that index can be applied to the given table.
-//                for (String idxField : idx.getFieldNames()) {
-//                    if (!typeDesc.fields().containsKey(idxField)) {
-//                        msg.onError(ctx.localNodeId(), "Field doesn't exist: " + idxField);
-//
-//                        return;
-//                    }
-//                }
-//            }
-//            else if (op instanceof DropIndexOperation) {
-//                DropIndexOperation op0 = (DropIndexOperation)op;
-//
-//                String idxName = op0.indexName();
-//
-//                QueryIndexKey idxKey = new QueryIndexKey(space, idxName);
-//
-//                QueryIndexDescriptorImpl oldIdx = idxs.get(idxKey);
-//
-//                if (oldIdx == null) {
-//                    if (!op0.ifExists())
-//                        msg.onError(ctx.localNodeId(), "Index doesn't exist: " + idxName);
-//                }
-//            }
-//            else
-//                msg.onError(ctx.localNodeId(), "Unsupported operation: " + op);
+            AbstractIndexOperation op = msg.operation();
+            String space = op.space();
+
+            boolean completed = false;
+            String errMsg = null;
+
+            // Validate the operation against local metadata; failures and no-ops mark it as completed.
+            if (op instanceof CreateIndexOperation) {
+                CreateIndexOperation op0 = (CreateIndexOperation)op;
+
+                QueryIndex idx = op0.index();
+
+                // Make sure the target table exists.
+                String tblName = op0.tableName();
+
+                QueryTypeDescriptorImpl type0 = null;
+
+                for (QueryTypeDescriptorImpl type : types.values()) {
+                    if (F.eq(tblName, type.tableName())) {
+                        type0 = type;
+
+                        break;
+                    }
+                }
+
+                if (type0 == null) {
+                    completed = true;
+                    errMsg = "Table doesn't exist: " + tblName;
+                }
+                else {
+                    // Make sure that index can be applied to the given table.
+                    for (String idxField : idx.getFieldNames()) {
+                        if (!type0.fields().containsKey(idxField)) {
+                            completed = true;
+                            errMsg = "Field doesn't exist: " + idxField;
+                        }
+                    }
+                }
+
+                // Check conflict with other indexes, but only when table/field validation passed.
+                if (errMsg == null) {
+                    String idxName = op0.index().getName();
+
+                    QueryIndexKey idxKey = new QueryIndexKey(space, idxName);
+
+                    if (idxs.get(idxKey) != null) {
+                        completed = true;
+
+                        if (!op0.ifNotExists())
+                            errMsg = "Index already exists [space=" + space + ", index=" + idxName + ']';
+                    }
+                }
+            }
+            else if (op instanceof DropIndexOperation) {
+                DropIndexOperation op0 = (DropIndexOperation)op;
+
+                String idxName = op0.indexName();
+
+                QueryIndexDescriptorImpl oldIdx = idxs.get(new QueryIndexKey(space, idxName));
+
+                if (oldIdx == null) {
+                    completed = true;
+
+                    if (!op0.ifExists())
+                        errMsg = "Index doesn't exist: " + idxName;
+                }
+                else {
+                    // Make sure that the index relates to the expected table; a mismatch is reported as a missing index.
+                    if (!F.eq(oldIdx.typeDescriptor().tableName(), op.tableName())) {
+                        completed = true;
+                        errMsg = "Index doesn't exist: " + idxName;
+                    }
+                }
+            }
+            else {
+                completed = true;
+                errMsg = "Unsupported operation: " + op;
+            }
+
+            // Start async operation; a validation error, if any, is handed over as an exception.
+            Exception err = errMsg != null ? new IgniteException(errMsg) : null;
+
+            startIndexOperation(op, completed, err);
         }
         finally {
             idxLock.writeLock().unlock();
@@ -499,12 +523,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Handle index accept message.
+     * Handle index finish message: drop the tracked operation state and complete the registered client future, if any.
      *
      * @param msg Message.
      */
     public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+        UUID opId = msg.operation().operationId();
+
+        idxOpStates.remove(opId); // Operation is finished, its state is no longer tracked.
+
+        QueryIndexClientFuture cliFut = idxCliFuts.remove(opId); // Null when no client future is registered for this ID.
+
+        if (cliFut != null) {
+            if (msg.hasError()) {
+                IgniteException err = new IgniteException(msg.errorMessage());
 
+                cliFut.onDone(err); // TODO: Better message and code handling.
+            }
+            else
+                cliFut.onDone();
+        }
     }
 
     /**
@@ -513,7 +551,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param node Node.
      */
     public void onNodeLeave(ClusterNode node) {
-        // TODO.
+        for (IndexOperationState idxOpState : idxOpStates.values()) // Notify every tracked index operation.
+            idxOpState.onNodeLeave(node.id());
     }
 
     /**
@@ -1317,7 +1356,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param err Error.
      */
     private void startIndexOperation(AbstractIndexOperation op, boolean completed, Exception err) {
-        // TODO
+        IndexOperationHandler hnd = new IndexOperationHandler(ctx, this, op, completed, err);
+
+        hnd.init();
+
+        IndexOperationState state = new IndexOperationState(ctx, this, hnd);
+
+        idxOpStates.put(op.operationId(), state); // Registered by operation ID; status and finish handlers look it up.
+
+        state.tryMap();
     }
 
     /**
@@ -1326,7 +1373,35 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param req Status request.
      */
     private void processStatusRequest(IndexOperationStatusRequest req) {
-        // TODO
+        UUID opId = req.operationId();
+
+        IndexOperationState idxOpState = idxOpStates.get(opId);
+
+        if (idxOpState != null)
+            idxOpState.onStatusRequest(req.senderNodeId());
+        else
+            // No state is tracked for this ID: treat the operation as completed successfully and reply with no error.
+            sendStatusResponse(req.senderNodeId(), opId, null);
+    }
+
+    /**
+     * Send operation status response to the given node over {@code TOPIC_DYNAMIC_SCHEMA}.
+     *
+     * @param destNodeId Destination node ID.
+     * @param opId Operation ID.
+     * @param errMsg Error message ({@code null} is passed by callers when there is no error to report).
+     */
+    public void sendStatusResponse(UUID destNodeId, UUID opId, String errMsg) {
+        try {
+            IndexOperationStatusResponse resp = new IndexOperationStatusResponse(ctx.localNodeId(), opId, errMsg);
+
+            // TODO: Proper pool!
+            ctx.io().sendToGridTopic(destNodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            // Send failure is deliberately ignored: it is assumed to mean the destination node left.
+            // TODO: Better logging all over the state and handler to simplify debug!
+        }
     }
 
     /**
@@ -1335,7 +1410,13 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param resp Status response.
      */
     private void processStatusResponse(IndexOperationStatusResponse resp) {
-        // TODO
+        IndexOperationState idxOpState = idxOpStates.get(resp.operationId());
+
+        if (idxOpState != null)
+            idxOpState.onNodeFinished(resp.senderNodeId(), resp.errorMessage());
+        else {
+            // Response for an operation that is not tracked locally is silently dropped. TODO: Log!
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/edaf29af/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
index 75ab2e7..88ba5cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationState.java
@@ -183,17 +183,7 @@ public class IndexOperationState {
                     errMsg = e.getMessage();
                 }
 
-                try {
-                    IndexOperationStatusResponse resp =
-                        new IndexOperationStatusResponse(ctx.localNodeId(), hnd.operation().operationId(), errMsg);
-
-                    // TODO: Proper pool!
-                    ctx.io().sendToGridTopic(nodeId, TOPIC_DYNAMIC_SCHEMA, resp, PUBLIC_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    // Node left, ignore.
-                    // TODO: Better logging all over the state and handler to simplify debug!
-                }
+                qryProc.sendStatusResponse(nodeId, hnd.operation().operationId(), errMsg);
             }
         });
     }


Mime
View raw message