ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/2] ignite git commit: WIP on operation handler.
Date Mon, 20 Mar 2017 12:49:06 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 2b7c1a2c9 -> 1b2a3dedb


WIP on operation handler.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee37450a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee37450a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee37450a

Branch: refs/heads/ignite-4565-ddl
Commit: ee37450a69634f29586b5f7e897c6a41322a602f
Parents: 2b7c1a2
Author: devozerov <vozerov@gridgain.com>
Authored: Mon Mar 20 15:34:35 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Mon Mar 20 15:34:35 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    |  76 +++++----
 .../query/ddl/IndexOperationHandler.java        | 161 +++++++++++++++++++
 2 files changed, 197 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1bb3c1c..7fe83ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -222,8 +222,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (initIdxStates != null) {
             Map<String, QueryIndexState> readyIdxStates = initIdxStates.readyOperations();
 
-            for (QueryTypeCandidate cand : cands)
-                applyReadyDynamicOperations(cand.descriptor(), readyIdxStates);
+            for (QueryTypeCandidate cand : cands) {
+                QueryTypeDescriptorImpl desc = cand.descriptor();
+
+                for (Map.Entry<String, QueryIndexState> entry : readyIdxStates.entrySet()) {
+                    String idxName = entry.getKey();
+                    QueryIndexState idxState = entry.getValue();
+
+                    if (F.eq(desc.tableName(), idxState.tableName()))
+                        QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
+                }
+            }
         }
 
         // Ready to register at this point.
@@ -245,19 +254,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Apply ready dynamic index states to not-yet-registered descriptor.
+     * Find current coordinator.
      *
-     * @param desc Descriptor.
-     * @param idxStates Index states.
+     * @return {@code True} if node is coordinator.
      */
-    private void applyReadyDynamicOperations(QueryTypeDescriptorImpl desc, Map<String, QueryIndexState> idxStates)
-        throws IgniteCheckedException {
-        for (Map.Entry<String, QueryIndexState> entry : idxStates.entrySet()) {
-            String idxName = entry.getKey();
-            QueryIndexState idxState = entry.getValue();
+    private ClusterNode findCoordinator() {
+        ClusterNode res = null;
 
-            QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc);
+        for (ClusterNode node : ctx.discovery().aliveServerNodes()) {
+            if (res == null || res.order() > node.order())
+                res = node;
         }
+
+        return res;
     }
 
     /** {@inheritDoc} */
@@ -308,6 +317,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker.
+     * When called for the first time, we initialize topology thus understanding whether current node is coordinator
+     * or not.
+     *
      * @param cctx Cache context.
      * @param idxStates Index states.
      * @throws IgniteCheckedException If failed.
@@ -353,33 +366,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      */
     public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) {
         idxWorker.onAccept(msg);
-    }
-
-    /**
-     * Handle index accept message.
-     *
-     * @param msg Message.
-     */
-    public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
-        idxWorker.onFinish(msg);
-    }
-
-    /**
-     * Handle node leave.
-     *
-     * @param node Node.
-     */
-    public void onNodeLeave(ClusterNode node) {
-        // TODO.
-    }
 
-    /**
-     * Handle index init discovery message.
-     *
-     * @param space Space.
-     * @param op Operation.
-     */
-    public void onIndexAccept(String space, AbstractIndexOperation op) {
         idxLock.writeLock().lock();
 
         // TODO
@@ -456,12 +443,21 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Handle index ack discovery message.
+     * Handle index accept message.
      *
      * @param msg Message.
      */
-    private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) {
-        // TODO
+    public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) {
+        idxWorker.onFinish(msg);
+    }
+
+    /**
+     * Handle node leave.
+     *
+     * @param node Node.
+     */
+    public void onNodeLeave(ClusterNode node) {
+        // TODO.
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
new file mode 100644
index 0000000..6932724
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Index change handler.
+ */
+public class IndexOperationHandler {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Query processor */
+    private final GridQueryProcessor qryProc;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Target operation. */
+    private final AbstractIndexOperation op;
+
+    /** Operation future. */
+    private final GridFutureAdapter opFut;
+
+    /** Mutex for concurrent access. */
+    private final Object mux = new Object();
+
+    /** Init flag. */
+    private boolean init;
+
+    /** Cancel flag. */
+    private boolean cancel;
+
+    /** Worker. */
+    private IndexWorker worker;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param qryProc Query processor.
+     * @param op Target operation.
+     */
+    public IndexOperationHandler(GridKernalContext ctx, GridQueryProcessor qryProc, AbstractIndexOperation op) {
+        this.ctx = ctx;
+        this.qryProc = qryProc;
+        this.op = op;
+
+        log = ctx.log(IndexOperationHandler.class);
+        opFut = new GridFutureAdapter();
+    }
+
+    /**
+     * Perform initialization routine.
+     */
+    public void init() {
+        synchronized (mux) {
+            if (!init) {
+                init = true;
+
+                if (!cancel) {
+                    worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
+
+                    new IgniteThread(worker).start();
+
+                    worker.awaitStart();
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Worker name.
+     */
+    private String workerName() {
+        return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
+    }
+
+    /**
+     * Cancel operation.
+     */
+    public void cancel() {
+        synchronized (mux) {
+            if (!cancel) {
+                cancel = true;
+
+                if (worker != null)
+                    worker.cancel();
+            }
+
+            // TODO
+        }
+    }
+
+    /**
+     * Single-shot index worker responsible for operation execution.
+     */
+    private class IndexWorker extends GridWorker {
+        /** Worker start latch. */
+        private final CountDownLatch startLatch = new CountDownLatch(1);
+
+        /**
+         * Constructor.
+         *
+         * @param igniteInstanceName Ignite instance name.
+         * @param name Worker name.
+         * @param log Logger.
+         */
+        public IndexWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) {
+            super(igniteInstanceName, name, log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            startLatch.countDown();
+
+            // TODO: Do actual create/drop.
+        }
+
+        /**
+         * Await start.
+         */
+        private void awaitStart() {
+            try {
+                startLatch.await();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedException("Interrupted while waiting index operation worker start: " +
+                    name(), e);
+            }
+        }
+    }
+}


Mime
View raw message