ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-4635: Introduced DDL worker thread.
Date Fri, 03 Mar 2017 11:57:18 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl c05be524a -> f79ead47a


IGNITE-4635: Introduced DDL worker thread.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f79ead47
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f79ead47
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f79ead47

Branch: refs/heads/ignite-4565-ddl
Commit: f79ead47a895eb6d205ea0f2a813efb23b67c989
Parents: c05be52
Author: Alexander Paschenko <alexander.a.paschenko@gmail.com>
Authored: Fri Mar 3 14:57:10 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Mar 3 14:57:10 2017 +0300

----------------------------------------------------------------------
 .../query/h2/DmlStatementsProcessor.java        |  16 ++-
 .../processors/query/h2/IgniteH2Indexing.java   |   9 +-
 .../query/h2/ddl/DdlStatementsProcessor.java    | 122 +++++++++++++++++--
 .../processors/query/h2/ddl/DdlTask.java        |  25 ++++
 4 files changed, 148 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f79ead47/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 352b013..172f934 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -41,10 +41,12 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryArrayIdentityResolver;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -95,6 +97,9 @@ public class DmlStatementsProcessor {
     /** Indexing. */
     private IgniteH2Indexing idx;
 
+    /** Logger. */
+    private IgniteLogger log;
+
     /** Set of binary type ids for which warning about missing identity in configuration
has been printed. */
     private final static Set<Integer> WARNED_TYPES =
         Collections.newSetFromMap(new ConcurrentHashMap8<Integer, Boolean>());
@@ -106,10 +111,15 @@ public class DmlStatementsProcessor {
     private final ConcurrentMap<String, ConcurrentMap<String, UpdatePlan>> planCache
= new ConcurrentHashMap<>();
 
     /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
      * @param idx indexing.
      */
-    public void start(IgniteH2Indexing idx) {
+    public void start(GridKernalContext ctx, IgniteH2Indexing idx) {
         this.idx = idx;
+
+        log = ctx.log(DmlStatementsProcessor.class);
     }
 
     /**
@@ -394,7 +404,7 @@ public class DmlStatementsProcessor {
         while (it.hasNext()) {
             List<?> e = it.next();
             if (e.size() != 2) {
-                U.warn(idx.getLogger(), "Invalid row size on DELETE - expected 2, got " +
e.size());
+                U.warn(log, "Invalid row size on DELETE - expected 2, got " + e.size());
                 continue;
             }
 
@@ -895,7 +905,7 @@ public class DmlStatementsProcessor {
     private BinaryObject updateHashCodeIfNeeded(GridCacheContext cctx, BinaryObject binObj)
{
         if (U.isHashCodeEmpty(binObj)) {
             if (WARNED_TYPES.add(binObj.type().typeId()))
-                U.warn(idx.getLogger(), "Binary object's type does not have identity resolver
explicitly set, therefore " +
+                U.warn(log, "Binary object's type does not have identity resolver explicitly
set, therefore " +
                     "BinaryArrayIdentityResolver is used to generate hash codes for its instances,
and therefore " +
                     "hash code of this binary object will most likely not match that of its
non serialized form. " +
                     "For finer control over identity of this type, please update your BinaryConfiguration
accordingly." +

http://git-wip-us.apache.org/repos/asf/ignite/blob/f79ead47/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ad731ff..1818eb0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -408,13 +408,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @return Logger.
-     */
-    public IgniteLogger getLogger() {
-        return log;
-    }
-
-    /**
      * @param c Connection.
      * @param sql SQL.
      * @param useStmtCache If {@code true} uses statement cache.
@@ -1886,7 +1879,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 throw new IgniteCheckedException("Failed to initialize DDL statements processor",
e);
             }
 
-            dmlProc.start(this);
+            dmlProc.start(ctx, this);
             ddlProc.start(ctx, this);
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f79ead47/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
index 1c4eeaa..1c558c5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlStatementsProcessor.java
@@ -22,13 +22,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
@@ -48,11 +52,14 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
 import org.h2.command.Prepared;
 import org.h2.command.ddl.CreateIndex;
 import org.h2.command.ddl.DropIndex;
 import org.h2.jdbc.JdbcPreparedStatement;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * DDL statements processor.<p>
@@ -62,8 +69,8 @@ public class DdlStatementsProcessor {
     /** Kernal context. */
     GridKernalContext ctx;
 
-    /** Indexing engine. */
-    private IgniteH2Indexing idx;
+    /** Logger. */
+    private IgniteLogger log;
 
     /** State flag. */
     private AtomicBoolean isStopped = new AtomicBoolean();
@@ -71,6 +78,9 @@ public class DdlStatementsProcessor {
     /** Running operations originating at this node as a client. */
     private Map<IgniteUuid, GridFutureAdapter> operations = new ConcurrentHashMap<>();
 
+    /** Worker. */
+    private volatile DdlWorker worker;
+
     /**
      * Initialize message handlers and this' fields needed for further operation.
      *
@@ -79,20 +89,37 @@ public class DdlStatementsProcessor {
      */
     public void start(final GridKernalContext ctx, IgniteH2Indexing idx) {
         this.ctx = ctx;
-        this.idx = idx;
 
-        ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class, new CustomEventListener<DdlInitDiscoveryMessage>()
{
+        log = ctx.log(DdlStatementsProcessor.class);
+
+        worker = new DdlWorker(ctx.gridName(), log);
+
+        IgniteThread workerThread = new IgniteThread(worker);
+
+        workerThread.setDaemon(true);
+
+        workerThread.start();
+
+        ctx.discovery().setCustomEventListener(DdlInitDiscoveryMessage.class,
+            new CustomEventListener<DdlInitDiscoveryMessage>() {
             /** {@inheritDoc} */
             @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
-            @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode
snd, DdlInitDiscoveryMessage msg) {
+            @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode
snd,
+                DdlInitDiscoveryMessage msg) {
                 onInit(msg);
             }
         });
 
-        ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class, new CustomEventListener<DdlAckDiscoveryMessage>()
{
+        ctx.discovery().setCustomEventListener(DdlAckDiscoveryMessage.class,
+            new CustomEventListener<DdlAckDiscoveryMessage>() {
             /** {@inheritDoc} */
-            @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode
snd, DdlAckDiscoveryMessage msg) {
-                onAck(snd, msg);
+            @Override public void onCustomEvent(AffinityTopologyVersion topVer, final ClusterNode
snd,
+                final DdlAckDiscoveryMessage msg) {
+                submitTask(new DdlTask() {
+                    @Override public void run() {
+                        onAck(snd, msg);
+                    }
+                });
             }
         });
 
@@ -115,6 +142,20 @@ public class DdlStatementsProcessor {
     }
 
     /**
+     * Submit a task to {@link #worker} for async execution.
+     *
+     * @param task Task.
+     */
+    private void submitTask(DdlTask task) {
+        DdlWorker worker0 = worker;
+
+        if (worker0 != null)
+            worker0.submit(task);
+        else
+            log.debug("Cannot submit DDL task because worker is null (node is stopping):
" + task);
+    }
+
+    /**
      * Handle {@code ACK} message on a <b>peer node</b> - do local portion of
actual DDL job and notify
      * <b>coordinator</b> about success or failure.
      *
@@ -147,7 +188,7 @@ public class DdlStatementsProcessor {
             ctx.io().send(snd, GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
         }
         catch (Throwable e) {
-            idx.getLogger().error("Failed to notify coordinator about local DLL operation
completion [opId=" +
+            U.error(log, "Failed to notify coordinator about local DLL operation completion
[opId=" +
                 msg.operation().operationId() + ", clientNodeId=" + snd.id() + ']', e);
         }
     }
@@ -232,7 +273,7 @@ public class DdlStatementsProcessor {
             ctx.io().send(args.clientNodeId(), GridTopic.TOPIC_QUERY, res, GridIoPolicy.IDX_POOL);
         }
         catch (IgniteCheckedException e) {
-            idx.getLogger().error("Failed to notify client node about DDL operation failure
" +
+            U.error(log, "Failed to notify client node about DDL operation failure " +
                 "[opId=" + args.operationId() + ", clientNodeId=" + args.clientNodeId() +
']', e);
         }
     }
@@ -248,8 +289,7 @@ public class DdlStatementsProcessor {
         GridFutureAdapter fut = operations.get(opId);
 
         if (fut == null) {
-            idx.getLogger().warning("DDL operation not found at its client [opId=" + opId
+ ", nodeId=" +
-                ctx.localNodeId() + ']');
+            U.warn(log, "DDL operation not found at its client [opId=" + opId + ", nodeId="
+ ctx.localNodeId() + ']');
 
             return;
         }
@@ -280,7 +320,6 @@ public class DdlStatementsProcessor {
      * Exists as a separate method to allow overriding it in tests to check behavior in case
of errors.
      *
      * @param args Operation arguments.
-     * @return Whether this node participates in this operation, or not.
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
@@ -312,6 +351,14 @@ public class DdlStatementsProcessor {
         if (!isStopped.compareAndSet(false, true))
             throw new IgniteCheckedException(new IllegalStateException("DDL processor has
been stopped already"));
 
+        DdlWorker worker0 = worker;
+
+        if (worker0 != null) {
+            worker0.cancel();
+
+            worker = null;
+        }
+
         for (Map.Entry<IgniteUuid, GridFutureAdapter> e : operations.entrySet())
             e.getValue().onDone(new IgniteCheckedException("Operation has been cancelled
[opId=" + e.getKey() +']'));
     }
@@ -421,4 +468,53 @@ public class DdlStatementsProcessor {
     public static boolean isDdlStatement(Prepared cmd) {
         return cmd instanceof CreateIndex || cmd instanceof DropIndex;
     }
+
+    /**
+     * DDL worker.
+     */
+    private class DdlWorker extends GridWorker {
+        /** Worker queue. */
+        private final BlockingQueue<DdlTask> queue = new LinkedBlockingDeque<>();
+
+        /**
+         * Constructor.
+         *
+         * @param gridName Gird name.
+         * @param log Logger.
+         */
+        public DdlWorker(@Nullable String gridName, IgniteLogger log) {
+            super(gridName, "indexing-ddl-worker", log);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException
{
+            while (!isCancelled()) {
+                DdlTask task = queue.take();
+
+                try {
+                    task.run();
+                }
+                catch (Exception e) {
+                    U.error(log, "Unexpected exception during DDL task processing [task="
+ task + ']', e);
+                }
+                catch (Throwable t) {
+                    U.error(log, "Unexpected error during DDL task processing (worker will
be stopped) [task=" +
+                        task + ']', t);
+
+                    throw t;
+                }
+            }
+        }
+
+        /**
+         * Submit task.
+         *
+         * @param task Task.
+         */
+        public void submit(DdlTask task) {
+            assert task != null;
+
+            queue.add(task);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f79ead47/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlTask.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlTask.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlTask.java
new file mode 100644
index 0000000..0211e4f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlTask.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.ddl;
+
+/**
+ * DDL task.
+ */
+public interface DdlTask extends Runnable {
+    // No-op.
+}


Mime
View raw message