ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [42/50] [abbrv] incubator-ignite git commit: # ignite-901 client reconnect support
Date Thu, 16 Jul 2015 13:43:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dd04bf4..daa9494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -153,21 +153,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
-                for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
-                    StartFuture fut = itr.next();
-
-                    itr.remove();
-
-                    fut.onDone(new IgniteException("Topology segmented"));
-                }
-
-                for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
-                    StopFuture fut = itr.next();
-
-                    itr.remove();
-
-                    fut.onDone(new IgniteException("Topology segmented"));
-                }
+                cancelFutures(new IgniteCheckedException("Topology segmented"));
             }
         }, EVT_NODE_SEGMENTED);
 
@@ -263,6 +249,27 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param e Error.
+     */
+    private void cancelFutures(IgniteCheckedException e) {
+        for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
+            StartFuture fut = itr.next();
+
+            itr.remove();
+
+            fut.onDone(e);
+        }
+
+        for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
+            StopFuture fut = itr.next();
+
+            itr.remove();
+
+            fut.onDone(e);
+        }
+    }
+
+    /**
      * @return {@code true} if lock successful, {@code false} if processor already stopped.
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
@@ -318,27 +325,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
-        if (!nodeId.equals(ctx.localNodeId())) {
+        if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
             DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
 
-            // Collect listeners information (will be sent to
-            // joining node during discovery process).
+            // Collect listeners information (will be sent to joining node during discovery process).
             for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
                 UUID routineId = e.getKey();
                 LocalRoutineInfo info = e.getValue();
 
-                data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
-                    info.hnd, info.bufSize, info.interval));
+                data.addItem(new DiscoveryDataItem(routineId,
+                    info.prjPred,
+                    info.hnd,
+                    info.bufSize,
+                    info.interval,
+                    info.autoUnsubscribe));
             }
 
             return data;
         }
-        else
-            return null;
+
+        return null;
     }
 
     /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable obj) {
+    @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
         DiscoveryData data = (DiscoveryData)obj;
 
         if (!ctx.isDaemon() && data != null) {
@@ -377,6 +387,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * Callback invoked when cache is started.
      *
      * @param ctx Cache context.
+     * @throws IgniteCheckedException If failed.
      */
     public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException {
         for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) {
@@ -491,7 +502,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Register routine locally.
-        locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
+        locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
 
         StartFuture fut = new StartFuture(ctx, routineId);
 
@@ -500,7 +511,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         try {
             ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
         }
-        catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
+        catch (IgniteCheckedException e) { // Marshaller exception may occur if user passes an unmarshallable filter.
             startFuts.remove(routineId);
 
             locInfos.remove(routineId);
@@ -565,7 +576,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             // Unregister handler locally.
             unregisterHandler(routineId, routine.hnd, true);
 
-            ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+            try {
+                ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+            }
+            catch (IgniteCheckedException e) {
+                fut.onDone(e);
+            }
 
             if (ctx.isStopping())
                 fut.onDone();
@@ -580,6 +596,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param obj Notification object.
      * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
      * @param sync If {@code true} then waits for event acknowledgment.
+     * @param msg If {@code true} then the sent data is a message.
      * @throws IgniteCheckedException In case of error.
      */
     public void addNotification(UUID nodeId,
@@ -630,6 +647,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
+
+        for (UUID rmtId : rmtInfos.keySet())
+            unregisterRemote(rmtId);
+
+        rmtInfos.clear();
+
+        clientInfos.clear();
+    }
+
     /**
      * @param nodeId Node ID.
      * @param routineId Routine ID.
@@ -637,6 +666,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      * @param toSnd Notification object to send.
      * @param orderedTopic Topic for ordered notifications.
      *      If {@code null}, non-ordered message will be sent.
+     * @param msg If {@code true} then the sent data is a collection of messages.
      * @throws IgniteCheckedException In case of error.
      */
     private void sendNotification(UUID nodeId,
@@ -703,8 +733,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 assert old == null;
             }
 
-            clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(),
-                data.interval()));
+            clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
+                hnd,
+                data.bufferSize(),
+                data.interval(),
+                data.autoUnsubscribe()));
         }
 
         boolean registered = false;
@@ -1022,14 +1055,22 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         /** Time interval. */
         private final long interval;
 
+        /** Automatic unsubscribe flag. */
+        private boolean autoUnsubscribe;
+
         /**
          * @param prjPred Projection predicate.
          * @param hnd Continuous routine handler.
          * @param bufSize Buffer size.
          * @param interval Interval.
+         * @param autoUnsubscribe Automatic unsubscribe flag.
          */
-        LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize,
-            long interval) {
+        LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred,
+            GridContinuousHandler hnd,
+            int bufSize,
+            long interval,
+            boolean autoUnsubscribe)
+        {
             assert hnd != null;
             assert bufSize > 0;
             assert interval >= 0;
@@ -1038,6 +1079,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             this.hnd = hnd;
             this.bufSize = bufSize;
             this.interval = interval;
+            this.autoUnsubscribe = autoUnsubscribe;
         }
 
         /**
@@ -1046,6 +1088,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         GridContinuousHandler handler() {
             return hnd;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(LocalRoutineInfo.class, this);
+        }
     }
 
     /**
@@ -1053,7 +1100,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      */
     private static class RemoteRoutineInfo {
         /** Master node ID. */
-        private final UUID nodeId;
+        private UUID nodeId;
 
         /** Continuous routine handler. */
         private final GridContinuousHandler hnd;
@@ -1205,6 +1252,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
             return F.t(toSnd, diff < interval ? interval - diff : interval);
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(RemoteRoutineInfo.class, this);
+        }
     }
 
     /**
@@ -1221,6 +1273,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @GridToStringInclude
         private Collection<DiscoveryDataItem> items;
 
+        /** Client information. */
         private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
 
         /**
@@ -1232,6 +1285,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         /**
          * @param nodeId Node ID.
+         * @param clientInfos Client information.
          */
         DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
             assert nodeId != null;
@@ -1308,9 +1362,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
          * @param hnd Handler.
          * @param bufSize Buffer size.
          * @param interval Time interval.
+         * @param autoUnsubscribe Automatic unsubscribe flag.
          */
-        DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate<ClusterNode> prjPred,
-            GridContinuousHandler hnd, int bufSize, long interval) {
+        DiscoveryDataItem(UUID routineId,
+            @Nullable IgnitePredicate<ClusterNode> prjPred,
+            GridContinuousHandler hnd,
+            int bufSize,
+            long interval,
+            boolean autoUnsubscribe)
+        {
             assert routineId != null;
             assert hnd != null;
             assert bufSize > 0;
@@ -1321,6 +1381,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             this.hnd = hnd;
             this.bufSize = bufSize;
             this.interval = interval;
+            this.autoUnsubscribe = autoUnsubscribe;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 54478f8..4f75e0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.stream.*;
 import org.apache.ignite.thread.*;
@@ -63,13 +64,15 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
     public DataStreamProcessor(GridKernalContext ctx) {
         super(ctx);
 
-        ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof DataStreamerRequest;
+        if (!ctx.clientNode()) {
+            ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    assert msg instanceof DataStreamerRequest;
 
-                processRequest(nodeId, (DataStreamerRequest)msg);
-            }
-        });
+                    processRequest(nodeId, (DataStreamerRequest)msg);
+                }
+            });
+        }
 
         marsh = ctx.config().getMarshaller();
     }
@@ -113,7 +116,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
-        ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+        if (!ctx.clientNode())
+            ctx.io().removeMessageListener(TOPIC_DATASTREAM);
 
         busyLock.block();
 
@@ -139,6 +143,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             log.debug("Stopped data streamer processor.");
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        for (DataStreamerImpl<?, ?> ldr : ldrs)
+            ldr.onDisconnected(reconnectFut);
+    }
+
     /**
      * @param cacheName Cache name ({@code null} for default cache).
      * @return Data loader.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 26b0568..605f478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -145,6 +145,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
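+    /** Error set on client disconnect and thrown by {@code enterBusy()} when the busy lock cannot be entered. */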
+    private CacheException disconnectErr;
+
     /** Closed flag. */
     private final AtomicBoolean closed = new AtomicBoolean();
 
@@ -245,7 +248,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         fut = new DataStreamerFuture(this);
 
-        publicFut = new IgniteFutureImpl<>(fut);
+        publicFut = new IgniteCacheFutureImpl<>(fut);
     }
 
     /**
@@ -284,8 +287,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
      * Enters busy lock.
      */
     private void enterBusy() {
-        if (!busyLock.enterBusy())
+        if (!busyLock.enterBusy()) {
+            if (disconnectErr != null)
+                throw disconnectErr;
+
             throw new IllegalStateException("Data streamer has been closed.");
+        }
     }
 
     /**
@@ -435,7 +442,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             load0(entries0, resFut, keys, 0);
 
-            return new IgniteFutureImpl<>(resFut);
+            return new IgniteCacheFutureImpl<>(resFut);
         }
         catch (IgniteException e) {
             return new IgniteFinishedFutureImpl<>(e);
@@ -487,7 +494,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
             load0(entries, resFut, keys, 0);
 
-            return new IgniteFutureImpl<>(resFut);
+            return new IgniteCacheFutureImpl<>(resFut);
         }
         catch (Throwable e) {
             resFut.onDone(e);
@@ -631,6 +638,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                             resFut.onDone();
                         }
                     }
+                    catch (IgniteClientDisconnectedCheckedException e1) {
+                        if (log.isDebugEnabled())
+                            log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                        resFut.onDone(e1);
+                    }
                     catch (IgniteCheckedException e1) {
                         if (log.isDebugEnabled())
                             log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
@@ -757,6 +770,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     try {
                         fut.get();
                     }
+                    catch (IgniteClientDisconnectedCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
+
+                        throw CU.convertToCacheException(e);
+                    }
                     catch (IgniteCheckedException e) {
                         if (log.isDebugEnabled())
                             log.debug("Failed to flush buffer: " + e);
@@ -802,7 +821,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             doFlush();
         }
         catch (IgniteCheckedException e) {
-            throw GridCacheUtils.convertToCacheException(e);
+            throw CU.convertToCacheException(e);
         }
         finally {
             leaveBusy();
@@ -843,7 +862,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             closeEx(cancel);
         }
         catch (IgniteCheckedException e) {
-            throw GridCacheUtils.convertToCacheException(e);
+            throw CU.convertToCacheException(e);
         }
     }
 
@@ -852,6 +871,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
      * @throws IgniteCheckedException If failed.
      */
     public void closeEx(boolean cancel) throws IgniteCheckedException {
+        closeEx(cancel, null);
+    }
+
+    /**
+     * @param cancel {@code True} to close with cancellation.
+     * @param err Error.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
         if (!closed.compareAndSet(false, true))
             return;
 
@@ -868,7 +896,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 cancelled = true;
 
                 for (Buffer buf : bufMappings.values())
-                    buf.cancelAll();
+                    buf.cancelAll(err);
             }
             else
                 doFlush();
@@ -881,13 +909,29 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
             e = e0;
         }
 
-        fut.onDone(null, e);
+        fut.onDone(null, e != null ? e : err);
 
         if (e != null)
             throw e;
     }
 
     /**
+     * @param reconnectFut Reconnect future.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+            "Data streamer has been closed, client node disconnected.");
+
+        disconnectErr = (CacheException)CU.convertToCacheException(err);
+
+        for (Buffer buf : bufMappings.values())
+            buf.cancelAll(err);
+
+        closeEx(true, err);
+    }
+
+    /**
      * @return {@code true} If the loader is closed.
      */
     boolean isClosed() {
@@ -1027,7 +1071,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                 submit(entries0, topVer, curFut0);
 
                 if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
+                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                        DataStreamerImpl.this));
+                else if (ctx.clientDisconnected())
+                    curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                        "Client node disconnected."));
             }
 
             return curFut0;
@@ -1227,11 +1275,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
                 }
                 catch (IgniteCheckedException e) {
-                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
-                        ((GridFutureAdapter<Object>)fut).onDone(e);
-                    else
-                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
-                            "request (node has left): " + node.id()));
+                    GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut);
+
+                    try {
+                        if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                            fut0.onDone(e);
+                        else
+                            fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): "
+                                + node.id()));
+                    }
+                    catch (IgniteClientDisconnectedCheckedException e0) {
+                        fut0.onDone(e0);
+                    }
                 }
             }
         }
@@ -1304,10 +1359,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         }
 
         /**
-         *
+         * @param err Error.
          */
-        void cancelAll() {
-            IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
+        void cancelAll(@Nullable IgniteCheckedException err) {
+            if (err == null)
+                err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
 
             for (IgniteInternalFuture<?> f : locFuts) {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 5c171e8..57b16f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -185,6 +185,32 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param key Key.
+     * @param obj Object.
+     */
+    void onRemoved(GridCacheInternal key, GridCacheRemovable obj) {
+        dsMap.remove(key, obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+        for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
+            GridCacheRemovable obj = e.getValue();
+
+            if (clusterRestarted) {
+                obj.onRemoved();
+
+                dsMap.remove(e.getKey(), obj);
+            }
+            else
+                obj.needCheckNotRemoved();
+        }
+
+        for (GridCacheContext cctx : ctx.cache().context().cacheContexts())
+            cctx.dataStructures().onReconnected(clusterRestarted);
+    }
+
+    /**
      * Gets a sequence from cache or creates one if it's not cached.
      *
      * @param name Sequence name.
@@ -1001,8 +1027,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                         dsView.put(key, val);
                     }
 
-                    latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(),
-                        val.autoDelete(), key, cntDownLatchView, dsCacheCtx);
+                    latch = new GridCacheCountDownLatchImpl(name, val.initialCount(),
+                        val.autoDelete(),
+                        key,
+                        cntDownLatchView,
+                        dsCacheCtx);
 
                     dsMap.put(key, latch);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index 5e9245d..1d6e735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -57,6 +57,9 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     /** Removed flag.*/
     private volatile boolean rmvd;
 
+    /** Flag set by {@code needCheckNotRemoved()}: the next {@code checkRemoved()} call re-reads the key from cache. */
+    private boolean rmvCheck;
+
     /** Atomic long key. */
     private GridCacheInternalKey key;
 
@@ -336,7 +339,31 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
      */
     private void checkRemoved() throws IllegalStateException {
         if (rmvd)
-            throw new IllegalStateException("Atomic long was removed from cache: " + name);
+            throw removedError();
+
+        if (rmvCheck) {
+            try {
+                rmvd = atomicView.get(key) == null;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+
+            rmvCheck = false;
+
+            if (rmvd) {
+                ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+                throw removedError();
+            }
+        }
+    }
+
+    /**
+     * @return Error.
+     */
+    private IllegalStateException removedError() {
+        return new IllegalStateException("Atomic long was removed from cache: " + name);
     }
 
     /** {@inheritDoc} */
@@ -345,8 +372,8 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
     }
 
     /** {@inheritDoc} */
-    @Override public void onInvalid(@Nullable Exception err) {
-        // No-op.
+    @Override public void needCheckNotRemoved() {
+        rmvCheck = true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 0c4e5e6..f740c4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.concurrent.*;
@@ -56,6 +55,9 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
     /** Status.*/
     private volatile boolean rmvd;
 
+    /** Flag set by {@code needCheckNotRemoved()}: the next {@code checkRemoved()} call re-reads the key from cache. */
+    private boolean rmvCheck;
+
     /** Atomic reference key. */
     private GridCacheInternalKey key;
 
@@ -156,8 +158,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
     }
 
     /** {@inheritDoc} */
-    @Override public void onInvalid(@Nullable Exception err) {
-        // No-op.
+    @Override public void needCheckNotRemoved() {
+        rmvCheck = true;
     }
 
     /** {@inheritDoc} */
@@ -293,7 +295,31 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      */
     private void checkRemoved() throws IllegalStateException {
         if (rmvd)
-            throw new IllegalStateException("Atomic reference was removed from cache: " + name);
+            throw removedError();
+
+        if (rmvCheck) {
+            try {
+                rmvd = atomicView.get(key) == null;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+
+            rmvCheck = false;
+
+            if (rmvd) {
+                ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+                throw removedError();
+            }
+        }
+    }
+
+    /**
+     * @return Error.
+     */
+    private IllegalStateException removedError() {
+        return new IllegalStateException("Atomic reference was removed from cache: " + name);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 2400a7e..31f4f24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -61,6 +61,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     /** Removed flag. */
     private volatile boolean rmvd;
 
+    /** Check removed flag. */
+    private boolean rmvCheck;
+
     /** Sequence key. */
     private GridCacheInternalKey key;
 
@@ -391,7 +394,31 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
      */
     private void checkRemoved() throws IllegalStateException {
         if (rmvd)
-            throw new IllegalStateException("Sequence was removed from cache: " + name);
+            throw removedError();
+
+        if (rmvCheck) {
+            try {
+                rmvd = seqView.get(key) == null;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+
+            rmvCheck = false;
+
+            if (rmvd) {
+                ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+                throw removedError();
+            }
+        }
+    }
+
+    /**
+     * @return Error.
+     */
+    private IllegalStateException removedError() {
+        return new IllegalStateException("Sequence was removed from cache: " + name);
     }
 
     /** {@inheritDoc} */
@@ -400,8 +427,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
     }
 
     /** {@inheritDoc} */
-    @Override public void onInvalid(@Nullable Exception err) {
-        // No-op.
+    @Override public void needCheckNotRemoved() {
+        rmvCheck = true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 76ea7ca..d2dedeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -59,6 +59,9 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
     /** Removed flag.*/
     private volatile boolean rmvd;
 
+    /** Check removed flag. */
+    private boolean rmvCheck;
+
     /** Atomic stamped key. */
     private GridCacheInternalKey key;
 
@@ -206,8 +209,8 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
     }
 
     /** {@inheritDoc} */
-    @Override public void onInvalid(@Nullable Exception err) {
-        // No-op.
+    @Override public void needCheckNotRemoved() {
+        rmvCheck = true;
     }
 
     /** {@inheritDoc} */
@@ -369,7 +372,31 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
      */
     private void checkRemoved() throws IllegalStateException {
         if (rmvd)
-            throw new IllegalStateException("Atomic stamped was removed from cache: " + name);
+            throw removedError();
+
+        if (rmvCheck) {
+            try {
+                rmvd = atomicView.get(key) == null;
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+
+            rmvCheck = false;
+
+            if (rmvd) {
+                ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+                throw removedError();
+            }
+        }
+    }
+
+    /**
+     * @return Error.
+     */
+    private IllegalStateException removedError() {
+        return new IllegalStateException("Atomic stamped was removed from cache: " + name);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 85b6cfd..95b970a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -67,9 +67,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /** Cache context. */
     private GridCacheContext ctx;
 
-    /** Current count. */
-    private int cnt;
-
     /** Initial count. */
     private int initCnt;
 
@@ -96,7 +93,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
      * Constructor.
      *
      * @param name Latch name.
-     * @param cnt Current count.
      * @param initCnt Initial count.
      * @param autoDel Auto delete flag.
      * @param key Latch key.
@@ -104,7 +100,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
      * @param ctx Cache context.
      */
     public GridCacheCountDownLatchImpl(String name,
-        int cnt,
         int initCnt,
         boolean autoDel,
         GridCacheInternalKey key,
@@ -112,14 +107,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
         GridCacheContext ctx)
     {
         assert name != null;
-        assert cnt >= 0;
         assert initCnt >= 0;
         assert key != null;
         assert latchView != null;
         assert ctx != null;
 
         this.name = name;
-        this.cnt = cnt;
         this.initCnt = initCnt;
         this.autoDel = autoDel;
         this.key = key;
@@ -136,7 +129,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
 
     /** {@inheritDoc} */
     @Override public int count() {
-        return cnt;
+        try {
+            return CU.outTx(new GetCountCallable(), ctx);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -207,13 +205,11 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
 
     /** {@inheritDoc} */
     @Override public boolean onRemoved() {
-        assert cnt == 0;
-
         return rmvd = true;
     }
 
     /** {@inheritDoc} */
-    @Override public void onInvalid(@Nullable Exception err) {
+    @Override public void needCheckNotRemoved() {
         // No-op.
     }
 
@@ -231,8 +227,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     @Override public void onUpdate(int cnt) {
         assert cnt >= 0;
 
-        this.cnt = cnt;
-
         while (internalLatch != null && internalLatch.getCount() > cnt)
             internalLatch.countDown();
     }
@@ -253,9 +247,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                                     if (log.isDebugEnabled())
                                         log.debug("Failed to find count down latch with given name: " + name);
 
-                                    assert cnt == 0;
-
-                                    return new CountDownLatch(cnt);
+                                    return new CountDownLatch(0);
                                 }
 
                                 tx.commit();
@@ -337,6 +329,29 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /**
      *
      */
+    private class GetCountCallable implements Callable<Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer call() throws Exception {
+            Integer val;
+
+            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+                GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+                if (latchVal == null)
+                    return 0;
+
+                val = latchVal.get();
+
+                tx.rollback();
+            }
+
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
     private class CountDownCallable implements Callable<Integer> {
         /** Value to count down on (if 0 then latch is counted down to 0). */
         private final int val;
@@ -359,9 +374,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                     if (log.isDebugEnabled())
                         log.debug("Failed to find count down latch with given name: " + name);
 
-                    assert cnt == 0;
-
-                    return cnt;
+                    return 0;
                 }
 
                 int retVal;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
index 48d8644..dd4f2cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.processors.datastructures;
 
-import org.jetbrains.annotations.*;
-
 /**
  * Provides callback for marking object as removed.
  */
@@ -31,7 +29,7 @@ public interface GridCacheRemovable {
     public boolean onRemoved();
 
     /**
-     * @param err Error which cause data structure to become invalid.
+     *
      */
-    public void onInvalid(@Nullable Exception err);
+    public void needCheckNotRemoved();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f74fe95..6d920fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -101,6 +101,19 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
         return rmvd;
     }
 
+    /**
+     * @return {@code True} if set header found in cache.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public boolean checkHeader() throws IgniteCheckedException {
+        IgniteInternalCache<GridCacheSetHeaderKey, GridCacheSetHeader> cache0 = ctx.cache();
+
+        GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name));
+
+        return hdr != null && hdr.id().equals(id);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public int size() {
@@ -476,7 +489,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
     /**
      * @return Set ID.
      */
-    IgniteUuid id() {
+    public IgniteUuid id() {
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index ba43da7..90c26f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -57,6 +57,9 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
     /** Busy lock. */
     private GridSpinBusyLock busyLock;
 
+    /** Check removed flag. */
+    private boolean rmvCheck;
+
     /**
      * Required by {@link Externalizable}.
      */
@@ -78,6 +81,13 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
     }
 
     /**
+     * @return Set delegate.
+     */
+    public GridCacheSetImpl delegate() {
+        return delegate;
+    }
+
+    /**
      * Remove callback.
      */
     public void blockOnRemove() {
@@ -510,8 +520,43 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
      * Enters busy state.
      */
     private void enterBusy() {
+        boolean rmvd;
+
+        if (rmvCheck) {
+            try {
+                rmvd = !delegate().checkHeader();
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+
+            rmvCheck = false;
+
+            if (rmvd) {
+                delegate.removed(true);
+
+                cctx.dataStructures().onRemoved(this);
+
+                throw removedError();
+            }
+        }
+
         if (!busyLock.enterBusy())
-            throw new IllegalStateException("Set has been removed from cache: " + delegate);
+            throw removedError();
+    }
+
+    /**
+     *
+     */
+    public void needCheckNotRemoved() {
+        rmvCheck = true;
+    }
+
+    /**
+     * @return Error.
+     */
+    private IllegalStateException removedError() {
+        return new IllegalStateException("Set has been removed from cache: " + delegate);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 48e9686..350068a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1413,7 +1413,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
      * @return {@code true} if node is dead, {@code false} is node is alive.
      */
     private boolean isDeadNode(UUID uid) {
-        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index d1ee5ad..3a309f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -863,7 +863,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
      * @return {@code true} if node is dead, {@code false} is node is alive.
      */
     private boolean isDeadNode(UUID uid) {
-        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+        return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0cbb77a..8639bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -246,4 +246,11 @@ public interface GridQueryIndexing {
      * @return Backup filter.
      */
     public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts);
+
+    /**
+     * Client disconnected callback.
+     *
+     * @param reconnectFut Reconnect future.
+     */
+    public void onDisconnected(IgniteFuture<?> reconnectFut);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1ba1fae..f3ad4b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -234,6 +234,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             idx.stop();
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        if (idx != null)
+            idx.onDisconnected(reconnectFut);
+    }
+
     /**
      * @param cctx Cache context.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index bb451c7..78b09e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -124,7 +124,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         cache = ctx.cache().utilityCache();
 
-        ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
+        if (!ctx.clientNode())
+            ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
 
         try {
             if (ctx.deploy().enabled())
@@ -165,7 +166,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         busyLock.block();
 
-        ctx.event().removeLocalEventListener(topLsnr);
+        if (!ctx.clientNode())
+            ctx.event().removeLocalEventListener(topLsnr);
 
         if (cfgQryId != null)
             cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
@@ -209,6 +211,27 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             log.debug("Stopped service processor.");
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        for (Map.Entry<String, GridServiceDeploymentFuture> e : depFuts.entrySet()) {
+            GridServiceDeploymentFuture fut = e.getValue();
+
+            fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to deploy service, client node disconnected."));
+
+            depFuts.remove(e.getKey(), fut);
+        }
+
+        for (Map.Entry<String, GridFutureAdapter<?>> e : undepFuts.entrySet()) {
+            GridFutureAdapter fut = e.getValue();
+
+            fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to undeploy service, client node disconnected."));
+
+            undepFuts.remove(e.getKey(), fut);
+        }
+    }
+
     /**
      * Validates service configuration.
      *
@@ -328,6 +351,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             return old;
         }
 
+        if (ctx.clientDisconnected()) {
+            fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+                "Failed to deploy service, client node disconnected."));
+
+            depFuts.remove(cfg.getName(), fut);
+        }
+
         while (true) {
             try {
                 GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName());
@@ -646,10 +676,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                     }
                 }
                 else {
-                    Collection<ClusterNode> nodes =
-                        assigns.nodeFilter() == null ?
-                            ctx.discovery().nodes(topVer) :
-                            F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter());
+                    Collection<ClusterNode> nodes = assigns.nodeFilter() == null ?
+                        ctx.discovery().nodes(topVer) :
+                        F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter());
 
                     if (!nodes.isEmpty()) {
                         int size = nodes.size();
@@ -1019,7 +1048,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                     cache.getAndRemove(key);
                                 }
                                 catch (IgniteCheckedException ex) {
-                                    log.error("Failed to remove assignments for undeployed service: " + name, ex);
+                                    U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
                                 }
                             }
                         }
@@ -1164,7 +1193,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                                 }
                             }
                             catch (IgniteCheckedException ex) {
-                                log.error("Failed to clean up zombie assignments for service: " + name, ex);
+                                U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 8e13bc4..556beea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -68,9 +68,15 @@ class GridServiceProxy<T> implements Serializable {
      * @param name Service name.
      * @param svc Service type class.
      * @param sticky Whether multi-node request should be done.
+     * @param ctx Context.
      */
-    @SuppressWarnings("unchecked") GridServiceProxy(ClusterGroup prj, String name, Class<? super T> svc,
-        boolean sticky, GridKernalContext ctx) {
+    @SuppressWarnings("unchecked")
+    GridServiceProxy(ClusterGroup prj,
+        String name,
+        Class<? super T> svc,
+        boolean sticky,
+        GridKernalContext ctx)
+    {
         this.prj = prj;
         this.ctx = ctx;
         hasLocNode = hasLocalNode(prj);
@@ -159,6 +165,9 @@ class GridServiceProxy<T> implements Serializable {
                 catch (RuntimeException | Error e) {
                     throw e;
                 }
+                catch (IgniteCheckedException e) {
+                    throw U.convertException(e);
+                }
                 catch (Exception e) {
                     throw new IgniteException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d59a51d..d3caf5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -76,8 +76,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     private final LongAdder8 execTasks = new LongAdder8();
 
     /** */
-    private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx =
-        new ThreadLocal<>();
+    private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = new ThreadLocal<>();
 
     /** */
     private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
@@ -119,6 +118,24 @@ public class GridTaskProcessor extends GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+        IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut);
+
+        for (GridTaskWorker<?, ?> worker : tasks.values())
+            worker.finishTask(null, err);
+    }
+
+    /**
+     * @param reconnectFut Reconnect future.
+     * @return Client disconnected exception.
+     */
+    private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) {
+        return new IgniteClientDisconnectedCheckedException(
+            reconnectFut != null ? reconnectFut : ctx.cluster().clientReconnectFuture(),
+            "Failed to execute task, client node disconnected.");
+    }
+
+    /** {@inheritDoc} */
     @SuppressWarnings("TooBroadScope")
     @Override public void onKernalStop(boolean cancel) {
         lock.writeLock();
@@ -552,7 +569,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
         // Creates task session with task name and task version.
         GridTaskSessionImpl ses = ctx.session().createTaskSession(
             sesId,
-            ctx.config().getNodeId(),
+            ctx.localNodeId(),
             taskName,
             dep,
             taskCls == null ? null : taskCls.getName(),
@@ -597,25 +614,29 @@ public class GridTaskProcessor extends GridProcessorAdapter {
 
                 assert taskWorker0 == null : "Session ID is not unique: " + sesId;
 
-                if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
-                    try {
-                        // Start task execution in another thread.
-                        if (sys)
-                            ctx.getSystemExecutorService().execute(taskWorker);
-                        else
-                            ctx.getExecutorService().execute(taskWorker);
-                    }
-                    catch (RejectedExecutionException e) {
-                        tasks.remove(sesId);
+                if (!ctx.clientDisconnected()) {
+                    if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
+                        try {
+                            // Start task execution in another thread.
+                            if (sys)
+                                ctx.getSystemExecutorService().execute(taskWorker);
+                            else
+                                ctx.getExecutorService().execute(taskWorker);
+                        }
+                        catch (RejectedExecutionException e) {
+                            tasks.remove(sesId);
 
-                        release(dep);
+                            release(dep);
 
-                        handleException(new ComputeExecutionRejectedException("Failed to execute task " +
-                            "due to thread pool execution rejection: " + taskName, e), fut);
+                            handleException(new ComputeExecutionRejectedException("Failed to execute task " +
+                                "due to thread pool execution rejection: " + taskName, e), fut);
+                        }
                     }
+                    else
+                        taskWorker.run();
                 }
                 else
-                    taskWorker.run();
+                    taskWorker.finishTask(null, disconnectedError(null));
             }
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index eb5fa77..133a31f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1070,10 +1070,17 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                             PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
-                    if (!isDeadNode(nodeId))
-                        U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
-                            nodeId + ", taskName=" + ses.getTaskName() +
-                            ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+                    try {
+                        if (!isDeadNode(nodeId))
+                            U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
+                                nodeId + ", taskName=" + ses.getTaskName() +
+                                ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+                    }
+                    catch (IgniteClientDisconnectedCheckedException e0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send cancel request to node, client disconnected [nodeId=" +
+                                nodeId + ", taskName=" + ses.getTaskName() + ']');
+                    }
                 }
             }
         }
@@ -1169,24 +1176,39 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             }
         }
         catch (IgniteCheckedException e) {
-            boolean deadNode = isDeadNode(res.getNode().id());
+            IgniteException fakeErr = null;
 
-            // Avoid stack trace if node has left grid.
-            if (deadNode)
-                U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
-                    "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
-                    ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
-            else
-                U.error(log, "Failed to send job request: " + req, e);
+            try {
+                boolean deadNode = isDeadNode(res.getNode().id());
+
+                // Avoid stack trace if node has left grid.
+                if (deadNode) {
+                    U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
+                        "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
+                        ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
+
+                    fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
+                }
+                else
+                    U.error(log, "Failed to send job request: " + req, e);
+
+            }
+            catch (IgniteClientDisconnectedCheckedException e0) {
+                if (log.isDebugEnabled())
+                    log.debug("Failed to send job request, client disconnected [node=" + node +
+                        ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
+                        res.getJobContext().getJobId() + ']');
+
+                fakeErr = U.convertException(e0);
+            }
 
             GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
                 res.getJobContext().getJobId(), null, null, null, null, null, null, false);
 
-            if (deadNode)
-                fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " +
-                    node, e));
-            else
-                fakeRes.setFakeException(U.convertException(e));
+            if (fakeErr == null)
+                fakeErr = U.convertException(e);
+
+            fakeRes.setFakeException(fakeErr);
 
             onResponse(fakeRes);
         }
@@ -1345,8 +1367,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
      *
      * @param uid UID of node to check.
      * @return {@code true} if node is dead, {@code false} is node is alive.
+     * @throws IgniteClientDisconnectedCheckedException if ping failed when client disconnected.
      */
-    private boolean isDeadNode(UUID uid) {
+    private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException {
         return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f457d6c..66eb596 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -626,6 +626,15 @@ public abstract class IgniteUtils {
             }
         });
 
+        m.put(IgniteClientDisconnectedCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
+            @Override public IgniteException apply(IgniteCheckedException e) {
+                return new IgniteClientDisconnectedException(
+                    ((IgniteClientDisconnectedCheckedException)e).reconnectFuture(),
+                    e.getMessage(),
+                    e);
+            }
+        });
+
         return m;
     }
 
@@ -673,6 +682,25 @@ public abstract class IgniteUtils {
      * @return Ignite runtime exception.
      */
     public static IgniteException convertException(IgniteCheckedException e) {
+        IgniteClientDisconnectedException e0 = e.getCause(IgniteClientDisconnectedException.class);
+
+        if (e0 != null) {
+            assert e0.reconnectFuture() != null : e0;
+
+            throw e0;
+        }
+
+        IgniteClientDisconnectedCheckedException disconnectedErr =
+            e instanceof IgniteClientDisconnectedCheckedException ?
+            (IgniteClientDisconnectedCheckedException)e
+            : e.getCause(IgniteClientDisconnectedCheckedException.class);
+
+        if (disconnectedErr != null) {
+            assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
+            e = disconnectedErr;
+        }
+
         C1<IgniteCheckedException, IgniteException> converter = exceptionConverters.get(e.getClass());
 
         if (converter != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index c935c4a..a4f7804 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -59,8 +59,9 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
      * @param outSpace Out space.
      * @param parent Parent logger.
      */
-    public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace, IpcSharedMemorySpace outSpace,
-                                         IgniteLogger parent) {
+    public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace,
+        IpcSharedMemorySpace outSpace,
+        IgniteLogger parent) {
         assert inSpace != null;
         assert outSpace != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..f3bcab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1570,6 +1570,7 @@ public class GridFunc {
      * @param <T> Type of the collection.
      * @return Light-weight view on given collection with provided predicate.
      */
+    @SafeVarargs
     public static <T> Collection<T> view(@Nullable final Collection<T> c,
         @Nullable final IgnitePredicate<? super T>... p) {
         if (isEmpty(c) || isAlwaysFalse(p))
@@ -2706,6 +2707,7 @@ public class GridFunc {
      * @param <T> Type of the free variable, i.e. the element the predicate is called on.
      * @return Negated predicate.
      */
+    @SafeVarargs
     public static <T> IgnitePredicate<T> not(@Nullable final IgnitePredicate<? super T>... p) {
         return isAlwaysFalse(p) ? F.<T>alwaysTrue() : isAlwaysTrue(p) ? F.<T>alwaysFalse() : new P1<T>() {
             @Override public boolean apply(T t) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
index 968d88d..0f6ed5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi;
 
+import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
@@ -106,4 +107,18 @@ public interface IgniteSpi {
      * @throws IgniteSpiException Thrown in case of any error during SPI stop.
      */
     public void spiStop() throws IgniteSpiException;
+
+    /**
+     * Callback invoked when the client node is disconnected.
+     *
+     * @param reconnectFut Future that will be completed when the client has reconnected.
+     */
+    public void onClientDisconnected(IgniteFuture<?> reconnectFut);
+
+    /**
+     * Callback invoked when the client node has reconnected.
+     *
+     * @param clusterRestarted {@code True} if all cluster nodes were restarted while the client was disconnected.
+     */
+    public void onClientReconnected(boolean clusterRestarted);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 5e557bd..07b39bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.resources.*;
@@ -58,9 +59,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     /** Ignite instance. */
     protected Ignite ignite;
 
-    /** Local node id. */
-    protected UUID nodeId;
-
     /** Grid instance name. */
     protected String gridName;
 
@@ -73,6 +71,9 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
     /** Discovery listener. */
     private GridLocalEventListener paramsLsnr;
 
+    /** Local node. */
+    private ClusterNode locNode;
+
     /**
      * Creates new adapter and initializes it from the current (this) class.
      * SPI name will be initialized to the simple name of the class
@@ -111,7 +112,19 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
     /** {@inheritDoc} */
     @Override public UUID getLocalNodeId() {
-        return nodeId;
+        return ignite.cluster().localNode().id();
+    }
+
+    /**
+     * @return Local node, taken from the SPI context on first call and cached in {@code locNode} afterwards.
+     */
+    protected ClusterNode getLocalNode() {
+        if (locNode != null)
+            return locNode;
+
+        locNode = getSpiContext().localNode();
+
+        return locNode;
     }
 
     /** {@inheritDoc} */
@@ -194,17 +207,27 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+        // No-op in this adapter; the reconnect future is intentionally ignored here.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClientReconnected(boolean clusterRestarted) {
+        // No-op in this adapter; the cluster-restarted flag is intentionally ignored here.
+    }
+
     /**
      * Inject ignite instance.
+     *
+     * @param ignite Ignite instance; if {@code null}, the grid name is left unchanged.
      */
     @IgniteInstanceResource
     protected void injectResources(Ignite ignite) {
         this.ignite = ignite;
 
-        if (ignite != null) {
-            nodeId = ignite.configuration().getNodeId();
+        if (ignite != null)
             gridName = ignite.name();
-        }
     }
 
     /**


Mime
View raw message