ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: #ignite-710: DataStreamProcessor processRequest should be called in async mode.
Date Wed, 22 Apr 2015 13:19:55 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-710 994340593 -> 0c3087db9


#ignite-710: DataStreamProcessor processRequest should be called in async mode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0c3087db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0c3087db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0c3087db

Branch: refs/heads/ignite-710
Commit: 0c3087db92088d24c7bdc77e33da6531d9e6c204
Parents: 9943405
Author: ivasilinets <ivasilinets@gridgain.com>
Authored: Wed Apr 22 16:19:52 2015 +0300
Committer: ivasilinets <ivasilinets@gridgain.com>
Committed: Wed Apr 22 16:19:52 2015 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamProcessor.java       | 38 +++++++++++++-------
 .../DataStreamerMultiThreadedSelfTest.java      |  2 +-
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0c3087db/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index fc7e8fa..9e53bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -174,7 +174,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter
{
      * @param nodeId Sender ID.
      * @param req Request.
      */
-    private void processRequest(UUID nodeId, DataStreamerRequest req) {
+    private void processRequest(final UUID nodeId, final DataStreamerRequest req) {
         if (!busyLock.enterBusy()) {
             if (log.isDebugEnabled())
                 log.debug("Ignoring data load request (node is stopping): " + req);
@@ -186,6 +186,31 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter
{
             if (log.isDebugEnabled())
                 log.debug("Processing data load request: " + req);
 
+            AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
+            AffinityTopologyVersion rmtAffVer = req.topologyVersion();
+
+            if (locAffVer.compareTo(rmtAffVer) < 0) {
+                if (log.isDebugEnabled())
+                    log.debug("Received request has higher affinity topology version [request="
+ req +
+                        ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
+
+                IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
+
+                if (fut != null && !fut.isDone()) {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) {
+                            ctx.closure().runLocalSafe(new Runnable() {
+                                @Override public void run() {
+                                    processRequest(nodeId, req);
+                                }
+                            }, false);
+                        }
+                    });
+
+                    return;
+                }
+            }
+
             Object topic;
 
             try {
@@ -252,17 +277,6 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter
{
             Exception err = null;
 
             try {
-                AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
-                AffinityTopologyVersion rmtAffVer = req.topologyVersion();
-
-                if (locAffVer.compareTo(rmtAffVer) < 0) {
-                    if (log.isDebugEnabled())
-                        log.debug("Received request has higher affinity topology version
[request=" + req +
-                            ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
-
-                    ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer).get();
-                }
-
                 job.call();
             }
             catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0c3087db/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
index 231580a..5eedd8d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerMultiThreadedSelfTest.java
@@ -72,7 +72,7 @@ public class DataStreamerMultiThreadedSelfTest extends GridCommonAbstractTest
{
 
             Set<IgniteFuture> futs = new HashSet<>();
 
-            try (final DataStreamerImpl<Object, Object> dataLdr = (DataStreamerImpl)ignite.dataStreamer(null))
{
+            try (final DataStreamerImpl dataLdr = (DataStreamerImpl)ignite.dataStreamer(null))
{
                 dataLdr.maxRemapCount(0);
 
                 final AtomicInteger igniteId = new AtomicInteger(1);


Mime
View raw message