ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/40] incubator-ignite git commit: # ignite-883
Date Tue, 09 Jun 2015 12:29:57 GMT
# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/873e01bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/873e01bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/873e01bb

Branch: refs/heads/ignite-gg-10299
Commit: 873e01bbb61ed5782635fa81e1c9834f261f50fe
Parents: 7a2e898
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jun 2 12:29:00 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Jun 3 16:52:47 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  |  38 +-
 .../internal/managers/GridManagerAdapter.java   |   9 +
 .../checkpoint/GridCheckpointManager.java       |  52 ++-
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../processors/cache/GridCacheTtlManager.java   |   9 +-
 .../cache/transactions/IgniteTxManager.java     |   3 -
 .../datastructures/DataStructuresProcessor.java |  66 +--
 .../timeout/GridSpiTimeoutObject.java           |  59 +++
 .../timeout/GridTimeoutProcessor.java           |  24 +-
 .../util/nio/GridCommunicationClient.java       |  26 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |  23 +-
 .../org/apache/ignite/spi/IgniteSpiContext.java |  10 +
 .../ignite/spi/IgniteSpiTimeoutObject.java      |  40 ++
 .../spi/checkpoint/noop/NoopCheckpointSpi.java  |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 409 ++++++-------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |   3 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   1 -
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 156 +------
 .../IgniteCountDownLatchAbstractSelfTest.java   | 102 +++++
 .../IgniteCacheClientNearCacheExpiryTest.java   | 103 +++++
 .../IgniteCacheExpiryPolicyTestSuite.java       |   2 +
 .../testframework/GridSpiTestContext.java       |  10 +
 22 files changed, 642 insertions(+), 510 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index ed33365..5cbe377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1447,6 +1447,17 @@ public class IgnitionEx {
                 ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
             }
 
+            execSvc = new IgniteThreadPoolExecutor(
+                "pub-" + cfg.getGridName(),
+                cfg.getPublicThreadPoolSize(),
+                cfg.getPublicThreadPoolSize(),
+                DFLT_PUBLIC_KEEP_ALIVE_TIME,
+                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+            if (!myCfg.isClientMode())
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor)execSvc).prestartAllCoreThreads();
+
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
             sysExecSvc = new IgniteThreadPoolExecutor(
@@ -1456,30 +1467,8 @@ public class IgnitionEx {
                 DFLT_SYSTEM_KEEP_ALIVE_TIME,
                 new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
 
-            boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode());
-
-            if (isClientMode) {
-                execSvc = new IgniteThreadPoolExecutor(
-                    "pub-" + cfg.getGridName(),
-                    0,
-                    cfg.getPublicThreadPoolSize(),
-                    2000,
-                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-            }
-            else {
-                execSvc = new IgniteThreadPoolExecutor(
-                    "pub-" + cfg.getGridName(),
-                    cfg.getPublicThreadPoolSize(),
-                    cfg.getPublicThreadPoolSize(),
-                    DFLT_PUBLIC_KEEP_ALIVE_TIME,
-                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
-                // Pre-start all threads as they are guaranteed to be needed.
-                ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
-                // Pre-start all threads as they are guaranteed to be needed.
-                ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
-            }
+            // Pre-start all threads as they are guaranteed to be needed.
+            ((ThreadPoolExecutor)sysExecSvc).prestartAllCoreThreads();
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
@@ -2007,7 +1996,6 @@ public class IgnitionEx {
             ccfg.setWriteSynchronizationMode(FULL_SYNC);
             ccfg.setCacheMode(cfg.getCacheMode());
             ccfg.setNodeFilter(CacheConfiguration.ALL_NODES);
-            ccfg.setNearConfiguration(new NearCacheConfiguration());
 
             if (cfg.getCacheMode() == PARTITIONED)
                 ccfg.setBackups(cfg.getBackups());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index 1eb7143..bea4256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -483,6 +484,14 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return ctx.discovery().tryFailNode(nodeId);
                     }
 
+                    @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+                        ctx.timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+                    }
+
+                    @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+                        ctx.timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+                    }
+
                     /**
                      * @param e Exception to handle.
                      * @return GridSpiException Converted exception.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
index 2e80b6f..ce2a36c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/checkpoint/GridCheckpointManager.java
@@ -56,11 +56,10 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
     private final GridMessageListener lsnr = new CheckpointRequestListener();
 
     /** */
-    private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<IgniteUuid, CheckpointSet> keyMap;
 
     /** */
-    private final Collection<IgniteUuid> closedSess = new GridBoundedConcurrentLinkedHashSet<>(
-        MAX_CLOSED_SESS, MAX_CLOSED_SESS, 0.75f, 256, PER_SEGMENT_Q);
+    private final Collection<IgniteUuid> closedSess;
 
     /** Grid marshaller. */
     private final Marshaller marsh;
@@ -72,6 +71,21 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
         super(ctx, ctx.config().getCheckpointSpi());
 
         marsh = ctx.config().getMarshaller();
+
+        if (enabled()) {
+            keyMap = new ConcurrentHashMap8<>();
+
+            closedSess = new GridBoundedConcurrentLinkedHashSet<>(MAX_CLOSED_SESS,
+                MAX_CLOSED_SESS,
+                0.75f,
+                256,
+                PER_SEGMENT_Q);
+        }
+        else {
+            keyMap = null;
+
+            closedSess = null;
+        }
     }
 
     /** {@inheritDoc} */
@@ -112,7 +126,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Session IDs.
      */
     public Collection<IgniteUuid> sessionIds() {
-        return new ArrayList<>(keyMap.keySet());
+        return enabled() ? new ArrayList<>(keyMap.keySet()) : Collections.<IgniteUuid>emptyList();
     }
 
     /**
@@ -125,8 +139,17 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return {@code true} if checkpoint has been actually saved, {@code false} otherwise.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public boolean storeCheckpoint(GridTaskSessionInternal ses, String key, Object state, ComputeTaskSessionScope scope,
-        long timeout, boolean override) throws IgniteCheckedException {
+    public boolean storeCheckpoint(GridTaskSessionInternal ses,
+        String key,
+        Object state,
+        ComputeTaskSessionScope scope,
+        long timeout,
+        boolean override)
+        throws IgniteCheckedException
+    {
+        if (!enabled())
+            return false;
+
         assert ses != null;
         assert key != null;
 
@@ -239,6 +262,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Whether or not checkpoint was removed.
      */
     public boolean removeCheckpoint(String key) {
+        if (!enabled())
+            return false;
+
         assert key != null;
 
         boolean rmv = false;
@@ -256,6 +282,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @return Whether or not checkpoint was removed.
      */
     public boolean removeCheckpoint(GridTaskSessionInternal ses, String key) {
+        if (!enabled())
+            return false;
+
         assert ses != null;
         assert key != null;
 
@@ -283,6 +312,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @Nullable public Serializable loadCheckpoint(GridTaskSessionInternal ses, String key) throws IgniteCheckedException {
+        if (!enabled())
+            return null;
+
         assert ses != null;
         assert key != null;
 
@@ -309,6 +341,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
      * @param cleanup Whether cleanup or not.
      */
     public void onSessionEnd(GridTaskSessionInternal ses, boolean cleanup) {
+        if (!enabled())
+            return;
+
         closedSess.add(ses.getId());
 
         // If on task node.
@@ -358,7 +393,7 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
     @Override public void printMemoryStats() {
         X.println(">>>");
         X.println(">>> Checkpoint manager memory stats [grid=" + ctx.gridName() + ']');
-        X.println(">>>  keyMap: " + keyMap.size());
+        X.println(">>>  keyMap: " + (keyMap != null ? keyMap.size() : 0));
     }
 
     /**
@@ -407,6 +442,9 @@ public class GridCheckpointManager extends GridManagerAdapter<CheckpointSpi> {
             if (log.isDebugEnabled())
                 log.debug("Received checkpoint request: " + req);
 
+            if (!enabled())
+                return;
+
             IgniteUuid sesId = req.getSessionId();
 
             if (closedSess.contains(sesId)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 9d98ce7..4216895 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -395,6 +395,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        assert !CU.isUtilityCache(ctx.name());
+        assert !CU.isAtomicsCache(ctx.name());
+        assert !CU.isMarshallerCache(ctx.name());
+
         CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index 5f9049a..9bd6321 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl())
+        boolean cleanupDisabled = cctx.kernalContext().isDaemon() ||
+            !cctx.config().isEagerTtl() ||
+            CU.isAtomicsCache(cctx.name()) ||
+            CU.isMarshallerCache(cctx.name()) ||
+            CU.isUtilityCache(cctx.name()) ||
+            (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null);
+
+        if (cleanupDisabled)
             return;
 
         cleanupWorker = new CleanupWorker();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 4666cca..b6c77f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1221,9 +1221,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 collectPendingVersions(dhtTxLoc);
             }
 
-            // 3.1 Call dataStructures manager.
-            cctx.kernalContext().dataStructures().onTxCommitted(tx);
-
             // 4. Unlock write resources.
             unlockMultiple(tx, tx.writeEntries());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 72911af..27f6a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -32,6 +32,7 @@ import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
+import javax.cache.event.*;
 import javax.cache.processor.*;
 import java.io.*;
 import java.util.*;
@@ -40,7 +41,6 @@ import java.util.concurrent.*;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheRebalanceMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
 import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.*;
 import static org.apache.ignite.transactions.TransactionConcurrency.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -99,6 +99,9 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /** */
     private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
 
+    /** */
+    private UUID qryId;
+
     /**
      * @param ctx Context.
      */
@@ -112,7 +115,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public void onKernalStart() {
+    @Override public void onKernalStart() throws IgniteCheckedException {
         if (ctx.config().isDaemon())
             return;
 
@@ -139,10 +142,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
             seqView = atomicsCache;
 
-            dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context();
+            dsCacheCtx = atomicsCache.context();
+
+            qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+                null,
+                dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+                false);
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (qryId != null)
+            dsCacheCtx.continuousQueries().cancelInternalQuery(qryId);
+    }
+
     /**
      * Gets a sequence from cache or creates one if it's not cached.
      *
@@ -906,8 +922,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 dsCacheCtx.gate().enter();
 
                 try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
-                    GridCacheCountDownLatchValue val = cast(dsView.get(key),
-                        GridCacheCountDownLatchValue.class);
+                    GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class);
 
                     // Check that count down hasn't been created in other thread yet.
                     GridCacheCountDownLatchEx latch = cast(dsMap.get(key), GridCacheCountDownLatchEx.class);
@@ -1034,28 +1049,19 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Transaction committed callback for transaction manager.
      *
-     * @param tx Committed transaction.
      */
-    public <K, V> void onTxCommitted(IgniteInternalTx tx) {
-        if (dsCacheCtx == null)
-            return;
-
-        if (!dsCacheCtx.isDht() && tx.internal() && (!dsCacheCtx.isColocated() || dsCacheCtx.isReplicated())) {
-            Collection<IgniteTxEntry> entries = tx.writeEntries();
-
-            if (log.isDebugEnabled())
-                log.debug("Committed entries: " + entries);
-
-            for (IgniteTxEntry entry : entries) {
-                // Check updated or created GridCacheInternalKey keys.
-                if ((entry.op() == CREATE || entry.op() == UPDATE) && entry.key().internal()) {
-                    GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
-
-                    Object val0 = CU.value(entry.value(), entry.context(), false);
+    private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+            throws CacheEntryListenerException {
+            for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
+                if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
+                    GridCacheInternal val0 = evt.getValue();
 
                     if (val0 instanceof GridCacheCountDownLatchValue) {
+                        GridCacheInternalKey key = evt.getKey();
+
                         // Notify latch on changes.
                         GridCacheRemovable latch = dsMap.get(key);
 
@@ -1067,8 +1073,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                             latch0.onUpdate(val.get());
 
                             if (val.get() == 0 && val.autoDelete()) {
-                                entry.cached().markObsolete(dsCacheCtx.versions().next());
-
                                 dsMap.remove(key);
 
                                 latch.onRemoved();
@@ -1080,11 +1084,12 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                                 ", actual=" + latch.getClass() + ", value=" + latch + ']');
                         }
                     }
+
                 }
+                else {
+                    assert evt.getEventType() == EventType.REMOVED : evt;
 
-                // Check deleted GridCacheInternal keys.
-                if (entry.op() == DELETE && entry.key().internal()) {
-                    GridCacheInternal key = entry.key().value(entry.context().cacheObjectContext(), false);
+                    GridCacheInternal key = evt.getKey();
 
                     // Entry's val is null if entry deleted.
                     GridCacheRemovable obj = dsMap.remove(key);
@@ -1094,6 +1099,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
                 }
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructuresEntryListener.class, this);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
new file mode 100644
index 0000000..82267a2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.timeout;
+
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+
+/**
+ * Wrapper for {@link IgniteSpiTimeoutObject}.
+ */
+public class GridSpiTimeoutObject implements GridTimeoutObject {
+    /** */
+    @GridToStringInclude
+    private final IgniteSpiTimeoutObject obj;
+
+    /**
+     * @param obj SPI object.
+     */
+    public GridSpiTimeoutObject(IgniteSpiTimeoutObject obj) {
+        this.obj = obj;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTimeout() {
+        obj.onTimeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid timeoutId() {
+        return obj.id();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long endTime() {
+        return obj.endTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public final String toString() {
+        return S.toString(GridSpiTimeoutObject.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index e9b7717..e4f370c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
@@ -111,8 +112,8 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
      * @return Cancelable to cancel task.
      */
     public CancelableTask schedule(Runnable task, long delay, long period) {
-        assert delay >= 0;
-        assert period > 0 || period == -1;
+        assert delay >= 0 : delay;
+        assert period > 0 || period == -1 : period;
 
         CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
 
@@ -203,7 +204,7 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
      */
     public class CancelableTask implements GridTimeoutObject, Closeable {
         /** */
-        private final IgniteUuid id = new IgniteUuid();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private long endTime;
@@ -215,12 +216,13 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         private volatile boolean cancel;
 
         /** */
+        @GridToStringInclude
         private final Runnable task;
 
         /**
+         * @param task Task to execute.
          * @param firstTime First time.
          * @param period Period.
-         * @param task Task to execute.
          */
         CancelableTask(Runnable task, long firstTime, long period) {
             this.task = task;
@@ -243,19 +245,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
             if (cancel)
                 return;
 
-            long startTime = U.currentTimeMillis();
-
             try {
                 task.run();
             }
             finally {
-                long executionTime = U.currentTimeMillis() - startTime;
-
-                if (executionTime > 10) {
-                    U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
-                        "quickly [executionTime=" + executionTime + ']');
-                }
-
                 if (!cancel && period > 0) {
                     endTime = U.currentTimeMillis() + period;
 
@@ -273,5 +266,10 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
                 removeTimeoutObject(this);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CancelableTask.class, this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 31396fb..2f7fd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -38,58 +38,58 @@ public interface GridCommunicationClient {
      * @param handshakeC Handshake.
      * @throws IgniteCheckedException If handshake failed.
      */
-    void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
+    public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException;
 
     /**
      * @return {@code True} if client has been closed by this call,
      *      {@code false} if failed to close client (due to concurrent reservation or concurrent close).
      */
-    boolean close();
+    public boolean close();
 
     /**
      * Forces client close.
      */
-    void forceClose();
+    public void forceClose();
 
     /**
      * @return {@code True} if client is closed;
      */
-    boolean closed();
+    public boolean closed();
 
     /**
      * @return {@code True} if client was reserved, {@code false} otherwise.
      */
-    boolean reserve();
+    public boolean reserve();
 
     /**
      * Releases this client by decreasing reservations.
      */
-    void release();
+    public void release();
 
     /**
      * @return {@code True} if client was reserved.
      */
-    boolean reserved();
+    public boolean reserved();
 
     /**
      * Gets idle time of this client.
      *
      * @return Idle time of this client.
      */
-    long getIdleTime();
+    public long getIdleTime();
 
     /**
      * @param data Data to send.
      * @throws IgniteCheckedException If failed.
      */
-    void sendMessage(ByteBuffer data) throws IgniteCheckedException;
+    public void sendMessage(ByteBuffer data) throws IgniteCheckedException;
 
     /**
      * @param data Data to send.
      * @param len Length.
      * @throws IgniteCheckedException If failed.
      */
-    void sendMessage(byte[] data, int len) throws IgniteCheckedException;
+    public void sendMessage(byte[] data, int len) throws IgniteCheckedException;
 
     /**
      * @param nodeId Node ID (provided only if versions of local and remote nodes are different).
@@ -97,16 +97,16 @@ public interface GridCommunicationClient {
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if should try to resend message.
      */
-    boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
+    public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
 
     /**
      * @param timeout Timeout.
      * @throws IOException If failed.
      */
-    void flushIfNeeded(long timeout) throws IOException;
+    public void flushIfNeeded(long timeout) throws IOException;
 
     /**
      * @return {@code True} if send is asynchronous.
      */
-    boolean async();
+    public boolean async();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index d3c0587..c9c633f 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -542,10 +543,20 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         return U.spiAttribute(this, attrName);
     }
 
+    /** Registers the given timeout object via the current SPI context. */
+    protected void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+        spiCtx.addTimeoutObject(obj);
+    }
+
+    /** Unregisters the given timeout object via the current SPI context. */
+    protected void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+        spiCtx.removeTimeoutObject(obj);
+    }
+
     /**
      * Temporarily SPI context.
      */
-    private static class GridDummySpiContext implements IgniteSpiContext {
+    private class GridDummySpiContext implements IgniteSpiContext {
         /** */
         private final ClusterNode locNode;
 
@@ -716,5 +727,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
         @Override public boolean tryFailNode(UUID nodeId) {
             return false;
         }
+
+        /** {@inheritDoc} */
+        @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+            ((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+            ((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 55f46e5..f83326c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -310,4 +310,14 @@ public interface IgniteSpiContext {
      * @return If node was failed.
      */
     public boolean tryFailNode(UUID nodeId);
+
+    /**
+     * @param c Timeout object.
+     */
+    public void addTimeoutObject(IgniteSpiTimeoutObject c);
+
+    /**
+     * @param c Timeout object.
+     */
+    public void removeTimeoutObject(IgniteSpiTimeoutObject c);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
new file mode 100644
index 0000000..f7fbd27f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiTimeoutObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.lang.*;
+
+/**
+ * Timeout object (unique ID, end time, timeout callback) that SPIs add via {@link IgniteSpiContext}.
+ */
+public interface IgniteSpiTimeoutObject {
+    /**
+     * @return Unique object ID.
+     */
+    public IgniteUuid id();
+
+    /**
+     * @return End time.
+     */
+    public long endTime();
+
+    /**
+     * Timeout callback.
+     */
+    public void onTimeout();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
index 460cff3..832d872 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/checkpoint/noop/NoopCheckpointSpi.java
@@ -51,8 +51,7 @@ public class NoopCheckpointSpi extends IgniteSpiAdapter implements CheckpointSpi
     }
 
     /** {@inheritDoc} */
-    @Override
-    public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) throws IgniteSpiException {
+    @Override public boolean saveCheckpoint(String key, byte[] state, long timeout, boolean overwrite) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 19e54c8..b324ab2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -267,7 +267,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                                 log.debug("Session was closed but there are unacknowledged messages, " +
                                                     "will try to reconnect [rmtNode=" + recoveryData.node().id() + ']');
 
-                                            recoveryWorker.addReconnectRequest(recoveryData);
+                                            commWorker.addReconnectRequest(recoveryData);
                                         }
                                     }
                                     else
@@ -647,17 +647,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Socket write timeout. */
     private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
 
-    /** Idle client worker. */
-    private IdleClientWorker idleClientWorker;
-
     /** Flush client worker. */
     private ClientFlushWorker clientFlushWorker;
 
-    /** Socket timeout worker. */
-    private SocketTimeoutWorker sockTimeoutWorker;
-
-    /** Recovery worker. */
-    private RecoveryWorker recoveryWorker;
+    /** Recovery and idle clients handler. */
+    private CommunicationWorker commWorker;
 
     /** Clients. */
     private final ConcurrentMap<UUID, GridCommunicationClient> clients = GridConcurrentFactory.newMap();
@@ -1274,13 +1268,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         nioSrvr.start();
 
-        idleClientWorker = new IdleClientWorker();
-
-        idleClientWorker.start();
-
-        recoveryWorker = new RecoveryWorker();
+        commWorker = new CommunicationWorker();
 
-        recoveryWorker.start();
+        commWorker.start();
 
         if (connBufSize > 0) {
             clientFlushWorker = new ClientFlushWorker();
@@ -1288,10 +1278,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             clientFlushWorker.start();
         }
 
-        sockTimeoutWorker = new SocketTimeoutWorker();
-
-        sockTimeoutWorker.start();
-
         // Ack start.
         if (log.isDebugEnabled())
             log.debug(startInfo());
@@ -1445,15 +1431,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(idleClientWorker);
         U.interrupt(clientFlushWorker);
-        U.interrupt(sockTimeoutWorker);
-        U.interrupt(recoveryWorker);
+        U.interrupt(commWorker);
 
-        U.join(idleClientWorker, log);
         U.join(clientFlushWorker, log);
-        U.join(sockTimeoutWorker, log);
-        U.join(recoveryWorker, log);
+        U.join(commWorker, log);
 
         // Force closing on stop (safety).
         for (GridCommunicationClient client : clients.values())
@@ -1461,7 +1443,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         // Clear resources.
         nioSrvr = null;
-        idleClientWorker = null;
+        commWorker = null;
 
         boundTcpPort = -1;
 
@@ -1899,7 +1881,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, U.currentTimeMillis() + timeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         long rcvCnt = 0;
 
@@ -2005,7 +1987,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Ignoring whatever happened after timeout - reporting only timeout event.
             if (!cancelled)
@@ -2041,15 +2023,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (nioSrvr != null)
             nioSrvr.stop();
 
-        U.interrupt(idleClientWorker);
         U.interrupt(clientFlushWorker);
-        U.interrupt(sockTimeoutWorker);
-        U.interrupt(recoveryWorker);
+        U.interrupt(commWorker);
 
-        U.join(idleClientWorker, log);
         U.join(clientFlushWorker, log);
-        U.join(sockTimeoutWorker, log);
-        U.join(recoveryWorker, log);
+        U.join(commWorker, log);
 
         for (GridCommunicationClient client : clients.values())
             client.forceClose();
@@ -2156,119 +2134,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private class IdleClientWorker extends IgniteSpiThread {
-        /**
-         *
-         */
-        IdleClientWorker() {
-            super(gridName, "nio-idle-client-collector", log);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings({"BusyWait"})
-        @Override protected void body() throws InterruptedException {
-            while (!isInterrupted()) {
-                cleanupRecovery();
-
-                for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
-                    UUID nodeId = e.getKey();
-
-                    GridCommunicationClient client = e.getValue();
-
-                    ClusterNode node = getSpiContext().node(nodeId);
-
-                    if (node == null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Forcing close of non-existent node connection: " + nodeId);
-
-                        client.forceClose();
-
-                        clients.remove(nodeId, client);
-
-                        continue;
-                    }
-
-                    GridNioRecoveryDescriptor recovery = null;
-
-                    if (client instanceof GridTcpNioCommunicationClient) {
-                        recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
-
-                        if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
-                            RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
-
-                            if (log.isDebugEnabled())
-                                log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
-                                    ", rcvCnt=" + msg.received() + ']');
-
-                            nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
-
-                            recovery.lastAcknowledged(msg.received());
-
-                            continue;
-                        }
-                    }
-
-                    long idleTime = client.getIdleTime();
-
-                    if (idleTime >= idleConnTimeout) {
-                        if (recovery != null &&
-                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
-                            !recovery.messagesFutures().isEmpty()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Node connection is idle, but there are unacknowledged messages, " +
-                                    "will wait: " + nodeId);
-
-                            continue;
-                        }
-
-                        if (log.isDebugEnabled())
-                            log.debug("Closing idle node connection: " + nodeId);
-
-                        if (client.close() || client.closed())
-                            clients.remove(nodeId, client);
-                    }
-                }
-
-                Thread.sleep(idleConnTimeout);
-            }
-        }
-
-        /**
-         *
-         */
-        private void cleanupRecovery() {
-            Set<ClientKey> left = null;
-
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
-                if (left != null && left.contains(e.getKey()))
-                    continue;
-
-                GridNioRecoveryDescriptor recoverySnd = e.getValue();
-
-                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
-                    if (left == null)
-                        left = new HashSet<>();
-
-                    left.add(e.getKey());
-                }
-            }
-
-            if (left != null) {
-                assert !left.isEmpty();
-
-                for (ClientKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
-
-                    if (recoverySnd != null)
-                        recoverySnd.onNodeLeft();
-                }
-            }
-        }
-    }
-
-    /**
-     *
-     */
     private class ClientFlushWorker extends IgniteSpiThread {
         /**
          *
@@ -2319,157 +2184,164 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
-     * Handles sockets timeouts.
+     * Worker that handles recovery reconnect requests and, when none arrive, idle clients.
      */
-    private class SocketTimeoutWorker extends IgniteSpiThread {
-        /** Time-based sorted set for timeout objects. */
-        private final GridConcurrentSkipListSet<HandshakeTimeoutObject> timeoutObjs =
-            new GridConcurrentSkipListSet<>(new Comparator<HandshakeTimeoutObject>() {
-                @Override public int compare(HandshakeTimeoutObject o1, HandshakeTimeoutObject o2) {
-                    long time1 = o1.endTime();
-                    long time2 = o2.endTime();
-
-                    long id1 = o1.id();
-                    long id2 = o2.id();
-
-                    return time1 < time2 ? -1 : time1 > time2 ? 1 :
-                        id1 < id2 ? -1 : id1 > id2 ? 1 : 0;
-                }
-            });
-
-        /** Mutex. */
-        private final Object mux0 = new Object();
+    private class CommunicationWorker extends IgniteSpiThread {
+        /** */
+        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
 
         /**
          *
          */
-        SocketTimeoutWorker() {
-            super(gridName, "tcp-comm-sock-timeout-worker", log);
+        private CommunicationWorker() {
+            super(gridName, "tcp-comm-worker", log);
         }
 
-        /**
-         * @param timeoutObj Timeout object to add.
-         */
-        @SuppressWarnings({"NakedNotify"})
-        public void addTimeoutObject(HandshakeTimeoutObject timeoutObj) {
-            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            if (log.isDebugEnabled())
+                log.debug("Tcp communication worker has been started.");
 
-            timeoutObjs.add(timeoutObj);
+            while (!isInterrupted()) {
+                GridNioRecoveryDescriptor recoveryDesc = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
 
-            if (timeoutObjs.firstx() == timeoutObj) {
-                synchronized (mux0) {
-                    mux0.notifyAll();
-                }
+                if (recoveryDesc != null)
+                    processRecovery(recoveryDesc);
+                else
+                    processIdle();
             }
         }
 
         /**
-         * @param timeoutObj Timeout object to remove.
+         *
          */
-        public void removeTimeoutObject(HandshakeTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
+        private void processIdle() {
+            cleanupRecovery();
 
-            timeoutObjs.remove(timeoutObj);
-        }
+            for (Map.Entry<UUID, GridCommunicationClient> e : clients.entrySet()) {
+                UUID nodeId = e.getKey();
 
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Socket timeout worker has been started.");
+                GridCommunicationClient client = e.getValue();
 
-            while (!isInterrupted()) {
-                long now = U.currentTimeMillis();
+                ClusterNode node = getSpiContext().node(nodeId);
 
-                for (Iterator<HandshakeTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
-                    HandshakeTimeoutObject timeoutObj = iter.next();
+                if (node == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Forcing close of non-existent node connection: " + nodeId);
+
+                    client.forceClose();
+
+                    clients.remove(nodeId, client);
+
+                    continue;
+                }
 
-                    if (timeoutObj.endTime() <= now) {
-                        iter.remove();
+                GridNioRecoveryDescriptor recovery = null;
 
-                        timeoutObj.onTimeout();
+                if (client instanceof GridTcpNioCommunicationClient) {
+                    recovery = recoveryDescs.get(new ClientKey(node.id(), node.order()));
+
+                    if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
+                        RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
+
+                        if (log.isDebugEnabled())
+                            log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId +
+                                ", rcvCnt=" + msg.received() + ']');
+
+                        nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+
+                        recovery.lastAcknowledged(msg.received());
+
+                        continue;
                     }
-                    else
-                        break;
                 }
 
-                synchronized (mux0) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        HandshakeTimeoutObject first = timeoutObjs.firstx();
+                long idleTime = client.getIdleTime();
 
-                        if (first != null) {
-                            long waitTime = first.endTime() - U.currentTimeMillis();
+                if (idleTime >= idleConnTimeout) {
+                    if (recovery != null &&
+                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                        !recovery.messagesFutures().isEmpty()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Node connection is idle, but there are unacknowledged messages, " +
+                                "will wait: " + nodeId);
 
-                            if (waitTime > 0)
-                                mux0.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux0.wait(5000);
+                        continue;
                     }
+
+                    if (log.isDebugEnabled())
+                        log.debug("Closing idle node connection: " + nodeId);
+
+                    if (client.close() || client.closed())
+                        clients.remove(nodeId, client);
                 }
             }
         }
-    }
-
-    /**
-     *
-     */
-    private class RecoveryWorker extends IgniteSpiThread {
-        /** */
-        private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
 
         /**
          *
          */
-        private RecoveryWorker() {
-            super(gridName, "tcp-comm-recovery-worker", log);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Recovery worker has been started.");
+        private void cleanupRecovery() {
+            Set<ClientKey> left = null;
 
-            while (!isInterrupted()) {
-                GridNioRecoveryDescriptor recoveryDesc = q.take();
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : recoveryDescs.entrySet()) {
+                if (left != null && left.contains(e.getKey()))
+                    continue;
 
-                assert recoveryDesc != null;
+                GridNioRecoveryDescriptor recoverySnd = e.getValue();
 
-                ClusterNode node = recoveryDesc.node();
+                if (!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+                    if (left == null)
+                        left = new HashSet<>();
 
-                if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
-                    continue;
+                    left.add(e.getKey());
+                }
+            }
 
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+            if (left != null) {
+                assert !left.isEmpty();
 
-                    GridCommunicationClient client = reserveClient(node);
+                for (ClientKey id : left) {
+                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
 
-                    client.release();
+                    if (recoverySnd != null)
+                        recoverySnd.onNodeLeft();
                 }
-                catch (IgniteCheckedException | IgniteException e) {
-                    if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, will retry " +
-                                "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+            }
+        }
 
-                        addReconnectRequest(recoveryDesc);
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Recovery reconnect failed, " +
-                                "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+        /**
+         * @param recoveryDesc Recovery descriptor.
+         */
+        private void processRecovery(GridNioRecoveryDescriptor recoveryDesc) {
+            ClusterNode node = recoveryDesc.node();
 
-                        onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
-                            e);
-                    }
+            if (clients.containsKey(node.id()) || !recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                return;
 
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect [rmtNode=" + recoveryDesc.node().id() + ']');
+
+                GridCommunicationClient client = reserveClient(node);
+
+                client.release();
+            }
+            catch (IgniteCheckedException | IgniteException e) {
+                if (recoveryDesc.nodeAlive(getSpiContext().node(node.id()))) {
+                    if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect failed, will retry " +
+                            "[rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                    addReconnectRequest(recoveryDesc);
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Recovery reconnect failed, " +
+                            "node left [rmtNode=" + recoveryDesc.node().id() + ", err=" + e + ']');
+
+                    onException("Recovery reconnect failed, node left [rmtNode=" + recoveryDesc.node().id() + "]",
+                        e);
                 }
             }
         }
@@ -2497,12 +2369,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /**
      *
      */
-    private static class HandshakeTimeoutObject<T> {
-        /** */
-        private static final AtomicLong idGen = new AtomicLong();
-
+    private static class HandshakeTimeoutObject<T> implements IgniteSpiTimeoutObject {
         /** */
-        private final long id = idGen.incrementAndGet();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private final T obj;
@@ -2533,34 +2402,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return done.compareAndSet(false, true);
         }
 
-        /**
-         * @return {@code True} if object has not yet been canceled.
-         */
-        boolean onTimeout() {
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
             if (done.compareAndSet(false, true)) {
                 // Close socket - timeout occurred.
                 if (obj instanceof GridCommunicationClient)
                     ((GridCommunicationClient)obj).forceClose();
                 else
                     U.closeQuiet((AbstractInterruptibleChannel)obj);
-
-                return true;
             }
-
-            return false;
         }
 
-        /**
-         * @return End time.
-         */
-        long endTime() {
+        /** {@inheritDoc} */
+        @Override public long endTime() {
             return endTime;
         }
 
-        /**
-         * @return ID.
-         */
-        long id() {
+        /** {@inheritDoc} */
+        @Override public IgniteUuid id() {
             return id;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e672d64..5b66019 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -116,7 +116,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
         b.append("    Socket reader: ").append(threadStatus(sockReader)).append(U.nl());
         b.append("    Socket writer: ").append(threadStatus(sockWriter)).append(U.nl());
-        b.append("    Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
 
         b.append(U.nl());
 
@@ -524,11 +523,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         U.interrupt(sockWriter);
         U.interrupt(msgWorker);
-        U.interrupt(spi.sockTimeoutWorker);
 
         U.join(sockWriter, log);
         U.join(msgWorker, log);
-        U.join(spi.sockTimeoutWorker, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 57c13d6..485f57d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1412,7 +1412,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             b.append("    Message worker: ").append(threadStatus(msgWorker)).append(U.nl());
             b.append("    Check status sender: ").append(threadStatus(chkStatusSnd)).append(U.nl());
             b.append("    HB sender: ").append(threadStatus(hbsSnd)).append(U.nl());
-            b.append("    Socket timeout worker: ").append(threadStatus(spi.sockTimeoutWorker)).append(U.nl());
             b.append("    IP finder cleaner: ").append(threadStatus(ipFinderCleaner)).append(U.nl());
             b.append("    Stats printer: ").append(threadStatus(statsPrinter)).append(U.nl());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 56fb63f..8365716 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -255,9 +255,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     /** Internal and external addresses of local node. */
     protected Collection<InetSocketAddress> locNodeAddrs;
 
-    /** Socket timeout worker. */
-    protected SocketTimeoutWorker sockTimeoutWorker;
-
     /** Start time of the very first grid node. */
     protected volatile long gridStartTime;
 
@@ -1118,7 +1115,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         IOException err = null;
 
@@ -1136,7 +1133,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Throw original exception.
             if (err != null)
@@ -1180,7 +1177,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         IOException err = null;
 
@@ -1198,7 +1195,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Throw original exception.
             if (err != null)
@@ -1222,7 +1219,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
 
-        sockTimeoutWorker.addTimeoutObject(obj);
+        addTimeoutObject(obj);
 
         OutputStream out = sock.getOutputStream();
 
@@ -1240,7 +1237,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             boolean cancelled = obj.cancel();
 
             if (cancelled)
-                sockTimeoutWorker.removeTimeoutObject(obj);
+                removeTimeoutObject(obj);
 
             // Throw original exception.
             if (err != null)
@@ -1599,9 +1596,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
                 mcastIpFinder.setLocalAddress(locAddr);
         }
 
-        sockTimeoutWorker = new SocketTimeoutWorker();
-        sockTimeoutWorker.start();
-
         impl.spiStart(gridName);
     }
 
@@ -1611,9 +1605,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             // Safety.
             ctxInitLatch.countDown();
 
-        U.interrupt(sockTimeoutWorker);
-        U.join(sockTimeoutWorker, log);
-
         if (ipFinder != null) {
             try {
                 ipFinder.close();
@@ -1754,117 +1745,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
     }
 
     /**
-     * Handles sockets timeouts.
-     */
-    protected class SocketTimeoutWorker extends IgniteSpiThread {
-        /** Time-based sorted set for timeout objects. */
-        private final GridConcurrentSkipListSet<SocketTimeoutObject> timeoutObjs =
-            new GridConcurrentSkipListSet<>(new Comparator<SocketTimeoutObject>() {
-                @Override public int compare(SocketTimeoutObject o1, SocketTimeoutObject o2) {
-                    int res = Long.compare(o1.endTime(), o2.endTime());
-
-                    if (res != 0)
-                        return res;
-
-                    return Long.compare(o1.id(), o2.id());
-                }
-            });
-
-        /** Mutex. */
-        private final Object mux0 = new Object();
-
-        /**
-         *
-         */
-        SocketTimeoutWorker() {
-            super(gridName, "tcp-disco-sock-timeout-worker", log);
-
-            setPriority(threadPri);
-        }
-
-        /**
-         * @param timeoutObj Timeout object to add.
-         */
-        @SuppressWarnings({"NakedNotify"})
-        public void addTimeoutObject(SocketTimeoutObject timeoutObj) {
-            assert timeoutObj != null && timeoutObj.endTime() > 0 && timeoutObj.endTime() != Long.MAX_VALUE;
-
-            timeoutObjs.add(timeoutObj);
-
-            if (timeoutObjs.firstx() == timeoutObj) {
-                synchronized (mux0) {
-                    mux0.notifyAll();
-                }
-            }
-        }
-
-        /**
-         * @param timeoutObj Timeout object to remove.
-         */
-        public void removeTimeoutObject(SocketTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
-
-            timeoutObjs.remove(timeoutObj);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            if (log.isDebugEnabled())
-                log.debug("Socket timeout worker has been started.");
-
-            while (!isInterrupted()) {
-                long now = U.currentTimeMillis();
-
-                for (Iterator<SocketTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
-                    SocketTimeoutObject timeoutObj = iter.next();
-
-                    if (timeoutObj.endTime() <= now) {
-                        iter.remove();
-
-                        if (timeoutObj.onTimeout()) {
-                            LT.warn(log, null, "Socket write has timed out (consider increasing " +
-                                "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
-
-                            stats.onSocketTimeout();
-                        }
-                    }
-                    else
-                        break;
-                }
-
-                synchronized (mux0) {
-                    while (true) {
-                        // Access of the first element must be inside of
-                        // synchronization block, so we don't miss out
-                        // on thread notification events sent from
-                        // 'addTimeoutObject(..)' method.
-                        SocketTimeoutObject first = timeoutObjs.firstx();
-
-                        if (first != null) {
-                            long waitTime = first.endTime() - U.currentTimeMillis();
-
-                            if (waitTime > 0)
-                                mux0.wait(waitTime);
-                            else
-                                break;
-                        }
-                        else
-                            mux0.wait(5000);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
      * Socket timeout object.
      */
-    private static class SocketTimeoutObject {
+    private class SocketTimeoutObject implements IgniteSpiTimeoutObject {
         /** */
-        private static final AtomicLong idGen = new AtomicLong();
-
-        /** */
-        private final long id = idGen.incrementAndGet();
+        private final IgniteUuid id = IgniteUuid.randomUuid();
 
         /** */
         private final Socket sock;
@@ -1894,31 +1779,26 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
             return done.compareAndSet(false, true);
         }
 
-        /**
-         * @return {@code True} if object has not yet been canceled.
-         */
-        boolean onTimeout() {
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
             if (done.compareAndSet(false, true)) {
                 // Close socket - timeout occurred.
                 U.closeQuiet(sock);
 
-                return true;
-            }
+                LT.warn(log, null, "Socket write has timed out (consider increasing " +
+                    "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
 
-            return false;
+                stats.onSocketTimeout();
+            }
         }
 
-        /**
-         * @return End time.
-         */
-        long endTime() {
+        /** {@inheritDoc} */
+        @Override public long endTime() {
             return endTime;
         }
 
-        /**
-         * @return ID.
-         */
-        long id() {
+        /** {@inheritDoc} */
+        @Override public  IgniteUuid id() {
             return id;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 0f2a898..80e6123 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -28,6 +28,7 @@ import org.jetbrains.annotations.*;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 import static java.util.concurrent.TimeUnit.*;
 
@@ -258,6 +259,107 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
         checkRemovedLatch(latch);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLatchMultinode1() throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        IgniteCountDownLatch latch = grid(0).countDownLatch("l1", 10,
+            true,
+            true);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        final AtomicBoolean countedDown = new AtomicBoolean();
+
+        for (int i = 0; i < gridCount(); i++) {
+            final Ignite ignite = grid(i);
+
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteCountDownLatch latch = ignite.countDownLatch("l1", 10,
+                        true,
+                        false);
+
+                    assertNotNull(latch);
+
+                    boolean wait = latch.await(30_000);
+
+                    assertTrue(countedDown.get());
+
+                    assertEquals(0, latch.count());
+
+                    assertTrue(wait);
+
+                    return null;
+                }
+            }));
+        }
+
+        for (int i = 0; i < 10; i++) {
+            if (i == 9)
+                countedDown.set(true);
+
+            latch.countDown();
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(30_000);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLatchMultinode2() throws Exception {
+        if (gridCount() == 1)
+            return;
+
+        IgniteCountDownLatch latch = grid(0).countDownLatch("l2", gridCount() * 3,
+            true,
+            true);
+
+        assertNotNull(latch);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        final AtomicInteger cnt = new AtomicInteger();
+
+        for (int i = 0; i < gridCount(); i++) {
+            final Ignite ignite = grid(i);
+
+            futs.add(GridTestUtils.runAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    IgniteCountDownLatch latch = ignite.countDownLatch("l2", 10,
+                        true,
+                        false);
+
+                    assertNotNull(latch);
+
+                    for (int i = 0; i < 3; i++) {
+                        cnt.incrementAndGet();
+
+                        latch.countDown();
+                    }
+
+                    boolean wait = latch.await(30_000);
+
+                    assertEquals(gridCount() * 3, cnt.get());
+
+                    assertEquals(0, latch.count());
+
+                    assertTrue(wait);
+
+                    return null;
+                }
+            }));
+        }
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(30_000);
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
new file mode 100644
index 0000000..602ac18
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheClientNearCacheExpiryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.expiry.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteCacheClientNearCacheExpiryTest extends IgniteCacheAbstractTest {
+    /** */
+    private static final int NODES = 3;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return NODES;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.equals(getTestGridName(NODES - 1)))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpirationOnClient() throws Exception {
+        Ignite ignite = grid(NODES - 1);
+
+        assertTrue(ignite.configuration().isClientMode());
+
+        IgniteCache<Object, Object> cache = ignite.cache(null);
+
+        assertTrue(((IgniteCacheProxy)cache).context().isNear());
+
+        for (int i = 0 ; i < 100; i++)
+            cache.put(i, i);
+
+        CreatedExpiryPolicy plc = new CreatedExpiryPolicy(new Duration(TimeUnit.MILLISECONDS, 500));
+
+        IgniteCache<Object, Object> cacheWithExpiry = cache.withExpiryPolicy(plc);
+
+        for (int i = 100 ; i < 200; i++) {
+            cacheWithExpiry.put(i, i);
+
+            assertEquals(i, cacheWithExpiry.localPeek(i));
+        }
+
+        U.sleep(1000);
+
+        for (int i = 0 ; i < 100; i++)
+            assertEquals(i, cacheWithExpiry.localPeek(i));
+
+        for (int i = 100 ; i < 200; i++)
+            assertNull(cache.localPeek(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index c006f69..c78ec5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -50,6 +50,8 @@ public class IgniteCacheExpiryPolicyTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteCacheTtlCleanupSelfTest.class);
 
+        suite.addTestSuite(IgniteCacheClientNearCacheExpiryTest.class);
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/873e01bb/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 5867fb8..21f9424 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -501,6 +501,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+        // No-op.
+    }
+
     /**
      * @param cacheName Cache name.
      * @return Map representing cache.


Mime
View raw message