ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/2] incubator-ignite git commit: # Merged 6.6.3 fixes.
Date Thu, 05 Feb 2015 13:58:53 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/sprint-1 2372947b7 -> bb8b07dce


# Merged 6.6.3 fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a86ae903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a86ae903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a86ae903

Branch: refs/heads/sprint-1
Commit: a86ae903e337140ddd18e966921e0de9d70ae79f
Parents: 46160c9
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Feb 5 16:58:14 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Feb 5 16:58:14 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 22 +++----
 .../affinity/GridAffinityAssignmentCache.java   | 23 ++++++-
 .../processors/cache/GridCacheMapEntry.java     | 10 ++-
 .../GridCacheContinuousQueryAdapter.java        | 45 +++++++++-----
 .../GridCacheContinuousQueryHandler.java        | 15 ++++-
 .../continuous/GridContinuousProcessor.java     | 15 ++++-
 .../portable/GridPortableInputStream.java       |  7 +++
 ...dCacheContinuousQueryReplicatedSelfTest.java | 65 ++++++++++++++++++++
 .../hadoop/jobtracker/GridHadoopJobTracker.java |  2 +-
 9 files changed, 160 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 217a2aa..b612465e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1507,15 +1507,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
         private void onSegmentation() {
             GridSegmentationPolicy segPlc = ctx.config().getSegmentationPolicy();
 
+            // Always disconnect first.
+            try {
+                getSpi().disconnect();
+            }
+            catch (IgniteSpiException e) {
+                U.error(log, "Failed to disconnect discovery SPI.", e);
+            }
+
             switch (segPlc) {
                 case RESTART_JVM:
-                    try {
-                        getSpi().disconnect();
-                    }
-                    catch (IgniteSpiException e) {
-                        U.error(log, "Failed to disconnect discovery SPI.", e);
-                    }
-
                     U.warn(log, "Restarting JVM according to configured segmentation policy.");
 
                     restartJvm();
@@ -1523,13 +1524,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
                     break;
 
                 case STOP:
-                    try {
-                        getSpi().disconnect();
-                    }
-                    catch (IgniteSpiException e) {
-                        U.error(log, "Failed to disconnect discovery SPI.", e);
-                    }
-
                     U.warn(log, "Stopping local node according to configured segmentation policy.");
 
                     stopNode();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index cc447ea..42c3b5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
@@ -35,6 +36,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+
 /**
  * Affinity cached function.
  */
@@ -121,6 +124,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version to calculate affinity cache for.
      * @param discoEvt Discovery event that caused this topology version change.
      */
+    @SuppressWarnings("IfMayBeConditional")
     public List<List<ClusterNode>> calculate(long topVer, IgniteDiscoveryEvent
discoEvt) {
         if (log.isDebugEnabled())
             log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId()
+
@@ -142,8 +146,23 @@ public class GridAffinityAssignmentCache {
 
         List<List<ClusterNode>> prevAssignment = prev == null ? null : prev.assignment();
 
-        List<List<ClusterNode>> assignment = aff.assignPartitions(
-            new GridCacheAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt, topVer,
backups));
+        List<List<ClusterNode>> assignment;
+
+        if (prevAssignment != null && discoEvt != null) {
+            CacheDistributionMode distroMode = U.distributionMode(discoEvt.eventNode(), ctx.name());
+
+            if (distroMode == null || // no cache on node.
+                distroMode == CLIENT_ONLY || distroMode == NEAR_ONLY)
+                assignment = prevAssignment;
+            else
+                assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted,
prevAssignment,
+                    discoEvt, topVer, backups));
+        }
+        else
+            assignment = aff.assignPartitions(new GridCacheAffinityFunctionContextImpl(sorted,
prevAssignment, discoEvt,
+                topVer, backups));
+
+        assert assignment != null;
 
         GridAffinityAssignment updated = new GridAffinityAssignment(topVer, assignment);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6aafc5d..96ceb93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1167,8 +1167,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K,
V>
 
             CacheMode mode = cctx.config().getCacheMode();
 
-            if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
-                (tx != null && tx.local() && !isNear()))
+            if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
                 cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(),
old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, false);
@@ -1329,8 +1328,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K,
V>
 
                 CacheMode mode = cctx.config().getCacheMode();
 
-                if (mode == CacheMode.LOCAL || mode == CacheMode.REPLICATED ||
-                    (tx != null && tx.local() && !isNear()))
+                if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local()
&& !isNear()))
                     cctx.continuousQueries().onEntryUpdate(this, key, null, null, old, oldBytes,
false);
 
                 cctx.dataStructures().onEntryUpdated(key, true);
@@ -2144,7 +2142,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K,
V>
             if (res)
                 updateMetrics(op, metrics);
 
-            if (primary || cctx.isReplicated())
+            if (primary)
                 cctx.continuousQueries().onEntryUpdate(this, key, val, valueBytesUnlocked(),
old, oldBytes, false);
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -3143,7 +3141,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K,
V>
                 drReplicate(drType, val, valBytes, ver);
 
                 if (!skipQryNtf) {
-                    if (cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(),
key, topVer)) {
+                    if (cctx.affinity().primary(cctx.localNode(), key, topVer)) {
                         cctx.continuousQueries().onEntryUpdate(this,
                             key,
                             val,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
index ba34d6b..acd96ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAdapter.java
@@ -35,7 +35,7 @@ import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.locks.*;
 
-import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
 
 /**
  * Continuous query implementation.
@@ -228,27 +228,39 @@ public class GridCacheContinuousQueryAdapter<K, V> implements
CacheContinuousQue
         prj = prj.forCacheNodes(ctx.name());
 
         if (prj.nodes().isEmpty())
-            throw new ClusterTopologyCheckedException("Failed to execute query (projection
is empty): " + this);
+            throw new ClusterTopologyCheckedException("Failed to continuous execute query (projection is empty): " +
+                this);
 
-        CacheMode mode = ctx.config().getCacheMode();
+        boolean skipPrimaryCheck = false;
 
-        if (mode == LOCAL || mode == REPLICATED) {
-            Collection<ClusterNode> nodes = prj.nodes();
+        Collection<ClusterNode> nodes = prj.nodes();
 
-            ClusterNode node = nodes.contains(ctx.localNode()) ? ctx.localNode() : F.rand(nodes);
+        if (nodes.isEmpty())
+            throw new ClusterTopologyCheckedException("Failed to execute continuous query
(empty projection is " +
+                "provided): " + this);
 
-            assert node != null;
+        switch (ctx.config().getCacheMode()) {
+            case LOCAL:
+                if (!nodes.contains(ctx.localNode()))
+                    throw new ClusterTopologyCheckedException("Continuous query for LOCAL
cache can be executed " +
+                        "only locally (provided projection contains remote nodes only): "
+ this);
+                else if (nodes.size() > 1)
+                    U.warn(log, "Continuous query for LOCAL cache will be executed locally
(provided projection is " +
+                        "ignored): " + this);
 
-            if (nodes.size() > 1) {
-                if (node.id().equals(ctx.localNodeId()))
-                    U.warn(log, "Continuous query for " + mode + " cache can be run only
on local node. " +
-                        "Will execute query locally: " + this);
-                else
-                    U.warn(log, "Continuous query for " + mode + " cache can be run only
on single node. " +
-                        "Will execute query on remote node [qry=" + this + ", node=" + node
+ ']');
-            }
+                prj = prj.forNode(ctx.localNode());
 
-            prj = prj.forNode(node);
+                break;
+
+            case REPLICATED:
+                if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode()))
{
+                    CacheDistributionMode distributionMode = ctx.config().getDistributionMode();
+
+                    if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED)
+                        skipPrimaryCheck = true;
+                }
+
+                break;
         }
 
         closeLock.lock();
@@ -271,6 +283,7 @@ public class GridCacheContinuousQueryAdapter<K, V> implements CacheContinuousQue
                 entryLsnr,
                 sync,
                 oldVal,
+                skipPrimaryCheck,
                 taskNameHash,
                 keepPortable);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
index 350b9b8..a03b9db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryHandler.java
@@ -84,6 +84,9 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     /** Keep portable flag. */
     private boolean keepPortable;
 
+    /** Whether to skip primary check for REPLICATED cache. */
+    private transient boolean skipPrimaryCheck;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -103,6 +106,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
      * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}.
      * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}.
      * @param oldVal {@code True} if old value is required.
+     * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
      * @param taskHash Task name hash code.
      */
     GridCacheContinuousQueryHandler(@Nullable String cacheName,
@@ -114,6 +118,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         boolean entryLsnr,
         boolean sync,
         boolean oldVal,
+        boolean skipPrimaryCheck,
         int taskHash,
         boolean keepPortable) {
         assert topic != null;
@@ -131,6 +136,7 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         this.oldVal = oldVal;
         this.taskHash = taskHash;
         this.keepPortable = keepPortable;
+        this.skipPrimaryCheck = skipPrimaryCheck;
     }
 
     /** {@inheritDoc} */
@@ -184,16 +190,21 @@ class GridCacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
             }
 
             @Override public void onEntryUpdate(GridCacheContinuousQueryEntry<K, V>
e, boolean recordEvt) {
+                GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+                if (cctx.isReplicated() && !skipPrimaryCheck && !e.primary())
+                    return;
+
                 boolean notify;
 
-                CacheFlag[] f = cacheContext(ctx).forceLocalRead();
+                CacheFlag[] f = cctx.forceLocalRead();
 
                 try {
                     notify = (prjPred == null || checkProjection(e)) &&
                         (filter == null || filter.apply(e));
                 }
                 finally {
-                    cacheContext(ctx).forceFlags(f);
+                    cctx.forceFlags(f);
                 }
 
                 if (notify) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 93df61f..bc66a2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1276,7 +1276,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
          * @return Object to send or {@code null} if there is nothing to send for now.
          */
         @Nullable Collection<Object> add(@Nullable Object obj) {
-            Collection<Object> toSnd = null;
+            ConcurrentLinkedDeque8 buf0 = null;
 
             if (buf.sizex() >= bufSize - 1) {
                 lock.writeLock().lock();
@@ -1284,7 +1284,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 try {
                     buf.add(obj);
 
-                    toSnd = buf;
+                    buf0 = buf;
 
                     buf = new ConcurrentLinkedDeque8<>();
 
@@ -1306,7 +1306,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 }
             }
 
-            return toSnd != null ? new ArrayList<>(toSnd) : null;
+            Collection<Object> toSnd = null;
+
+            if (buf0 != null) {
+                toSnd = new ArrayList<>(buf0.sizex());
+
+                for (Object o : buf0)
+                    toSnd.add(o);
+            }
+
+            return toSnd;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
index 8ab16f2..3c676bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableInputStream.java
@@ -174,4 +174,11 @@ public interface GridPortableInputStream extends GridPortableStream {
      * @return Remaining data.
      */
     public int remaining();
+
+    /**
+     * Length of data inside array.
+     *
+     * @param len Length.
+     */
+    public void length(int len);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
index 39dfda0..dddda8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
 
 import java.util.*;
 import java.util.concurrent.*;
@@ -45,6 +46,7 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("unchecked")
     public void testRemoteNodeCallback() throws Exception {
         GridCache<Integer, Integer> cache1 = grid(0).cache(null);
 
@@ -79,4 +81,67 @@ public class GridCacheContinuousQueryReplicatedSelfTest extends GridCacheContinu
 
         assertEquals(10, val.get().intValue());
     }
+
+    /**
+     * Ensure that every node see every update.
+     *
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testCrossCallback() throws Exception {
+        // Prepare.
+        GridCache<Integer, Integer> cache1 = grid(0).cache(null);
+        GridCache<Integer, Integer> cache2 = grid(1).cache(null);
+
+        final int key1 = primaryKey(cache1);
+        final int key2 = primaryKey(cache2);
+
+        final CountDownLatch latch1 = new CountDownLatch(2);
+        final CountDownLatch latch2 = new CountDownLatch(2);
+
+
+        // Start query on the first node.
+        CacheContinuousQuery<Integer, Integer> qry1 = cache1.queries().createContinuousQuery();
+
+        qry1.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, Integer>>>() {
+            @Override public boolean apply(UUID nodeID,
+                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries)
{
+                for (CacheContinuousQueryEntry entry : entries) {
+                    log.info("Update in cache 1: " + entry);
+
+                    if (entry.getKey() == key1 || entry.getKey() == key2)
+                        latch1.countDown();
+                }
+
+                return latch1.getCount() != 0;
+            }
+        });
+
+        qry1.execute();
+
+        // Start query on the second node.
+        CacheContinuousQuery<Integer, Integer> qry2 = cache2.queries().createContinuousQuery();
+
+        qry2.localCallback(new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer,
Integer>>>() {
+            @Override public boolean apply(UUID nodeID,
+                Collection<CacheContinuousQueryEntry<Integer, Integer>> entries)
{
+                for (CacheContinuousQueryEntry entry : entries) {
+                    log.info("Update in cache 2: " + entry);
+
+                    if (entry.getKey() == key1 || entry.getKey() == key2)
+                        latch2.countDown();
+                }
+
+                return latch2.getCount() != 0;
+            }
+        });
+
+        qry2.execute();
+
+        cache1.put(key1, key1);
+        cache1.put(key2, key2);
+
+        assert latch1.await(LATCH_TIMEOUT, MILLISECONDS);
+        assert latch2.await(LATCH_TIMEOUT, MILLISECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a86ae903/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
index 1b9b4cb..2a771b8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobTracker.java
@@ -194,7 +194,7 @@ public class GridHadoopJobTracker extends GridHadoopComponent {
                 }
             });
 
-        qry.execute();
+        qry.execute(ctx.kernalContext().grid().forLocal());
 
         ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(final IgniteEvent evt) {


Mime
View raw message