ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/2] ignite git commit: ignite-4705
Date Tue, 28 Feb 2017 14:52:49 GMT
ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb84ae5e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb84ae5e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb84ae5e

Branch: refs/heads/ignite-4705
Commit: eb84ae5e56f6c7ddebf941a71798b13e9c76d782
Parents: fa2ab53
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Feb 28 11:29:05 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Feb 28 17:32:03 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheObjectContext.java    |   3 +-
 .../processors/cache/KeyCacheObjectImpl.java    |   1 -
 .../dht/atomic/GridDhtAtomicCache.java          |  38 ++-----
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   6 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |   5 +-
 .../IgniteCacheObjectProcessorImpl.java         |  14 +--
 .../IgniteCommunicationBalanceTest.java         |   1 +
 .../cache/CacheRebalancingSelfTest.java         |  16 ++-
 .../GridCacheAtomicMessageCountSelfTest.java    |   8 +-
 .../GridCacheVersionTopologyChangeTest.java     |   3 +
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |   1 +
 ...eMarshallerCacheConcurrentReadWriteTest.java |   1 +
 .../IgniteCollectionAbstractTest.java           |   1 +
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   1 +
 .../atomic/IgniteCacheAtomicProtocolTest.java   | 103 ++++++++++++++++++-
 .../OptimizedMarshallerNodeFailoverTest.java    |   3 +
 16 files changed, 150 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
index c4203ef..a777ab6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
@@ -33,7 +33,8 @@ import org.apache.ignite.internal.util.typedef.F;
 /**
  *
  */
-@SuppressWarnings("TypeMayBeWeakened") public class CacheObjectContext {
+@SuppressWarnings("TypeMayBeWeakened")
+public class CacheObjectContext {
     /** */
     private GridKernalContext kernalCtx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index eb305bf..4f8570c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -47,7 +47,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
      */
     public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) {
         assert val != null;
-        assert part >= 0 : part;
 
         this.val = val;
         this.valBytes = valBytes;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 97be512..2f5a74e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1330,8 +1330,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
-        if (conflictPutVal == null &&
-            conflictRmvVer == null) {
+        if (conflictPutVal == null && conflictRmvVer == null) {
             return new GridNearAtomicSingleUpdateFuture(
                 ctx,
                 this,
@@ -1377,19 +1376,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
-     * Whether this is fast-map operation.
-     *
-     * @param filters Filters.
-     * @param op Operation.
-     * @return {@code True} if fast-map.
-     */
-    public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
-        return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
-            ctx.config().getAtomicWriteOrderMode() == CLOCK &&
-            !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
-    }
-
-    /**
      * Entry point for all public API remove methods.
      *
      * @param keys Keys to remove.
@@ -1777,7 +1763,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
-            List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion());
+            List<GridDhtCacheEntry> locked = null;
 
             Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
 
@@ -1808,6 +1794,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             return;
                         }
 
+                        locked = lockEntries(req, req.topologyVersion());
+
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
                         // Assign next version for update inside entries lock.
@@ -1925,6 +1913,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
 
             remap = true;
+
+            res.remapTopologyVersion(ctx.topology().topologyVersion());
         }
         catch (Throwable e) {
             // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
@@ -2747,7 +2737,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         /*event*/true,
                         /*metrics*/true,
                         primary,
-                        ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+                        /*verCheck*/false,
                         topVer,
                         null,
                         replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
@@ -2898,11 +2888,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return Collections.singletonList(entry);
                 }
                 catch (GridDhtInvalidPartitionException e) {
-                    // Ignore invalid partition exception in CLOCK ordering mode.
-                    if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                        return Collections.singletonList(null);
-                    else
-                        throw e;
+                    throw e;
                 }
             }
         }
@@ -2917,11 +2903,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         locked.add(entry);
                     }
                     catch (GridDhtInvalidPartitionException e) {
-                        // Ignore invalid partition exception in CLOCK ordering mode.
-                        if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
-                            locked.add(null);
-                        else
-                            throw e;
+                        throw e;
                     }
                 }
 
@@ -3075,7 +3057,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             drPutVals = null;
         }
 
-        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+        GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,
             ctx.config().getWriteSynchronizationMode(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7ce4ebd..9d60a74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -183,7 +183,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     res = primaryFailedResponse(req, nodeId);
             }
             else {
-                assert mappings != null;
+                if (mappings == null)
+                    return false;
 
                 PrimaryRequestState reqState = mappings.get(nodeId);
 
@@ -419,7 +420,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                 rcvAll = singleReq.onPrimaryResponse(cctx, res);
             }
             else {
-                assert mappings != null;
+                if (mappings == null)
+                    return;
 
                 PrimaryRequestState reqState = mappings.get(nodeId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 5ca3da8..35fbe11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -140,7 +140,10 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
      *        before stored in cache.
      * @return Cache key object.
      */
-    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj, boolean userObj);
+    public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx,
+        @Nullable GridCacheContext cctx,
+        Object obj,
+        boolean userObj);
 
     /**
      * @param ctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index 5cacb6f..dbd8ea1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -231,8 +231,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
                 cctx.affinity().partition(obj, false) :
                 ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null);
         }
-        catch (IgniteCheckedException ignored) {
-            U.error(log, "Failed to get partition");
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to get partition", e);
 
             return  -1;
         }
@@ -327,13 +327,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
 
         /**
          * @param key Key.
-         */
-        UserKeyCacheObjectImpl(Object key) {
-            this(key, -1);
-        }
-
-        /**
-         * @param key Key.
+         * @param part Partition.
          */
         UserKeyCacheObjectImpl(Object key, int part) {
             super(key, null, part);
@@ -341,6 +335,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
 
         /**
          * @param key Key.
+         * @param valBytes Marshalled key.
+         * @param part Partition.
          */
         UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
             super(key, valBytes, part);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
index 4271417..9287d3d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteCommunicationBalanceTest.java
@@ -64,6 +64,7 @@ public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
         commSpi.setSharedMemoryPort(-1);
         commSpi.setConnectionsPerNode(connectionsPerNode());
         commSpi.setUsePairedConnections(usePairedConnections());
+        commSpi.setIdleConnectionTimeout(500);
 
         if (selectors > 0)
             commSpi.setSelectorsCount(selectors);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
index 8d1f67a..2b63e62 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRebalancingSelfTest.java
@@ -40,6 +40,13 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
     /**
      * @throws Exception If failed.
      */
@@ -63,13 +70,12 @@ public class CacheRebalancingSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param future Future.
+     * @param fut Future.
      * @return Internal future.
      */
-    private static IgniteInternalFuture internalFuture(IgniteFuture future) {
-        assert future instanceof IgniteFutureImpl;
+    private static IgniteInternalFuture internalFuture(IgniteFuture fut) {
+        assert fut instanceof IgniteFutureImpl : fut;
 
-        return ((IgniteFutureImpl)future).internalFuture();
+        return ((IgniteFutureImpl) fut).internalFuture();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index e8c5db1..dab37fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -206,14 +206,14 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
         private Map<Class<?>, AtomicInteger> cntMap = new HashMap<>();
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass());
 
             if (cntr != null)
                 cntr.incrementAndGet();
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
 
         /**
@@ -221,7 +221,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
          *
          * @param cls Class to count.
          */
-        public void registerMessage(Class<?> cls) {
+        void registerMessage(Class<?> cls) {
             AtomicInteger cntr = cntMap.get(cls);
 
             if (cntr == null)
@@ -232,7 +232,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
          * @param cls Message type to get count.
          * @return Number of messages of given class.
          */
-        public int messageCount(Class<?> cls) {
+        int messageCount(Class<?> cls) {
             AtomicInteger cntr = cntMap.get(cls);
 
             return cntr == null ? 0 : cntr.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java
index 3e80525..6510db5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheVersionTopologyChangeTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -56,6 +57,8 @@ public class GridCacheVersionTopologyChangeTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(500);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 1259f3e..a7aff6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -79,6 +79,7 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
 
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setIdleConnectionTimeout(500);
 
         cfg.setCommunicationSpi(commSpi);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java
index ad6f604..ed4a7c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java
@@ -53,6 +53,7 @@ public class IgniteMarshallerCacheConcurrentReadWriteTest extends GridCommonAbst
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(500);
 
         CacheConfiguration ccfg = new CacheConfiguration();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
index 3e38a58..b4ab7c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCollectionAbstractTest.java
@@ -47,6 +47,7 @@ public abstract class IgniteCollectionAbstractTest extends GridCommonAbstractTes
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
         ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(500);
 
         TcpDiscoverySpi spi = new TcpDiscoverySpi();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 3fd4dd8..86644e2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -101,6 +101,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
         DelayCommunicationSpi spi = new DelayCommunicationSpi();
 
         spi.setSharedMemoryPort(-1);
+        spi.setIdleConnectionTimeout(500);
 
         cfg.setCommunicationSpi(spi);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 5b084d9..d26f11b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -22,20 +22,24 @@ import java.util.List;
 import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -55,6 +59,8 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        cfg.setConsistentId(gridName);
+
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
         TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
@@ -100,7 +106,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
 
         Ignite client = startGrid(4);
 
-        IgniteCache<Integer, Integer> nearCache = client.createCache(cacheConfiguration(1));
+        IgniteCache<Integer, Integer> nearCache = client.createCache(cacheConfiguration(1, FULL_SYNC));
         IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
 
         awaitPartitionMapExchange();
@@ -152,7 +158,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
 
         Ignite client = startGrid(4);
 
-        IgniteCache<Integer, Integer> nearCache = client.createCache(cacheConfiguration(1));
+        IgniteCache<Integer, Integer> nearCache = client.createCache(cacheConfiguration(1, FULL_SYNC));
         IgniteCache<Integer, Integer> nearAsyncCache = nearCache.withAsync();
 
         awaitPartitionMapExchange();
@@ -188,6 +194,93 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testFullAsyncPutRemap() throws Exception {
+        fullAsyncRemap(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFullAsyncPutAllRemap() throws Exception {
+        fullAsyncRemap(true);
+    }
+
+    /**
+     * @param putAll Test putAll flag.
+     * @throws Exception If failed.
+     */
+    private void fullAsyncRemap(boolean putAll) throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        client = true;
+
+        Ignite clientNode = startGrid(1);
+
+        client = false;
+
+        final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC));
+
+        List<Integer> keys = primaryKeys(srv0.cache(TEST_CACHE), putAll ? 3 : 1);
+
+        testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name());
+        testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
+
+        final Map<Integer, Integer> map = new HashMap<>();
+
+        for (Integer key : keys)
+            map.put(key, -key);
+
+        if (putAll)
+            nearCache.putAll(map);
+        else
+            nearCache.put(keys.get(0), map.get(keys.get(0)));
+
+        int nodeIdx = 2;
+
+        Affinity<Object> aff = clientNode.affinity(TEST_CACHE);
+
+        int keysMoved;
+
+        do {
+            startGrid(nodeIdx);
+
+            awaitPartitionMapExchange();
+
+            keysMoved = 0;
+
+            for (Integer key : keys) {
+                if (!aff.isPrimary(srv0.cluster().localNode(), key))
+                    keysMoved++;
+            }
+
+            if (keysMoved == keys.size())
+                break;
+
+            nodeIdx++;
+        }
+        while (nodeIdx < 10);
+
+        assertEquals(keys.size(), keysMoved);
+
+        testSpi(clientNode).stopBlock(true);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Integer key : map.keySet()) {
+                    if (nearCache.get(key) == null)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 5000);
+
+        checkData(map);
+    }
+
+    /**
      * @param expData Expected cache data.
      */
     private void checkData(Map<Integer, Integer> expData) {
@@ -231,14 +324,16 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
 
     /**
      * @param backups Number of backups.
+     * @param writeSync Cache write synchronization mode.
      * @return Cache configuration.
      */
-    private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups) {
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups,
+        CacheWriteSynchronizationMode writeSync) {
         CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
 
         ccfg.setName(TEST_CACHE);
         ccfg.setAtomicityMode(ATOMIC);
-        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setWriteSynchronizationMode(writeSync);
         ccfg.setBackups(backups);
 
         return ccfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eb84ae5e/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
index ebbd8ac..41a73d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerNodeFailoverTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -77,6 +78,8 @@ public class OptimizedMarshallerNodeFailoverTest extends GridCommonAbstractTest
         else
             cfg.setClientMode(true);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setIdleConnectionTimeout(500);
+
         return cfg;
     }
 


Mime
View raw message