ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [27/50] ignite git commit: Implemented "onQueryRequest" handling.
Date Mon, 14 Aug 2017 10:30:37 GMT
Implemented "onQueryRequest" handling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ccdd179a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ccdd179a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ccdd179a

Branch: refs/heads/ignite-5991-6019
Commit: ccdd179a8563c7585104982c536402b18e5ef367
Parents: 8c45273
Author: devozerov <vozerov@gridgain.com>
Authored: Mon Aug 14 10:48:07 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Mon Aug 14 10:48:07 2017 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 49 ++++++++++++++++++++
 .../h2/twostep/lazy/MapQueryLazyWorker.java     | 20 ++++++--
 2 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ccdd179a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 3e2eb8b..6b11a47 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
 import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyIgniteThread;
 import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorker;
+import org.apache.ignite.internal.processors.query.h2.twostep.lazy.MapQueryLazyWorkerKey;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
@@ -70,6 +72,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.thread.IgniteThread;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -504,6 +507,22 @@ public class GridMapQueryExecutor {
             lazy);
     }
 
+    /** Lazy workers. */
+    private final ConcurrentHashMap<MapQueryLazyWorkerKey, MapQueryLazyWorker> lazyWorkers = new ConcurrentHashMap<>();
+
+    /**
+     * Unregister lazy worker if needed (i.e. if we are currently in lazy worker thread).
+     */
+    private void unregisterLazyWorkerIfNeeded() {
+        MapQueryLazyWorker worker = MapQueryLazyWorker.currentWorker();
+
+        if (worker != null) {
+            worker.stop();
+
+            lazyWorkers.remove(worker.key(), worker);
+        }
+    }
+
     /**
      * @param node Node authored request.
      * @param reqId Request ID.
@@ -537,6 +556,30 @@ public class GridMapQueryExecutor {
         Object[] params,
         boolean lazy
     ) {
+        if (lazy && MapQueryLazyWorker.currentWorker() == null) {
+            // Lazy queries must be re-submitted to dedicated workers.
+            MapQueryLazyWorkerKey key = new MapQueryLazyWorkerKey(node.id(), reqId, segmentId);
+            MapQueryLazyWorker worker = new MapQueryLazyWorker(ctx.igniteInstanceName(), key, log);
+
+            if (busyLock.enterBusy()) {
+                try {
+                    MapQueryLazyWorker oldWorker = lazyWorkers.put(key, worker);
+
+                    if (oldWorker != null)
+                        oldWorker.stop();
+
+                    IgniteThread thread = new IgniteThread(worker);
+
+                    thread.start();
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+
+            return;
+        }
+
         // Prepare to run queries.
         GridCacheContext<?, ?> mainCctx =
             !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
@@ -551,6 +594,9 @@ public class GridMapQueryExecutor {
             if (topVer != null) {
                 // Reserve primary for topology version or explicit partitions.
                 if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+                    // Unregister lazy worker because re-try may never reach this node again.
+                    unregisterLazyWorkerIfNeeded();
+
                     sendRetry(node, reqId, segmentId);
 
                     return;
@@ -661,6 +707,9 @@ public class GridMapQueryExecutor {
                 qr.cancel(false);
             }
 
+            // Unregister worker after possible cancellation.
+            unregisterLazyWorkerIfNeeded();
+
             if (X.hasCause(e, GridH2RetryException.class))
                 sendRetry(node, reqId, segmentId);
             else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ccdd179a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorker.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorker.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorker.java
index 0c4079b..172e1ce 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorker.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/lazy/MapQueryLazyWorker.java
@@ -35,6 +35,9 @@ public class MapQueryLazyWorker extends GridWorker {
     /** Task to be executed. */
     private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
 
+    /** Key. */
+    private final MapQueryLazyWorkerKey key;
+
     /**
      * Constructor.
      *
@@ -44,6 +47,8 @@ public class MapQueryLazyWorker extends GridWorker {
      */
     public MapQueryLazyWorker(@Nullable String instanceName, MapQueryLazyWorkerKey key, IgniteLogger log) {
         super(instanceName, workerName(key), log);
+
+        this.key = key;
     }
 
     /** {@inheritDoc} */
@@ -64,6 +69,13 @@ public class MapQueryLazyWorker extends GridWorker {
     }
 
     /**
+     * @return Worker key.
+     */
+    public MapQueryLazyWorkerKey key() {
+        return key;
+    }
+
+    /**
      * Stop the worker.
      */
     public void stop() {
@@ -78,12 +90,10 @@ public class MapQueryLazyWorker extends GridWorker {
     }
 
     /**
-     * @return Lazy worker thread flag.
+     * @return Current worker or {@code null} if call is performed not from lazy worker thread.
      */
-    public static boolean isLazyWorkerThread() {
-        MapQueryLazyWorker worker = LAZY_WORKER.get();
-
-        return worker != null;
+    @Nullable public static MapQueryLazyWorker currentWorker() {
+        return LAZY_WORKER.get();
     }
 
     /**


Mime
View raw message