ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/2] ignite git commit: Added operation handler.
Date Mon, 20 Mar 2017 12:49:07 GMT
Added operation handler.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b2a3ded
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b2a3ded
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b2a3ded

Branch: refs/heads/ignite-4565-ddl
Commit: 1b2a3dedb43f4950e3ac67204b717bc44460d28c
Parents: ee37450
Author: devozerov <vozerov@gridgain.com>
Authored: Mon Mar 20 15:48:56 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Mon Mar 20 15:48:56 2017 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryProcessor.java    | 11 ++++
 .../ddl/IndexAbstractDiscoveryMessage.java      | 17 -------
 .../ddl/IndexOperationCancellationToken.java    | 53 ++++++++++++++++++++
 .../query/ddl/IndexOperationHandler.java        | 37 +++++++-------
 4 files changed, 84 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7fe83ab..517a30d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.query.ddl.AbstractIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
 import org.apache.ignite.internal.processors.query.ddl.IndexAcceptDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexOperationCancellationToken;
 import org.apache.ignite.internal.processors.query.ddl.IndexProposeDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingAcceptTask;
 import org.apache.ignite.internal.processors.query.ddl.task.IndexingCacheStartTask;
@@ -461,6 +462,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Process index operation.
+     *
+     * @param op Operation.
+     * @param cancelToken Cancel token.
+     */
+    public void processIndexOperation(AbstractIndexOperation op, IndexOperationCancellationToken
cancelToken) {
+        // TODO.
+    }
+
+    /**
      * Register cache in indexing SPI.
      *
      * @param space Space.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
index 11d8f93..3de525b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexAbstractDiscoveryMessage.java
@@ -41,9 +41,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
     /** Whether request must be propagated to exchange worker for final processing. */
     private transient boolean exchange;
 
-    /** Local cache index state at the moment of message receive. */
-    private transient QueryIndexStates idxStates;
-
     /**
      * Constructor.
      *
@@ -66,20 +63,6 @@ public abstract class IndexAbstractDiscoveryMessage implements DiscoveryCustomMe
     }
 
     /**
-     * @return Index states.
-     */
-    @Nullable public QueryIndexStates indexStates() {
-        return idxStates;
-    }
-
-    /**
-     * @param idxStates Index states.
-     */
-    public void indexStates(QueryIndexStates idxStates) {
-        this.idxStates = idxStates;
-    }
-
-    /**
      * @return Whether request must be propagated to exchange worker for final processing.
      */
     public boolean exchange() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
new file mode 100644
index 0000000..e8b2c2b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationCancellationToken.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.ddl;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Index operation cancellation token.
+ */
+public class IndexOperationCancellationToken {
+    /** Cancel flag. */
+    private final AtomicBoolean flag = new AtomicBoolean();
+
+    /**
+     * Get cancel state.
+     *
+     * @return {@code True} if cancelled.
+     */
+    public boolean isCancelled() {
+        return flag.get();
+    }
+
+    /**
+     * Do cancel.
+     *
+     * @return {@code True} if cancel flag was set by this call.
+     */
+    public boolean cancel() {
+        return flag.compareAndSet(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IndexOperationCancellationToken.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1b2a3ded/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
index 6932724..116b613 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java
@@ -51,12 +51,12 @@ public class IndexOperationHandler {
     /** Mutex for concurrent access. */
     private final Object mux = new Object();
 
+    /** Cancellation token. */
+    private final IndexOperationCancellationToken cancelToken = new IndexOperationCancellationToken();
+
     /** Init flag. */
     private boolean init;
 
-    /** Cancel flag. */
-    private boolean cancel;
-
     /** Worker. */
     private IndexWorker worker;
 
@@ -84,7 +84,7 @@ public class IndexOperationHandler {
             if (!init) {
                 init = true;
 
-                if (!cancel) {
+                if (!cancelToken.isCancelled()) {
                     worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log);
 
                     new IgniteThread(worker).start();
@@ -96,29 +96,25 @@ public class IndexOperationHandler {
     }
 
     /**
-     * @return Worker name.
-     */
-    private String workerName() {
-        return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
-    }
-
-    /**
      * Cancel operation.
      */
     public void cancel() {
         synchronized (mux) {
-            if (!cancel) {
-                cancel = true;
-
+            if (!cancelToken.cancel()) {
                 if (worker != null)
                     worker.cancel();
             }
-
-            // TODO
         }
     }
 
     /**
+     * @return Worker name.
+     */
+    private String workerName() {
+        return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName();
+    }
+
+    /**
      * Single-shot index worker responsible for operation execution.
      */
     private class IndexWorker extends GridWorker {
@@ -140,7 +136,14 @@ public class IndexOperationHandler {
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException
{
             startLatch.countDown();
 
-            // TODO: Do actual create/drop.
+            try {
+                qryProc.processIndexOperation(op, cancelToken);
+
+                opFut.onDone();
+            }
+            catch (Exception e) {
+                opFut.onDone(e);
+            }
         }
 
         /**


Mime
View raw message