ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: MvccFuture map optimization.
Date Wed, 18 Nov 2015 14:21:45 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-get 647e99041 -> 7e32db9b6


MvccFuture map optimization.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e32db9b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e32db9b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e32db9b

Branch: refs/heads/ignite-single-op-get
Commit: 7e32db9b64b9c34e1d3f5d805b1f410a9d5cbf17
Parents: 647e990
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 18 16:59:09 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 18 17:21:01 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  16 +-
 .../processors/cache/GridCacheIoManager.java    |  16 +-
 .../processors/cache/GridCacheMvccManager.java  |   4 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   9 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   2 +-
 .../dht/GridPartitionedGetFuture.java           |   1 -
 .../dht/GridPartitionedSingleGetFuture.java     |   1 -
 .../dht/atomic/GridDhtAtomicCache.java          |   2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  38 +-
 .../near/GridNearTxFinishFuture.java            |   2 +-
 .../IgniteClientReconnectCacheTest.java         |  11 +-
 .../cache/CacheFutureExceptionSelfTest.java     |   8 +-
 .../CacheGetFutureHangsSelfTest.java            |   6 +
 .../IgniteCacheSingleGetMessageTest.java        | 357 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   3 +
 15 files changed, 435 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d19bb8d..10b8976 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1476,14 +1476,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
      * @return Future for the get operation.
      */
     protected IgniteInternalFuture<V> getAsync(
-            final K key,
-            boolean forcePrimary,
-            boolean skipTx,
-            @Nullable UUID subjId,
-            String taskName,
-            boolean deserializePortable,
-            final boolean skipVals,
-            boolean canRemap
+        final K key,
+        boolean forcePrimary,
+        boolean skipTx,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        final boolean skipVals,
+        boolean canRemap
     ) {
         return getAllAsync(Collections.singletonList(key),
             forcePrimary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index ab9b2eb..5c3eb3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -34,14 +34,26 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.util.F0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index ddea8f5..f28f9b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -146,7 +146,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
 
                 if (futCol != null) {
                     synchronized (futCol) {
-                        for (GridCacheMvccFuture fut : futCol) {
+                        for (GridCacheMvccFuture<?> fut : futCol) {
                             if (!fut.isDone()) {
                                 GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut;
 
@@ -430,6 +430,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter
{
         GridCacheFuture<?> old = futs.put(futId, fut);
 
         assert old == null : old;
+
+        onFutureAdded(fut);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index aa22f45..237a35d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
 import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -48,7 +47,13 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index e544817..bb370a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -66,7 +66,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
     private GridCacheSharedContext<K, V> cctx;
 
     /** Future ID. */
-    private IgniteUuid futId;
+    private final IgniteUuid futId;
 
     /** Transaction. */
     @GridToStringExclude

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 08748ae..c3d9836 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -56,7 +56,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index a12219c..38b8687 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.UUID;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index b1911dc..75f8c2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -60,7 +60,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -979,6 +978,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
      * @param skipStore Skip store flag.
+     * @param canRemap Can remap flag.
      * @return Get future.
      */
     private IgniteInternalFuture<V> getAsync0(KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 5443c9f..5b90558 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -214,28 +214,30 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
         if (tx != null && !tx.implicit() && !skipTx) {
             return asyncOp(tx, new AsyncOp<V>() {
                 @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx)
{
-                    return tx.getAllAsync(ctx,
-                            Collections.singleton(ctx.toCacheKeyObject(key)),
-                            deserializePortable,
-                            skipVals,
-                            false,
-                            opCtx != null && opCtx.skipStore()).chain(
-                                new CX1<IgniteInternalFuture<Map<Object, Object>>,
V>() {
-                                @Override public V applyx(IgniteInternalFuture<Map<Object,
Object>> e)
-                                    throws IgniteCheckedException {
-                                    Map<Object, Object> map = e.get();
+                    IgniteInternalFuture<Map<Object, Object>>  fut = tx.getAllAsync(ctx,
+                        Collections.singleton(ctx.toCacheKeyObject(key)),
+                        deserializePortable,
+                        skipVals,
+                        false,
+                        opCtx != null && opCtx.skipStore());
 
-                                    assert map.isEmpty() || map.size() == 1 : map.size();
+                    return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>,
V>() {
+                        @SuppressWarnings("unchecked")
+                        @Override public V applyx(IgniteInternalFuture<Map<Object,
Object>> e)
+                            throws IgniteCheckedException {
+                            Map<Object, Object> map = e.get();
 
-                                    if (skipVals) {
-                                        Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);
+                            assert map.isEmpty() || map.size() == 1 : map.size();
 
-                                        return (V)(val);
-                                    }
+                            if (skipVals) {
+                                Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map);
 
-                                    return (V)map.get(key);
-                                }
-                            });
+                                return (V)(val);
+                            }
+
+                            return (V)map.get(key);
+                        }
+                    });
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index eb5db4c..166d6df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -78,7 +78,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     private GridCacheSharedContext<K, V> cctx;
 
     /** Future ID. */
-    private IgniteUuid futId;
+    private final IgniteUuid futId;
 
     /** Transaction. */
     @GridToStringInclude

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 6131f54..6cf10f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -765,6 +766,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
             }
         };
 
+        IgniteInClosure<IgniteCache<Object, Object>> getAllOp = new CI1<IgniteCache<Object,
Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                cache.getAll(F.asSet(1, 2));
+            }
+        };
+
         int cnt = 0;
 
         for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
@@ -799,7 +806,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
 
                     log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode="
+ syncMode + ']');
 
-                    checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class,
getOp);
+                    checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class,
getOp);
+
+                    checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class,
getAllOp);
 
                     client.destroyCache(ccfg.getName());
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheFutureExceptionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheFutureExceptionSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheFutureExceptionSelfTest.java
index b0dcb78..494ca52 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheFutureExceptionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheFutureExceptionSelfTest.java
@@ -78,13 +78,13 @@ public class CacheFutureExceptionSelfTest extends GridCommonAbstractTest
{
 
         startGrid(1);
 
-       // testGet(false, false);
+        testGet(false, false);
 
         testGet(false, true);
 
-       // testGet(true, false);
+        testGet(true, false);
 
-       // testGet(true, true);
+        testGet(true, true);
     }
 
     /**
@@ -139,7 +139,7 @@ public class CacheFutureExceptionSelfTest extends GridCommonAbstractTest
{
             }
         });
 
-        assertTrue(futLatch.await(5666, SECONDS));
+        assertTrue(futLatch.await(5, SECONDS));
 
         srv.destroyCache(cache.getName());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index 53ac648..c005945 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -29,6 +30,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -135,12 +137,16 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest
{
                     @Override public void run() {
                         T2<Ignite, Integer> ignite;
 
+                        Set<Integer> keys = F.asSet(1, 2, 3, 4, 5);
+
                         while ((ignite = randomNode()) != null) {
                             IgniteCache<Object, Object> cache = ignite.get1().cache(null);
 
                             for (int i = 0; i < 100; i++)
                                 cache.containsKey(ThreadLocalRandom.current().nextInt(100_000));
 
+                            cache.containsKeys(keys);
+
                             assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1()));
 
                             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
new file mode 100644
index 0000000..42b5ee3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRVS = 4;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(client);
+
+        TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        client = true;
+
+        startGridsMultiThreaded(SRVS, 1);
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSingleGetMessage() throws Exception {
+        assertFalse(ignite(0).configuration().isClientMode());
+        assertTrue(ignite(SRVS).configuration().isClientMode());
+
+        List<CacheConfiguration<Integer, Integer>> ccfgs = cacheConfigurations();
+
+        for (int i = 0; i < ccfgs.size(); i++) {
+            CacheConfiguration<Integer, Integer> ccfg = ccfgs.get(i);
+
+            ccfg.setName("cache-" + i);
+
+            log.info("Test cache: " + i);
+
+            ignite(0).createCache(ccfg);
+
+            try {
+                IgniteCache<Integer, Integer> srvCache = ignite(0).cache(ccfg.getName());
+                IgniteCache<Integer, Integer> clientCache = ignite(SRVS).cache(ccfg.getName());
+
+                Integer key = nearKey(clientCache);
+
+                checkSingleGetMessage(clientCache, key, false);
+
+                if (ccfg.getBackups() > 0) {
+                    key = backupKeys(srvCache, 1, 100_000).get(0);
+
+                    checkSingleGetMessage(srvCache, key, true);
+                }
+
+                if (ccfg.getCacheMode() != REPLICATED) {
+                    key = nearKeys(srvCache, 1, 200_000).get(0);
+
+                    checkSingleGetMessage(srvCache, key, false);
+                }
+            }
+            finally {
+                ignite(0).destroyCache(ccfg.getName());
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param backup {@code True} if given key is backup key.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void checkSingleGetMessage(IgniteCache<Integer, Integer> cache,
+        Integer key,
+        boolean backup) throws Exception {
+        CacheConfiguration<Integer, Integer> ccfg = cache.getConfiguration(CacheConfiguration.class);
+
+        Ignite node = cache.unwrap(Ignite.class);
+
+        TestCommunicationSpi spi = (TestCommunicationSpi)node.configuration().getCommunicationSpi();
+
+        spi.record(GridNearSingleGetRequest.class);
+
+        Ignite primary = primaryNode(key, cache.getName());
+
+        assertNotSame(node, primary);
+
+        TestCommunicationSpi primarySpi = (TestCommunicationSpi)primary.configuration().getCommunicationSpi();
+
+        primarySpi.record(GridNearSingleGetResponse.class);
+
+        assertNull(cache.get(key));
+
+        checkMessages(spi, primarySpi);
+
+        assertFalse(cache.containsKey(key));
+
+        checkMessages(spi, primarySpi);
+
+        cache.put(key, 1);
+
+        assertNotNull(cache.get(key));
+
+        if (backup)
+            checkNoMessages(spi, primarySpi);
+        else
+            checkMessages(spi, primarySpi);
+
+        assertTrue(cache.containsKey(key));
+
+        if (backup)
+            checkNoMessages(spi, primarySpi);
+        else
+            checkMessages(spi, primarySpi);
+
+        if (ccfg.getAtomicityMode() == TRANSACTIONAL) {
+            cache.remove(key);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ))
{
+                assertNull(cache.get(key));
+
+                tx.commit();
+            }
+
+            checkMessages(spi, primarySpi);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ))
{
+                assertFalse(cache.containsKey(key));
+
+                tx.commit();
+            }
+
+            checkMessages(spi, primarySpi);
+
+            cache.put(key, 1);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ))
{
+                assertNotNull(cache.get(key));
+
+                tx.commit();
+            }
+
+            if (backup)
+                checkNoMessages(spi, primarySpi);
+            else
+                checkMessages(spi, primarySpi);
+
+            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ))
{
+                assertTrue(cache.containsKey(key));
+
+                tx.commit();
+            }
+
+            if (backup)
+                checkNoMessages(spi, primarySpi);
+            else
+                checkMessages(spi, primarySpi);
+        }
+    }
+
+    /**
+     * @param spi Near node SPI.
+     * @param primarySpi Primary node SPI.
+     */
+    private void checkMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi)
{
+        List<Object> msgs = spi.recordedMessages();
+
+        assertEquals(1, msgs.size());
+        assertTrue(msgs.get(0) instanceof GridNearSingleGetRequest);
+
+        msgs = primarySpi.recordedMessages();
+
+        assertEquals(1, msgs.size());
+        assertTrue(msgs.get(0) instanceof GridNearSingleGetResponse);
+    }
+
+    /**
+     * @param spi Near node SPI.
+     * @param primarySpi Primary node SPI.
+     */
+    private void checkNoMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi)
{
+        List<Object> msgs = spi.recordedMessages();
+        assertEquals(0, msgs.size());
+
+        msgs = primarySpi.recordedMessages();
+        assertEquals(0, msgs.size());
+    }
+
+    /**
+     * @return Cache configurations to test.
+     */
+    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations()
{
+        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+
+        ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, FULL_SYNC, 0));
+        ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, FULL_SYNC, 1));
+        ccfgs.add(cacheConfiguration(REPLICATED, TRANSACTIONAL, FULL_SYNC, 0));
+
+        ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, FULL_SYNC, 0));
+        ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, FULL_SYNC, 1));
+        ccfgs.add(cacheConfiguration(REPLICATED, ATOMIC, FULL_SYNC, 0));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheAtomicityMode atomicityMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(syncMode);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        private Class<?> recordCls;
+
+        /** */
+        private List<Object> recordedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException>
ackC)
+            throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                synchronized (this) {
+                    if (recordCls != null && msg0.getClass().equals(recordCls))
+                        recordedMsgs.add(msg0);
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * @param recordCls Message class to record.
+         */
+        void record(@Nullable Class<?> recordCls) {
+            synchronized (this) {
+                this.recordCls = recordCls;
+            }
+        }
+
+        /**
+         * @return Recorded messages.
+         */
+        List<Object> recordedMessages() {
+            synchronized (this) {
+                List<Object> msgs = recordedMsgs;
+
+                recordedMsgs = new ArrayList<>();
+
+                return msgs;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e32db9b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index b89bffd..fcc8d37 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -283,6 +284,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(CacheGetFutureHangsSelfTest.class);
 
+        suite.addTestSuite(IgniteCacheSingleGetMessageTest.class);
+
         return suite;
     }
 }
\ No newline at end of file


Mime
View raw message