ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [08/36] incubator-ignite git commit: ignite-750 - fix
Date Thu, 23 Apr 2015 14:40:15 GMT
ignite-750 - fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/235843e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/235843e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/235843e6

Branch: refs/heads/ignite-gg-9702
Commit: 235843e69f2f1f27458714bc7632cc7d9edfbbaf
Parents: 3fdc824
Author: S.Vladykin <svladykin@gridgain.com>
Authored: Wed Apr 15 17:13:25 2015 +0300
Committer: S.Vladykin <svladykin@gridgain.com>
Committed: Wed Apr 15 17:13:25 2015 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryIndexing.java     |  4 ++-
 .../processors/query/GridQueryProcessor.java    |  2 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  6 ++--
 .../query/h2/twostep/GridMapQueryExecutor.java  | 34 +++++++++++++++++---
 .../h2/twostep/GridReduceQueryExecutor.java     | 34 +++++++++++++++++---
 5 files changed, 67 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 12f774c..fe029eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -23,6 +23,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.indexing.*;
@@ -39,9 +40,10 @@ public interface GridQueryIndexing {
      * Starts indexing.
      *
      * @param ctx Context.
+     * @param busyLock Busy lock.
      * @throws IgniteCheckedException If failed.
      */
-    public void start(GridKernalContext ctx) throws IgniteCheckedException;
+    public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws IgniteCheckedException;
 
     /**
      * Stops indexing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7ce894d..35e8d73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -94,7 +94,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
             execSvc = ctx.getExecutorService();
 
-            idx.start(ctx);
+            idx.start(ctx, busyLock);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7ec1dbe..b05c908 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1109,7 +1109,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
     /** {@inheritDoc} */
     @SuppressWarnings("NonThreadSafeLazyInitialization")
-    @Override public void start(GridKernalContext ctx) throws IgniteCheckedException {
+    @Override public void start(GridKernalContext ctx, GridSpinBusyLock busyLock) throws
IgniteCheckedException {
         if (log.isDebugEnabled())
             log.debug("Starting cache query index...");
 
@@ -1161,8 +1161,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             nodeId = ctx.localNodeId();
             marshaller = ctx.config().getMarshaller();
 
-            mapQryExec = new GridMapQueryExecutor();
-            rdcQryExec = new GridReduceQueryExecutor();
+            mapQryExec = new GridMapQueryExecutor(busyLock);
+            rdcQryExec = new GridReduceQueryExecutor(busyLock);
 
             mapQryExec.start(ctx, this);
             rdcQryExec.start(ctx, this);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 08ad38d..747ccb1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.h2.jdbc.*;
@@ -47,7 +48,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Map query executor.
  */
-public class GridMapQueryExecutor implements GridMessageListener {
+public class GridMapQueryExecutor {
     /** */
     private static final Field RESULT_FIELD;
 
@@ -77,6 +78,16 @@ public class GridMapQueryExecutor implements GridMessageListener {
     /** */
     private ConcurrentMap<UUID, ConcurrentMap<Long, QueryResults>> qryRess =
new ConcurrentHashMap8<>();
 
+    /** */
+    private final GridSpinBusyLock busyLock;
+
+    /**
+     * @param busyLock Busy lock.
+     */
+    public GridMapQueryExecutor(GridSpinBusyLock busyLock) {
+        this.busyLock = busyLock;
+    }
+
     /**
      * @param ctx Context.
      * @param h2 H2 Indexing.
@@ -102,11 +113,26 @@ public class GridMapQueryExecutor implements GridMessageListener {
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
-        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this);
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (!busyLock.enterBusy())
+                    return;
+
+                try {
+                    GridMapQueryExecutor.this.onMessage(nodeId, msg);
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
     }
 
-    /** {@inheritDoc} */
-    @Override public void onMessage(UUID nodeId, Object msg) {
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    public void onMessage(UUID nodeId, Object msg) {
         try {
             assert msg != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/235843e6/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index b7edd27..7f42e0d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.sql.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
+import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -56,7 +57,7 @@ import java.util.concurrent.atomic.*;
 /**
  * Reduce query executor.
  */
-public class GridReduceQueryExecutor implements GridMessageListener {
+public class GridReduceQueryExecutor {
     /** */
     private GridKernalContext ctx;
 
@@ -100,6 +101,16 @@ public class GridReduceQueryExecutor implements GridMessageListener {
         }
     }
 
+    /** */
+    private final GridSpinBusyLock busyLock;
+
+    /**
+     * @param busyLock Busy lock.
+     */
+    public GridReduceQueryExecutor(GridSpinBusyLock busyLock) {
+        this.busyLock = busyLock;
+    }
+
     /**
      * @param ctx Context.
      * @param h2 H2 Indexing.
@@ -111,7 +122,19 @@ public class GridReduceQueryExecutor implements GridMessageListener {
 
         log = ctx.log(GridReduceQueryExecutor.class);
 
-        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this);
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (!busyLock.enterBusy())
+                    return;
+
+                try {
+                    GridReduceQueryExecutor.this.onMessage(nodeId, msg);
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
+            }
+        });
 
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(final Event evt) {
@@ -133,8 +156,11 @@ public class GridReduceQueryExecutor implements GridMessageListener {
             " NOBUFFER FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
     }
 
-    /** {@inheritDoc} */
-    @Override public void onMessage(UUID nodeId, Object msg) {
+    /**
+     * @param nodeId Node ID.
+     * @param msg Message.
+     */
+    public void onMessage(UUID nodeId, Object msg) {
         try {
             assert msg != null;
 


Mime
View raw message