ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6
Date Wed, 10 Jun 2015 16:27:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
new file mode 100644
index 0000000..9dc2227
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class StopRoutineDiscoveryMessage extends AbstractContinuousMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param routineId Routine id.
+     */
+    public StopRoutineDiscoveryMessage(UUID routineId) {
+        super(routineId);
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return new StopRoutineAckDiscoveryMessage(routineId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StopRoutineDiscoveryMessage.class, this, "routineId", routineId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
index 50e9ab9..dc9d025 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerCacheUpdaters.java
@@ -160,7 +160,7 @@ public class DataStreamerCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock prone.
      */
-    private static class BatchedSorted<K, V> implements StreamReceiver<K, V>, InternalUpdater {
+    private static class BatchedSorted<K, V> implements StreamReceiver<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 40716be..26b0568 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -204,19 +204,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 // Remap regular mappings.
                 final Buffer buf = bufMappings.remove(id);
 
+                // Only async notification is possible since
+                // discovery thread may be trapped otherwise.
                 if (buf != null) {
-                    // Only async notification is possible since
-                    // discovery thread may be trapped otherwise.
-                    ctx.closure().callLocalSafe(
-                        new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                buf.onNodeLeft();
-
-                                return null;
-                            }
-                        },
-                        true /* system pool */
-                    );
+                    waitAffinityAndRun(new Runnable() {
+                        @Override public void run() {
+                            buf.onNodeLeft();
+                        }
+                    }, discoEvt.topologyVersion(), true);
                 }
             }
         };
@@ -254,6 +249,31 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * @param c Closure to run.
+     * @param topVer Topology version to wait for.
+     * @param async Async flag.
+     */
+    private void waitAffinityAndRun(final Runnable c, long topVer, boolean async) {
+        AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, 0);
+
+        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer0);
+
+        if (fut != null && !fut.isDone()) {
+            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> fut) {
+                    ctx.closure().runLocalSafe(c, true);
+                }
+            });
+        }
+        else {
+            if (async)
+                ctx.closure().runLocalSafe(c, true);
+            else
+                c.run();
+        }
+    }
+
+    /**
      * @return Cache object context.
      */
     public CacheObjectContext cacheObjectContext() {
@@ -533,6 +553,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         boolean initPda = ctx.deploy().enabled() && jobPda == null;
 
+        AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
         for (DataStreamerEntry entry : entries) {
             List<ClusterNode> nodes;
 
@@ -549,7 +571,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     initPda = false;
                 }
 
-                nodes = nodes(key);
+                nodes = nodes(key, topVer);
             }
             catch (IgniteCheckedException e) {
                 resFut.onDone(e);
@@ -627,10 +649,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 }
             };
 
-            GridFutureAdapter<?> f;
+            final GridFutureAdapter<?> f;
 
             try {
-                f = buf.update(entriesForNode, lsnr);
+                f = buf.update(entriesForNode, topVer, lsnr);
             }
             catch (IgniteInterruptedCheckedException e1) {
                 resFut.onDone(e1);
@@ -639,30 +661,38 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (ctx.discovery().node(nodeId) == null) {
-                if (bufMappings.remove(nodeId, buf))
-                    buf.onNodeLeft();
+                if (bufMappings.remove(nodeId, buf)) {
+                    final Buffer buf0 = buf;
+
+                    waitAffinityAndRun(new Runnable() {
+                        @Override public void run() {
+                            buf0.onNodeLeft();
 
-                if (f != null)
-                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                        "(node has left): " + nodeId));
+                            if (f != null)
+                                f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                                    "(node has left): " + nodeId));
+                        }
+                    }, ctx.discovery().topologyVersion(), false);
+                }
             }
         }
     }
 
     /**
      * @param key Key to map.
+     * @param topVer Topology version.
      * @return Nodes to send requests to.
      * @throws IgniteCheckedException If failed.
      */
-    private List<ClusterNode> nodes(KeyCacheObject key) throws IgniteCheckedException {
+    private List<ClusterNode> nodes(KeyCacheObject key, AffinityTopologyVersion topVer) throws IgniteCheckedException {
         GridAffinityProcessor aff = ctx.affinity();
 
         List<ClusterNode> res = null;
 
         if (!allowOverwrite())
-            res = aff.mapKeyToPrimaryAndBackups(cacheName, key);
+            res = aff.mapKeyToPrimaryAndBackups(cacheName, key, topVer);
         else {
-            ClusterNode node = aff.mapKeyToNode(cacheName, key);
+            ClusterNode node = aff.mapKeyToNode(cacheName, key, topVer);
 
             if (node != null)
                 res = Collections.singletonList(node);
@@ -965,11 +995,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /**
          * @param newEntries Infos.
+         * @param topVer Topology version.
          * @param lsnr Listener for the operation future.
          * @throws IgniteInterruptedCheckedException If failed.
          * @return Future for operation.
          */
         @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries,
+            AffinityTopologyVersion topVer,
             IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
             List<DataStreamerEntry> entries0 = null;
             GridFutureAdapter<Object> curFut0;
@@ -992,7 +1024,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (entries0 != null) {
-                submit(entries0, curFut0);
+                submit(entries0, topVer, curFut0);
 
                 if (cancelled)
                     curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
@@ -1029,7 +1061,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             }
 
             if (entries0 != null)
-                submit(entries0, curFut0);
+                submit(entries0, null, curFut0);
 
             // Create compound future for this flush.
             GridCompoundFuture<Object, Object> res = null;
@@ -1074,10 +1106,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         /**
          * @param entries Entries to submit.
+         * @param topVer Topology version.
          * @param curFut Current future.
          * @throws IgniteInterruptedCheckedException If interrupted.
          */
-        private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut)
+        private void submit(final Collection<DataStreamerEntry> entries,
+            @Nullable AffinityTopologyVersion topVer,
+            final GridFutureAdapter<Object> curFut)
             throws IgniteInterruptedCheckedException {
             assert entries != null;
             assert !entries.isEmpty();
@@ -1166,6 +1201,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                 reqs.put(reqId, (GridFutureAdapter<Object>)fut);
 
+                if (topVer == null)
+                    topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
                 DataStreamerRequest req = new DataStreamerRequest(
                     reqId,
                     topicBytes,
@@ -1180,7 +1218,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.participants() : null,
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
-                    ctx.cache().context().exchange().readyAffinityVersion());
+                    topVer);
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
@@ -1428,6 +1466,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     cctx.evicts().touch(entry, topVer);
 
                     CU.unwindEvicts(cctx);
+
+                    entry.onUnlock();
                 }
                 catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
                     // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 72911af..aa3bfe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.datastructures;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -32,6 +33,7 @@ import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import javax.cache.event.*;
 import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
@@ -40,7 +42,6 @@ import java.util.concurrent.*;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheRebalanceMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*;
 import static org.apache.ignite.transactions.TransactionConcurrency.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -99,6 +100,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** */
     private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
 
+    /** */
+    private volatile UUID qryId;
+
     /**
      * @param ctx Context.
      */
@@ -112,7 +116,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() {
+    @Override public void onKernalStart() throws IgniteCheckedException {
         if (ctx.config().isDaemon())
             return;
 
@@ -139,11 +143,35 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             seqView = atomicsCache;
 
-            dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
+            dsCacheCtx = atomicsCache.context();
         }
     }
 
     /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void startQuery() throws IgniteCheckedException {
+        if (qryId == null) {
+            synchronized (this) {
+                if (qryId == null) {
+                    qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+                        new DataStructuresEntryFilter(),
+                        dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+                        false);
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (qryId != null)
+            dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
+    }
+
+    /**
      * Gets a sequence from cache or creates one if it's not cached.
      *
      * @param name Sequence name.
@@ -161,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() {
             @Override public IgniteAtomicSequence applyx() throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -287,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() {
             @Override public IgniteAtomicLong applyx() throws IgniteCheckedException {
                 final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -490,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() {
             @Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -591,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() {
             @Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException {
                 GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name);
@@ -899,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         checkAtomicsConfiguration();
 
+        startQuery();
+
         return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() {
             @Override public IgniteCountDownLatch applyx() throws IgniteCheckedException {
                 GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -906,8 +944,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 dsCacheCtx.gate().enter();
 
                 try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheCountDownLatchValue val = cast(dsView.get(key),
-                        GridCacheCountDownLatchValue.class);
+                    GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
 
                     // Check that count down hasn't been created in other thread yet.
                     GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);
@@ -1034,28 +1071,46 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Transaction committed callback for transaction manager.
      *
-     * @param tx Committed transaction.
      */
-    public <K, V> void onTxCommitted(IgniteInternalTx tx) {
-        if (dsCacheCtx == null)
-            return;
+    static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+        /** */
+        private static final long serialVersionUID = 0L;
 
-        if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) {
-            Collection<IgniteTxEntry> entries = tx.writeEntries();
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+            if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
+                return evt.getValue() instanceof GridCacheCountDownLatchValue;
+            else {
+                assert evt.getEventType() == EventType.REMOVED : evt;
 
-            if (log.isDebugEnabled())
-                log.debug("Committed entries: " + entries);
+                return true;
+            }
+        }
 
-            for (IgniteTxEntry entry : entries) {
-                // Check updated or created GridCacheInternalKey keys.
-                if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) {
-                    GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructuresEntryFilter.class, this);
+        }
+    }
 
-                    Object val0 = CU.value(entry.value(), entry.context(), false);
+    /**
+     *
+     */
+    private class DataStructuresEntryListener implements
+        CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(
+            Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+            throws CacheEntryListenerException
+        {
+            for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
+                if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
+                    GridCacheInternal val0 = evt.getValue();
 
                     if (val0 instanceof GridCacheCountDownLatchValue) {
+                        GridCacheInternalKey key = evt.getKey();
+
                         // Notify latch on changes.
                         GridCacheRemovable latch = dsMap.get(key);
 
@@ -1067,8 +1122,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                             latch0.onUpdate(val.get());
 
                             if (val.get() == 0 && val.autoDelete()) {
-                                entry.cached().markObsolete(dsCacheCtx.versions().next());
-
                                 dsMap.remove(key);
 
                                 latch.onRemoved();
@@ -1080,11 +1133,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 ", actual=" + latch.getClass() + ", value=" + latch + ']');
                         }
                     }
+
                 }
+                else {
+                    assert evt.getEventType() == EventType.REMOVED : evt;
 
-                // Check deleted GridCacheInternal keys.
-                if (entry.op() == DELETE && entry.key().internal()) {
-                    GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+                    GridCacheInternal key = evt.getKey();
 
                     // Entry's val is null if entry deleted.
                     GridCacheRemovable obj = dsMap.remove(key);
@@ -1094,6 +1148,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 }
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructuresEntryListener.class, this);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
index 65cb48d..5fd6c81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
@@ -98,5 +98,5 @@ public interface HadoopJob {
     /**
      * Cleans up the job staging directory.
      */
-    void cleanupStagingDirectory();
+    public void cleanupStagingDirectory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..3d2ee17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,13 +21,14 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Task context.
  */
 public abstract class HadoopTaskContext {
     /** */
-    private final HadoopJob job;
+    protected final HadoopJob job;
 
     /** */
     private HadoopTaskInput input;
@@ -187,4 +188,15 @@ public abstract class HadoopTaskContext {
      * @throws IgniteCheckedException If failed.
      */
     public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+    /**
+     * Executes a callable on behalf of the job owner.
+     * In case of embedded task execution the implementation of this method
+     * will use classes loaded by the ClassLoader this HadoopTaskContext loaded with.
+     * @param c The callable.
+     * @param <T> The return type of the Callable.
+     * @return The result of the callable.
+     * @throws IgniteCheckedException On any error in callable.
+     */
+    public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 7c1a837..361f75f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -48,8 +48,12 @@ public interface IgfsEx extends IgniteFileSystem {
     /** Property name for URI of file system. */
     public static final String SECONDARY_FS_URI = "SECONDARY_FS_URI";
 
-    /** Property name for user name of file system. */
-    public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
+    /** Property name for default user name of file system.
+     * NOTE: for secondary file system this is just a default user name, which is used
+     * when the 2ndary filesystem is used outside of any user context.
+     * If another user name is set in the context, 2ndary file system will work on behalf
+     * of that user, which is different from the default. */
+     public static final String SECONDARY_FS_USER_NAME = "SECONDARY_FS_USER_NAME";
 
     /**
      * Stops IGFS cleaning all used resources.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 34636d2..c3495e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -245,8 +245,12 @@ public final class IgfsImpl implements IgfsEx {
             for (IgfsFileWorkerBatch batch : workerMap.values())
                 batch.cancel();
 
-            if (secondaryFs instanceof AutoCloseable)
-                U.closeQuiet((AutoCloseable)secondaryFs);
+            try {
+                secondaryFs.close();
+            }
+            catch (Exception e) {
+                log.error("Failed to close secondary file system.", e);
+            }
         }
 
         igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 8a8b858..cfe6ed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -51,10 +51,10 @@ class IgfsIpcHandler implements IgfsServerHandler {
     private final int bufSize; // Buffer size. Must not be less then file block size.
 
     /** IGFS instance for this handler. */
-    private IgfsEx igfs;
+    private final IgfsEx igfs;
 
     /** Resource ID generator. */
-    private AtomicLong rsrcIdGen = new AtomicLong();
+    private final AtomicLong rsrcIdGen = new AtomicLong();
 
     /** Stopping flag. */
     private volatile boolean stopping;
@@ -241,138 +241,148 @@ class IgfsIpcHandler implements IgfsServerHandler {
      * @return Response message.
      * @throws IgniteCheckedException If failed.
      */
-    private IgfsMessage processPathControlRequest(IgfsClientSession ses, IgfsIpcCommand cmd,
+    private IgfsMessage processPathControlRequest(final IgfsClientSession ses, final IgfsIpcCommand cmd,
         IgfsMessage msg) throws IgniteCheckedException {
-        IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
+        final IgfsPathControlRequest req = (IgfsPathControlRequest)msg;
 
         if (log.isDebugEnabled())
             log.debug("Processing path control request [igfsName=" + igfs.name() + ", req=" + req + ']');
 
-        IgfsControlResponse res = new IgfsControlResponse();
+        final IgfsControlResponse res = new IgfsControlResponse();
+
+        final String userName = req.userName();
+
+        assert userName != null;
 
         try {
-            switch (cmd) {
-                case EXISTS:
-                    res.response(igfs.exists(req.path()));
+            IgfsUserContext.doAs(userName, new IgniteOutClosure<Object>() {
+                @Override public Void apply() {
+                    switch (cmd) {
+                        case EXISTS:
+                            res.response(igfs.exists(req.path()));
 
-                    break;
+                            break;
 
-                case INFO:
-                    res.response(igfs.info(req.path()));
+                        case INFO:
+                            res.response(igfs.info(req.path()));
 
-                    break;
+                            break;
 
-                case PATH_SUMMARY:
-                    res.response(igfs.summary(req.path()));
+                        case PATH_SUMMARY:
+                            res.response(igfs.summary(req.path()));
 
-                    break;
+                            break;
 
-                case UPDATE:
-                    res.response(igfs.update(req.path(), req.properties()));
+                        case UPDATE:
+                            res.response(igfs.update(req.path(), req.properties()));
 
-                    break;
+                            break;
 
-                case RENAME:
-                    igfs.rename(req.path(), req.destinationPath());
+                        case RENAME:
+                            igfs.rename(req.path(), req.destinationPath());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case DELETE:
-                    res.response(igfs.delete(req.path(), req.flag()));
+                        case DELETE:
+                            res.response(igfs.delete(req.path(), req.flag()));
 
-                    break;
+                            break;
 
-                case MAKE_DIRECTORIES:
-                    igfs.mkdirs(req.path(), req.properties());
+                        case MAKE_DIRECTORIES:
+                            igfs.mkdirs(req.path(), req.properties());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case LIST_PATHS:
-                    res.paths(igfs.listPaths(req.path()));
+                        case LIST_PATHS:
+                            res.paths(igfs.listPaths(req.path()));
 
-                    break;
+                            break;
 
-                case LIST_FILES:
-                    res.files(igfs.listFiles(req.path()));
+                        case LIST_FILES:
+                            res.files(igfs.listFiles(req.path()));
 
-                    break;
+                            break;
 
-                case SET_TIMES:
-                    igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+                        case SET_TIMES:
+                            igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
 
-                    res.response(true);
+                            res.response(true);
 
-                    break;
+                            break;
 
-                case AFFINITY:
-                    res.locations(igfs.affinity(req.path(), req.start(), req.length()));
+                        case AFFINITY:
+                            res.locations(igfs.affinity(req.path(), req.start(), req.length()));
 
-                    break;
+                            break;
 
-                case OPEN_READ: {
-                    IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
-                        igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
+                        case OPEN_READ: {
+                            IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+                                igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
 
-                    long streamId = registerResource(ses, igfsIn);
+                            long streamId = registerResource(ses, igfsIn);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
-                        igfsIn.fileInfo().modificationTime());
+                            IgfsFileInfo info = new IgfsFileInfo(igfsIn.fileInfo(), null,
+                                igfsIn.fileInfo().modificationTime());
 
-                    res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
+                            res.response(new IgfsInputStreamDescriptor(streamId, info.length()));
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_CREATE: {
-                    long streamId = registerResource(ses, igfs.create(
-                        req.path(),       // Path.
-                        bufSize,          // Buffer size.
-                        req.flag(),       // Overwrite if exists.
-                        affinityKey(req), // Affinity key based on replication factor.
-                        req.replication(),// Replication factor.
-                        req.blockSize(),  // Block size.
-                        req.properties()  // File properties.
-                    ));
+                        case OPEN_CREATE: {
+                            long streamId = registerResource(ses, igfs.create(
+                                req.path(),       // Path.
+                                bufSize,          // Buffer size.
+                                req.flag(),       // Overwrite if exists.
+                                affinityKey(req), // Affinity key based on replication factor.
+                                req.replication(),// Replication factor.
+                                req.blockSize(),  // Block size.
+                                req.properties()  // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file create [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                case OPEN_APPEND: {
-                    long streamId = registerResource(ses, igfs.append(
-                        req.path(),        // Path.
-                        bufSize,           // Buffer size.
-                        req.flag(),        // Create if absent.
-                        req.properties()   // File properties.
-                    ));
+                        case OPEN_APPEND: {
+                            long streamId = registerResource(ses, igfs.append(
+                                req.path(),        // Path.
+                                bufSize,           // Buffer size.
+                                req.flag(),        // Create if absent.
+                                req.properties()   // File properties.
+                            ));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
-                            req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Opened IGFS output stream for file append [igfsName=" + igfs.name() + ", path=" +
+                                    req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
 
-                    res.response(streamId);
+                            res.response(streamId);
 
-                    break;
-                }
+                            break;
+                        }
 
-                default:
-                    assert false : "Unhandled path control request command: " + cmd;
+                        default:
+                            assert false : "Unhandled path control request command: " + cmd;
 
-                    break;
-            }
+                            break;
+                    }
+
+                    return null;
+                }
+            });
         }
         catch (IgniteException e) {
             throw new IgniteCheckedException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index e33e0d4..b98c5d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -669,7 +669,7 @@ public class IgfsMetaManager extends IgfsManager {
     private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) throws IgniteCheckedException {
         assert fileId != null;
 
-        IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singletonList(fileId)).get(fileId) :
+        IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singleton(fileId)).get(fileId) :
             id2InfoPrj.get(fileId);
 
         return info == null ? Collections.<String, IgfsListingEntry>emptyMap() : info.listing();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 683b317..44ee90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -30,14 +30,14 @@ import java.util.*;
  */
 class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     /** Delegate. */
-    private final IgfsImpl igfs;
+    private final IgfsEx igfs;
 
     /**
      * Constructor.
      *
      * @param igfs Delegate.
      */
-    IgfsSecondaryFileSystemImpl(IgfsImpl igfs) {
+    IgfsSecondaryFileSystemImpl(IgfsEx igfs) {
         this.igfs = igfs;
     }
 
@@ -118,4 +118,9 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
     @Override public Map<String, String> properties() {
         return Collections.emptyMap();
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
index 253d5be..caa6866 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java
@@ -239,13 +239,13 @@ public class IgfsServer {
      */
     private class ClientWorker extends GridWorker {
         /** Connected client endpoint. */
-        private IpcEndpoint endpoint;
+        private final IpcEndpoint endpoint;
 
         /** Data output stream. */
         private final IgfsDataOutputStream out;
 
         /** Client session object. */
-        private IgfsClientSession ses;
+        private final IgfsClientSession ses;
 
         /** Queue node for fast unlink. */
         private ConcurrentLinkedDeque8.Node<ClientWorker> node;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 4b0234f..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 
@@ -88,4 +90,18 @@ public class IgfsUtils {
     private IgfsUtils() {
         // No-op.
     }
+
+    /**
+     * Provides non-null user name.
+     * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
+     * which is the current process owner user.
+     * @param user a user name to be fixed.
+     * @return non-null interned user name.
+     */
+    public static String fixUserName(@Nullable String user) {
+        if (F.isEmpty(user))
+           user = FileSystemConfiguration.DFLT_USER_NAME;
+
+        return user;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7043e33..f7b4b0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -121,7 +121,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     if (F.isEmpty(meta.getValueType()))
                         throw new IgniteCheckedException("Value type is not set: " + meta);
 
-                    TypeDescriptor desc = new TypeDescriptor(ccfg);
+                    TypeDescriptor desc = new TypeDescriptor();
 
                     Class<?> valCls = U.classForName(meta.getValueType(), null);
 
@@ -160,7 +160,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     Class<?> keyCls = clss[i];
                     Class<?> valCls = clss[i + 1];
 
-                    TypeDescriptor desc = processKeyAndValueClasses(ccfg, keyCls, valCls);
+                    TypeDescriptor desc = processKeyAndValueClasses(keyCls, valCls);
 
                     addTypeByName(ccfg, desc);
                     types.put(new TypeId(ccfg.getName(), valCls), desc);
@@ -188,15 +188,17 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param ccfg Cache configuration.
      * @param keyCls Key class.
      * @param valCls Value class.
      * @return Type descriptor.
      * @throws IgniteCheckedException If failed.
      */
-    private TypeDescriptor processKeyAndValueClasses(CacheConfiguration<?,?> ccfg, Class<?> keyCls, Class<?> valCls)
+    private TypeDescriptor processKeyAndValueClasses(
+        Class<?> keyCls,
+        Class<?> valCls
+    )
         throws IgniteCheckedException {
-        TypeDescriptor d = new TypeDescriptor(ccfg);
+        TypeDescriptor d = new TypeDescriptor();
 
         d.keyClass(keyCls);
         d.valueClass(valCls);
@@ -318,7 +320,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to rebuild indexes (grid is stopping).");
 
         try {
-            return rebuildIndexes(space, typesByName.get(new TypeName(space, valTypeName)));
+            return rebuildIndexes(
+                space,
+                typesByName.get(
+                    new TypeName(
+                        space,
+                        valTypeName)));
         }
         finally {
             busyLock.leaveBusy();
@@ -539,7 +546,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            return idx.queryTwoStep(ctx.cache().internalCache(space).context(), qry);
+            return idx.queryTwoStep(
+                ctx.cache().internalCache(space).context(),
+                qry);
         }
         finally {
             busyLock.leaveBusy();
@@ -589,59 +598,62 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param qry Query.
      * @return Cursor.
      */
-    public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(GridCacheContext<?,?> cctx, SqlQuery qry) {
+    public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(final GridCacheContext<?, ?> cctx, final SqlQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            String space = cctx.name();
-            String type = qry.getType();
-            String sqlQry = qry.getSql();
-            Object[] params = qry.getArgs();
-
-            TypeDescriptor typeDesc = typesByName.get(new TypeName(space, type));
-
-            if (typeDesc == null || !typeDesc.registered())
-                throw new CacheException("Failed to find SQL table for type: " + type);
-
-            final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc,
-                idx.backupFilter());
-
-            if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                ctx.event().record(new CacheQueryExecutedEvent<>(
-                    ctx.discovery().localNode(),
-                    "SQL query executed.",
-                    EVT_CACHE_QUERY_EXECUTED,
-                    CacheQueryType.SQL.name(),
-                    null,
-                    null,
-                    sqlQry,
-                    null,
-                    null,
-                    params,
-                    null,
-                    null));
-            }
-
-            return new ClIter<Cache.Entry<K,V>>() {
-                @Override public void close() throws Exception {
-                    i.close();
-                }
-
-                @Override public boolean hasNext() {
-                    return i.hasNext();
-                }
-
-                @Override public Cache.Entry<K,V> next() {
-                    IgniteBiTuple<K,V> t = i.next();
-
-                    return new CacheEntryImpl<>(t.getKey(), t.getValue());
-                }
-
-                @Override public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
+            return executeQuery(
+                cctx,
+                new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() {
+                    @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
+                        String space = cctx.name();
+                        String type = qry.getType();
+                        String sqlQry = qry.getSql();
+                        Object[] params = qry.getArgs();
+
+                        TypeDescriptor typeDesc = typesByName.get(
+                            new TypeName(
+                                space,
+                                type));
+
+                        if (typeDesc == null || !typeDesc.registered())
+                            throw new CacheException("Failed to find SQL table for type: " + type);
+
+                        final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.query(
+                            space,
+                            sqlQry,
+                            F.asList(params),
+                            typeDesc,
+                            idx.backupFilter());
+
+                        sendQueryExecutedEvent(
+                            sqlQry,
+                            params);
+
+                        return new ClIter<Cache.Entry<K, V>>() {
+                            @Override public void close() throws Exception {
+                                i.close();
+                            }
+
+                            @Override public boolean hasNext() {
+                                return i.hasNext();
+                            }
+
+                            @Override public Cache.Entry<K, V> next() {
+                                IgniteBiTuple<K, V> t = i.next();
+
+                                return new CacheEntryImpl<>(
+                                    t.getKey(),
+                                    t.getValue());
+                            }
+
+                            @Override public void remove() {
+                                throw new UnsupportedOperationException();
+                            }
+                        };
+                    }
+                });
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -652,6 +664,28 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param sqlQry Sql query.
+     * @param params Params.
+     */
+    private void sendQueryExecutedEvent(String sqlQry, Object[] params) {
+        if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+            ctx.event().record(new CacheQueryExecutedEvent<>(
+                ctx.discovery().localNode(),
+                "SQL query executed.",
+                EVT_CACHE_QUERY_EXECUTED,
+                CacheQueryType.SQL.name(),
+                null,
+                null,
+                sqlQry,
+                null,
+                null,
+                params,
+                null,
+                null));
+        }
+    }
+
+    /**
      * @return Message factory for {@link GridIoManager}.
      */
     public MessageFactory messageFactory() {
@@ -670,39 +704,29 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param qry Query.
      * @return Iterator.
      */
-    public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, SqlFieldsQuery qry) {
+    public QueryCursor<List<?>> queryLocalFields(final GridCacheContext<?,?> cctx, final SqlFieldsQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            String space = cctx.name();
-            String sql = qry.getSql();
-            Object[] args = qry.getArgs();
-
-            GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
-
-            if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
-                ctx.event().record(new CacheQueryExecutedEvent<>(
-                        ctx.discovery().localNode(),
-                        "SQL query executed.",
-                        EVT_CACHE_QUERY_EXECUTED,
-                        CacheQueryType.SQL.name(),
-                        null,
-                        null,
-                        sql,
-                        null,
-                        null,
-                        args,
-                        null,
-                        null));
-            }
+            return executeQuery(cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
+                @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
+                    String space = cctx.name();
+                    String sql = qry.getSql();
+                    Object[] args = qry.getArgs();
 
-            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
-                new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+                    GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
 
-            cursor.fieldsMeta(res.metaData());
+                    sendQueryExecutedEvent(sql, args);
 
-            return cursor;
+                    QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
+                        new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepPortable()));
+
+                    cursor.fieldsMeta(res.metaData());
+
+                    return cursor;
+                }
+            });
         }
         catch (IgniteCheckedException e) {
             throw new CacheException(e);
@@ -793,7 +817,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             if (type == null || !type.registered())
                 throw new CacheException("Failed to find SQL table for type: " + resType);
 
-            return idx.queryText(space, clause, type, filters);
+            return idx.queryText(
+                space,
+                clause,
+                type,
+                filters);
         }
         finally {
             busyLock.leaveBusy();
@@ -808,7 +836,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Field rows.
      * @throws IgniteCheckedException If failed.
      */
-    public <K, V> GridQueryFieldsResult queryFields(@Nullable String space, String clause, Collection<Object> params,
+    public GridQueryFieldsResult queryFields(@Nullable String space, String clause, Collection<Object> params,
         IndexingQueryFilter filters) throws IgniteCheckedException {
         checkEnabled();
 
@@ -837,7 +865,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         if (ctx.indexing().enabled()) {
             CacheObjectContext coctx = cacheObjectContext(spaceName);
 
-            ctx.indexing().onSwap(spaceName, key.value(coctx, false));
+            ctx.indexing().onSwap(
+                spaceName,
+                key.value(
+                    coctx,
+                    false));
         }
 
         if (idx == null)
@@ -847,7 +879,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to process swap event (grid is stopping).");
 
         try {
-            idx.onSwap(spaceName, key);
+            idx.onSwap(
+                spaceName,
+                key);
         }
         finally {
             busyLock.leaveBusy();
@@ -1067,7 +1101,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         assert valCls != null;
 
         for (Map.Entry<String, Class<?>> entry : meta.getAscendingFields().entrySet()) {
-            ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue());
+            ClassProperty prop = buildClassProperty(
+                keyCls,
+                valCls,
+                entry.getKey(),
+                entry.getValue());
 
             d.addProperty(prop, false);
 
@@ -1079,7 +1117,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         for (Map.Entry<String, Class<?>> entry : meta.getDescendingFields().entrySet()) {
-            ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue());
+            ClassProperty prop = buildClassProperty(
+                keyCls,
+                valCls,
+                entry.getKey(),
+                entry.getValue());
 
             d.addProperty(prop, false);
 
@@ -1091,7 +1133,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         for (String txtIdx : meta.getTextFields()) {
-            ClassProperty prop = buildClassProperty(keyCls, valCls, txtIdx, String.class);
+            ClassProperty prop = buildClassProperty(
+                keyCls,
+                valCls,
+                txtIdx,
+                String.class);
 
             d.addProperty(prop, false);
 
@@ -1109,7 +1155,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 int order = 0;
 
                 for (Map.Entry<String, IgniteBiTuple<Class<?>, Boolean>> idxField : idxFields.entrySet()) {
-                    ClassProperty prop = buildClassProperty(keyCls, valCls, idxField.getKey(), idxField.getValue().get1());
+                    ClassProperty prop = buildClassProperty(
+                        keyCls,
+                        valCls,
+                        idxField.getKey(),
+                        idxField.getValue().get1());
 
                     d.addProperty(prop, false);
 
@@ -1123,7 +1173,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         }
 
         for (Map.Entry<String, Class<?>> entry : meta.getQueryFields().entrySet()) {
-            ClassProperty prop = buildClassProperty(keyCls, valCls, entry.getKey(), entry.getValue());
+            ClassProperty prop = buildClassProperty(
+                keyCls,
+                valCls,
+                entry.getKey(),
+                entry.getValue());
 
             d.addProperty(prop, false);
         }
@@ -1231,7 +1285,11 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      */
     private static ClassProperty buildClassProperty(Class<?> keyCls, Class<?> valCls, String pathStr, Class<?> resType)
         throws IgniteCheckedException {
-        ClassProperty res = buildClassProperty(true, keyCls, pathStr, resType);
+        ClassProperty res = buildClassProperty(
+            true,
+            keyCls,
+            pathStr,
+            resType);
 
         if (res == null) // We check key before value consistently with PortableProperty.
             res = buildClassProperty(false, valCls, pathStr, resType);
@@ -1330,6 +1388,59 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param cctx Cache context.
+     * @param clo Closure.
+     */
+    private <R> R executeQuery(GridCacheContext<?,?> cctx, IgniteOutClosureX<R> clo)
+        throws IgniteCheckedException {
+        final long start = U.currentTimeMillis();
+
+        Throwable err = null;
+
+        R res = null;
+
+        try {
+            res = clo.apply();
+
+            return res;
+        }
+        catch (GridClosureException e) {
+            err = e.unwrap();
+
+            throw (IgniteCheckedException)err;
+        }
+        finally {
+            GridCacheQueryMetricsAdapter metrics = (GridCacheQueryMetricsAdapter)cctx.queries().metrics();
+
+            onExecuted(cctx, metrics, res, err, start, U.currentTimeMillis() - start, log);
+        }
+    }
+
+    /**
+     * @param cctx Cctx.
+     * @param metrics Metrics.
+     * @param res Result.
+     * @param err Err.
+     * @param startTime Start time.
+     * @param duration Duration.
+     * @param log Logger.
+     */
+    public static void onExecuted(GridCacheContext<?, ?> cctx, GridCacheQueryMetricsAdapter metrics,
+        Object res, Throwable err, long startTime, long duration, IgniteLogger log) {
+        boolean fail = err != null;
+
+        // Update own metrics.
+        metrics.onQueryExecute(duration, fail);
+
+        // Update metrics in query manager.
+        cctx.queries().onMetricsUpdate(duration, fail);
+
+        if (log.isTraceEnabled())
+            log.trace("Query execution finished [startTime=" + startTime +
+                    ", duration=" + duration + ", fail=" + (err != null) + ", res=" + res + ']');
+    }
+
+    /**
      *
      */
     private abstract static class Property {
@@ -1538,9 +1649,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      */
     private static class TypeDescriptor implements GridQueryTypeDescriptor {
         /** */
-        private CacheConfiguration<?,?> ccfg;
-
-        /** */
         private String name;
 
         /** Value field names and types with preserved order. */
@@ -1571,13 +1679,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         private boolean registered;
 
         /**
-         * @param ccfg Cache configuration.
-         */
-        private TypeDescriptor(CacheConfiguration<?,?> ccfg) {
-            this.ccfg = ccfg;
-        }
-
-        /**
          * @return {@code True} if type registration in SPI was finished and type was not rejected.
          */
         boolean registered() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 22d1ff0..64eb1c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
@@ -69,7 +70,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>();
 
     /** Deployment executor service. */
-    private final ExecutorService depExe = Executors.newSingleThreadExecutor();
+    private final ExecutorService depExe;
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -97,6 +98,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
      */
     public GridServiceProcessor(GridKernalContext ctx) {
         super(ctx);
+
+        depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.gridName(), "srvc-deploy"));
     }
 
     /** {@inheritDoc} */
@@ -128,10 +131,10 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 ctx.cache().context().deploy().ignoreOwnership(true);
 
             cfgQryId = cache.context().continuousQueries().executeInternalQuery(
-                new DeploymentListener(), null, true, true);
+                new DeploymentListener(), null, cache.context().affinityNode(), true);
 
             assignQryId = cache.context().continuousQueries().executeInternalQuery(
-                new AssignmentListener(), null, true, true);
+                new AssignmentListener(), null, cache.context().affinityNode(), true);
         }
         finally {
             if (ctx.deploy().enabled())
@@ -345,7 +348,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                 "different configuration) [deployed=" + dep.configuration() + ", new=" + cfg + ']'));
                         }
                         else {
-                            for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+                            Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
+                                ServiceAssignmentsPredicate.INSTANCE);
+
+                            while (it.hasNext()) {
+                                Cache.Entry<Object, Object> e = it.next();
+
                                 if (e.getKey() instanceof GridServiceAssignmentsKey) {
                                     GridServiceAssignments assigns = (GridServiceAssignments)e.getValue();
 
@@ -437,7 +445,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     public IgniteInternalFuture<?> cancelAll() {
         Collection<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
-        for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+        Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
+
+        while (it.hasNext()) {
+            Cache.Entry<Object, Object> e = it.next();
+
             if (!(e.getKey() instanceof GridServiceDeploymentKey))
                 continue;
 
@@ -456,7 +468,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     public Collection<ServiceDescriptor> serviceDescriptors() {
         Collection<ServiceDescriptor> descs = new ArrayList<>();
 
-        for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+        Iterator<Cache.Entry<Object, Object>> it = serviceEntries(ServiceDeploymentPredicate.INSTANCE);
+
+        while (it.hasNext()) {
+            Cache.Entry<Object, Object> e = it.next();
+
             if (!(e.getKey() instanceof GridServiceDeploymentKey))
                 continue;
 
@@ -904,6 +920,43 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param p Entry predicate used to execute query from client node.
+     * @return Service deployment entries.
+     */
+    @SuppressWarnings("unchecked")
+    private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) {
+        if (!cache.context().affinityNode()) {
+            ClusterNode oldestSrvNode =
+                CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE);
+
+            if (oldestSrvNode == null)
+                return F.emptyIterator();
+
+            GridCacheQueryManager qryMgr = cache.context().queries();
+
+            CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, false);
+
+            qry.keepAll(false);
+
+            qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
+
+            return cache.context().itHolder().iterator(qry.execute(),
+                new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() {
+                    @Override protected Object convert(Map.Entry<Object, Object> e) {
+                        return new CacheEntryImpl<>(e.getKey(), e.getValue());
+                    }
+
+                    @Override protected void remove(Object item) {
+                        throw new UnsupportedOperationException();
+                    }
+                }
+            );
+        }
+        else
+            return cache.entrySetx().iterator();
+    }
+
+    /**
      * Service deployment listener.
      */
     private class DeploymentListener implements CacheEntryUpdatedListener<Object, Object> {
@@ -1045,18 +1098,24 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             try {
                 depExe.submit(new BusyRunnable() {
                     @Override public void run0() {
-                        long topVer = ((DiscoveryEvent)evt).topologyVersion();
+                        AffinityTopologyVersion topVer =
+                            new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion());
 
-                        ClusterNode oldest = U.oldest(ctx.discovery().nodes(topVer), null);
+                        ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
 
-                        if (oldest.isLocal()) {
+                        if (oldest != null && oldest.isLocal()) {
                             final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
 
                             if (ctx.deploy().enabled())
                                 ctx.cache().context().deploy().ignoreOwnership(true);
 
                             try {
-                                for (Cache.Entry<Object, Object> e : cache.entrySetx()) {
+                                Iterator<Cache.Entry<Object, Object>> it = serviceEntries(
+                                    ServiceDeploymentPredicate.INSTANCE);
+
+                                while (it.hasNext()) {
+                                    Cache.Entry<Object, Object> e = it.next();
+
                                     if (!(e.getKey() instanceof GridServiceDeploymentKey))
                                         continue;
 
@@ -1068,7 +1127,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                         ctx.cache().internalCache(UTILITY_CACHE_NAME).context().affinity().
                                             affinityReadyFuture(topVer).get();
 
-                                        reassign(dep, topVer);
+                                        reassign(dep, topVer.topologyVersion());
                                     }
                                     catch (IgniteCheckedException ex) {
                                         if (!(e instanceof ClusterTopologyCheckedException))
@@ -1085,7 +1144,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                             }
 
                             if (!retries.isEmpty())
-                                onReassignmentFailed(topVer, retries);
+                                onReassignmentFailed(topVer.topologyVersion(), retries);
                         }
 
                         // Clean up zombie assignments.
@@ -1265,4 +1324,46 @@ public class GridServiceProcessor extends GridProcessorAdapter {
          */
         public abstract void run0();
     }
+
+    /**
+     *
+     */
+    static class ServiceDeploymentPredicate implements IgniteBiPredicate<Object, Object> {
+        /** */
+        static final ServiceDeploymentPredicate INSTANCE = new ServiceDeploymentPredicate();
+
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Object key, Object val) {
+            return key instanceof GridServiceDeploymentKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ServiceDeploymentPredicate.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class ServiceAssignmentsPredicate implements IgniteBiPredicate<Object, Object> {
+        /** */
+        static final ServiceAssignmentsPredicate INSTANCE = new ServiceAssignmentsPredicate();
+
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(Object key, Object val) {
+            return key instanceof GridServiceAssignmentsKey;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ServiceAssignmentsPredicate.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
new file mode 100644
index 0000000..a0fd9b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.timeout;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Wrapper for {@link IgniteSpiTimeoutObject}.
+ */
+public class GridSpiTimeoutObject implements GridTimeoutObject {
+    /** */
+    @GridToStringInclude
+    private final IgniteSpiTimeoutObject obj;
+
+    /**
+     * @param obj SPI object.
+     */
+    public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) {
+        this.obj = obj;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        obj.onTimeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid timeoutId() {
+        return obj.id();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return obj.endTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        assert false;
+
+        return super.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        assert false;
+
+        return super.equals(obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final String toString() {
+        return S.toString(GridSpiTimeoutObject.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 81ff72b..e4f370c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,11 +21,14 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -40,10 +43,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() {
             /** {@inheritDoc} */
             @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
-                long time1 = o1.endTime();
-                long time2 = o2.endTime();
+                int res = Long.compare(o1.endTime(), o2.endTime());
 
-                return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId());
+                if (res != 0)
+                    return res;
+
+                return o1.timeoutId().compareTo(o2.timeoutId());
             }
         });
 
@@ -98,6 +103,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Schedule the specified timer task for execution at the specified
+     * time with the specified period, in milliseconds.
+     *
+     * @param task Task to execute.
+     * @param delay Delay to first execution in milliseconds.
+     * @param period Period for execution in milliseconds or -1.
+     * @return Cancelable to cancel task.
+     */
+    public CancelableTask schedule(Runnable task, long delay, long period) {
+        assert delay >= 0 : delay;
+        assert period > 0 || period == -1 : period;
+
+        CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
+
+        addTimeoutObject(obj);
+
+        return obj;
+    }
+
+    /**
      * @param timeoutObj Timeout object.
      */
     public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
@@ -173,4 +198,78 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']');
         X.println(">>>   timeoutObjsSize: " + timeoutObjs.size());
     }
+
+    /**
+     *
+     */
+    public class CancelableTask implements GridTimeoutObject, Closeable {
+        /** */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** */
+        private long endTime;
+
+        /** */
+        private final long period;
+
+        /** */
+        private volatile boolean cancel;
+
+        /** */
+        @GridToStringInclude
+        private final Runnable task;
+
+        /**
+         * @param task Task to execute.
+         * @param firstTime First time.
+         * @param period Period.
+         */
+        CancelableTask(Runnable task, long firstTime, long period) {
+            this.task = task;
+            endTime = firstTime;
+            this.period = period;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void onTimeout() {
+            if (cancel)
+                return;
+
+            try {
+                task.run();
+            }
+            finally {
+                if (!cancel && period > 0) {
+                    endTime = U.currentTimeMillis() + period;
+
+                    addTimeoutObject(this);
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            cancel = true;
+
+            synchronized (this) {
+                // Just waiting for task execution end to make sure that task will not be executed anymore.
+                removeTimeoutObject(this);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CancelableTask.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
index 1d1e022..f8ee265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/transactions/IgniteTxRollbackCheckedException.java
@@ -36,6 +36,15 @@ public class IgniteTxRollbackCheckedException extends IgniteCheckedException {
     }
 
     /**
+     * Creates new exception with given nested exception.
+     *
+     * @param cause Nested exception.
+     */
+    public IgniteTxRollbackCheckedException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
      * Creates new rollback exception with given error message and optional nested exception.
      *
      * @param msg Error message.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
index bff26ec..42fe089 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridJavaProcess.java
@@ -128,25 +128,31 @@ public final class GridJavaProcess {
         gjProc.log = log;
         gjProc.procKilledC = procKilledC;
 
-        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
-        String classpath = System.getProperty("java.class.path");
-        String sfcp = System.getProperty("surefire.test.class.path");
-
-        if (sfcp != null)
-            classpath += System.getProperty("path.separator") + sfcp;
-
-        if (cp != null)
-            classpath += System.getProperty("path.separator") + cp;
-
         List<String> procParams = params == null || params.isEmpty() ?
             Collections.<String>emptyList() : Arrays.asList(params.split(" "));
 
         List<String> procCommands = new ArrayList<>();
 
+        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
+
         procCommands.add(javaBin);
         procCommands.addAll(jvmArgs == null ? U.jvmArgs() : jvmArgs);
-        procCommands.add("-cp");
-        procCommands.add(classpath);
+
+        if (!jvmArgs.contains("-cp") && !jvmArgs.contains("-classpath")) {
+            String classpath = System.getProperty("java.class.path");
+
+            String sfcp = System.getProperty("surefire.test.class.path");
+
+            if (sfcp != null)
+                classpath += System.getProperty("path.separator") + sfcp;
+
+            if (cp != null)
+                classpath += System.getProperty("path.separator") + cp;
+
+            procCommands.add("-cp");
+            procCommands.add(classpath);
+        }
+
         procCommands.add(clsName);
         procCommands.addAll(procParams);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index fb9ad29..f8caf22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -241,8 +241,8 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
             lsnr.apply(this);
         }
         catch (IllegalStateException e) {
-            U.warn(null, "Failed to notify listener (is grid stopped?) [fut=" + this +
-                ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']');
+            U.error(null, "Failed to notify listener (is grid stopped?) [fut=" + this +
+                ", lsnr=" + lsnr + ", err=" + e.getMessage() + ']', e);
         }
         catch (RuntimeException | Error e) {
             U.error(null, "Failed to notify listener: " + lsnr, e);



Mime
View raw message