ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [03/20] ignite git commit: Refactoring: use abstract response instead of concrete.
Date Fri, 29 Apr 2016 10:55:42 GMT
Refactoring: use abstract response instead of concrete.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/279ce729
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/279ce729
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/279ce729

Branch: refs/heads/ignite-2523-1
Commit: 279ce72971f8d8e3bb9a2d8691a480f20bc08541
Parents: 70d2dd7
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Apr 26 15:57:37 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Apr 26 15:57:37 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  5 +--
 .../dht/atomic/GridDhtAtomicCache.java          | 37 ++++++++++----------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  8 ++---
 .../GridNearAtomicAbstractUpdateRequest.java    |  4 +--
 .../GridNearAtomicSingleUpdateFuture.java       | 13 +++----
 .../GridNearAtomicSingleUpdateRequest.java      |  6 ++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 20 ++++++-----
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  6 ++--
 .../distributed/near/GridNearAtomicCache.java   |  4 +--
 .../IgniteClientReconnectAbstractTest.java      |  3 +-
 .../IgniteClientReconnectCacheTest.java         | 31 ++++++++++++----
 .../IgniteClientReconnectCollectionsTest.java   |  3 +-
 12 files changed, 82 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 82b7604..4113749 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
@@ -429,7 +430,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
             case 40: {
                 GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
 
-                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
                     req.futureVersion(),
@@ -607,7 +608,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter
{
             case -23: {
                 GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
 
-                GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+                GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
                     req.futureVersion(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 37a5f45..36cd098 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
 
     /** Update reply closure. */
     @GridToStringExclude
-    private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
updateReplyClos;
+    private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
updateReplyClos;
 
     /** Pending  */
     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
             }
         });
 
-        updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>()
{
+        updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>()
{
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse
res) {
+            @Override public void apply(GridNearAtomicAbstractUpdateRequest req,
+                GridNearAtomicAbstractUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     assert req.writeSynchronizationMode() != FULL_ASYNC : req;
 
@@ -1477,7 +1478,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     public void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicAbstractUpdateRequest req,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
@@ -1502,10 +1503,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     public void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicAbstractUpdateRequest req,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb
     ) {
-        GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId, req.futureVersion(),
-            ctx.deploymentEnabled());
+        GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
+            req.futureVersion(), ctx.deploymentEnabled());
 
         assert !req.returnValue() || (req.operation() == TRANSFORM || req.keysCount() ==
1);
 
@@ -1726,11 +1727,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         final ClusterNode node,
         final boolean hasNear,
         final GridNearAtomicAbstractUpdateRequest req,
-        final GridNearAtomicUpdateResponse res,
+        final GridNearAtomicAbstractUpdateResponse res,
         final List<GridDhtCacheEntry> locked,
         final GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb,
         final boolean replicate,
         final String taskName,
         @Nullable final IgniteCacheExpiryPolicy expiry,
@@ -2145,11 +2146,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicAbstractUpdateRequest req,
-        GridNearAtomicUpdateResponse res,
+        GridNearAtomicAbstractUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2378,9 +2379,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         @Nullable final Collection<KeyCacheObject> rmvKeys,
         @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>>
entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb,
+        final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb,
         final GridNearAtomicAbstractUpdateRequest req,
-        final GridNearAtomicUpdateResponse res,
+        final GridNearAtomicAbstractUpdateResponse res,
         final boolean replicate,
         final UpdateBatchResult batchRes,
         final String taskName,
@@ -2757,7 +2758,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      * @return {@code True} if filter evaluation succeeded.
      */
     private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest
req,
-        GridNearAtomicUpdateResponse res) {
+        GridNearAtomicAbstractUpdateResponse res) {
         try {
             return ctx.isAllLocked(entry, req.filter());
         }
@@ -2851,8 +2852,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
     @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        GridNearAtomicAbstractUpdateResponse updateRes,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb,
         boolean force
     ) {
         if (!force) {
@@ -2897,7 +2898,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      * @param res Near atomic update response.
      */
     @SuppressWarnings("unchecked")
-    private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse
res) {
+    private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicAbstractUpdateResponse
res) {
         if (log.isDebugEnabled())
             log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res="
+ res + ']');
 
@@ -3108,7 +3109,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
      * @param nodeId Originating node ID.
      * @param res Near update response.
      */
-    private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+    private void sendNearUpdateReply(UUID nodeId, GridNearAtomicAbstractUpdateResponse res)
{
         try {
             ctx.io().send(nodeId, res, ctx.ioPolicy());
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 1fcac71..82238e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -79,7 +79,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>
completionCb;
+    private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb;
 
     /** Mappings. */
     @GridToStringInclude
@@ -92,7 +92,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     private final GridNearAtomicAbstractUpdateRequest updateReq;
 
     /** Update response. */
-    private final GridNearAtomicUpdateResponse updateRes;
+    private final GridNearAtomicAbstractUpdateResponse updateRes;
 
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
@@ -115,10 +115,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+        CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>
completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicAbstractUpdateRequest updateReq,
-        GridNearAtomicUpdateResponse updateRes
+        GridNearAtomicAbstractUpdateResponse updateRes
     ) {
         this.cctx = cctx;
         this.writeVer = writeVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 6e868e4..7e3e2e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -215,12 +215,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
      * @param res Response.
      * @return {@code True} if current response was {@code null}.
      */
-    public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+    public abstract boolean onResponse(GridNearAtomicAbstractUpdateResponse res);
 
     /**
      * @return Response.
      */
-    @Nullable public abstract GridNearAtomicUpdateResponse response();
+    @Nullable public abstract GridNearAtomicAbstractUpdateResponse response();
 
     /**
      * Cleanup values.

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 34399ac..064e067 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -146,7 +146,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        GridNearAtomicUpdateResponse res = null;
+        GridNearAtomicAbstractUpdateResponse res = null;
 
         synchronized (mux) {
             GridNearAtomicAbstractUpdateRequest req = this.req != null && this.req.nodeId().equals(nodeId)
?
@@ -212,7 +212,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param nodeErr {@code True} if response was created on node failure.
      */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr)
{
+    public void onResult(UUID nodeId, GridNearAtomicAbstractUpdateResponse res, boolean nodeErr)
{
         GridNearAtomicAbstractUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
@@ -389,7 +389,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse
res) {
+    private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicAbstractUpdateResponse
res) {
         assert nearEnabled;
 
         if (res.remapKeysCount() > 0 || !req.hasPrimary())
@@ -463,8 +463,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     private void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>()
{
-                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res) {
+                new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicAbstractUpdateResponse>()
{
+                    @Override public void apply(GridNearAtomicAbstractUpdateRequest req,
+                        GridNearAtomicAbstractUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -491,7 +492,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
      */
     void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
         synchronized (mux) {
-            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                 req.nodeId(),
                 req.futureVersion(),
                 cctx.deploymentEnabled());

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 13b990e..ef1f50a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -152,7 +152,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
     /** */
     @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
+    private GridNearAtomicAbstractUpdateResponse res;
 
     /** Target node ID. */
     @GridDirectTransient
@@ -427,7 +427,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+    @Override public boolean onResponse(GridNearAtomicAbstractUpdateResponse res) {
         if (this.res == null) {
             this.res = res;
 
@@ -438,7 +438,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+    @Override @Nullable public GridNearAtomicAbstractUpdateResponse response() {
         return res;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index bad4647..07071f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -164,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        GridNearAtomicUpdateResponse res = null;
+        GridNearAtomicAbstractUpdateResponse res = null;
 
         synchronized (mux) {
             GridNearAtomicUpdateRequest req;
@@ -258,7 +258,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param nodeErr {@code True} if response was created on node failure.
      */
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
-    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr)
{
+    public void onResult(UUID nodeId, GridNearAtomicAbstractUpdateResponse res, boolean nodeErr)
{
         GridNearAtomicUpdateRequest req;
 
         AffinityTopologyVersion remapTopVer = null;
@@ -409,7 +409,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
                 for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
-                    GridNearAtomicUpdateResponse res0 = req0.response();
+                    GridNearAtomicAbstractUpdateResponse res0 = req0.response();
 
                     assert res0 != null : req0;
 
@@ -484,7 +484,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      * @param req Update request.
      * @param res Update response.
      */
-    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse
res) {
+    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicAbstractUpdateResponse
res) {
         assert nearEnabled;
 
         if (res.remapKeysCount() > 0 || !req.hasPrimary())
@@ -558,8 +558,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(nodeId, req,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>()
{
-                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse
res) {
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicAbstractUpdateResponse>()
{
+                    @Override public void apply(GridNearAtomicUpdateRequest req,
+                        GridNearAtomicAbstractUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -613,8 +614,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
 
         if (locUpdate != null) {
             cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>()
{
-                    @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse
res) {
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicAbstractUpdateResponse>()
{
+                    @Override public void apply(GridNearAtomicUpdateRequest req,
+                        GridNearAtomicAbstractUpdateResponse res) {
                         onResult(res.nodeId(), res, false);
                     }
                 });
@@ -630,7 +632,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
      */
     void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
         synchronized (mux) {
-            GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+            GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                 req.nodeId(),
                 req.futureVersion(),
                 cctx.deploymentEnabled());

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index d458ac2..75c530d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -149,7 +149,7 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq
 
     /** */
     @GridDirectTransient
-    private GridNearAtomicUpdateResponse res;
+    private GridNearAtomicAbstractUpdateResponse res;
 
     /** Target node ID. */
     @GridDirectTransient
@@ -506,7 +506,7 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+    @Override public boolean onResponse(GridNearAtomicAbstractUpdateResponse res) {
         if (this.res == null) {
             this.res = res;
 
@@ -517,7 +517,7 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq
     }
 
     /** {@inheritDoc} */
-    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+    @Override @Nullable public GridNearAtomicAbstractUpdateResponse response() {
         return res;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index ac1ef70..995b948 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -127,7 +127,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K,
V> {
      */
     public void processNearAtomicUpdateResponse(
         GridNearAtomicAbstractUpdateRequest req,
-        GridNearAtomicUpdateResponse res
+        GridNearAtomicAbstractUpdateResponse res
     ) {
         if (res.failedKeysCount() == req.keysCount())
             return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 4d49366..36e8aa2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -419,6 +419,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
         private IgniteLogger log;
 
         /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
         @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException>
ackC)
             throws IgniteSpiException {
             Class msgCls0 = msgCls;
@@ -427,7 +428,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
                 classes.put(((GridIoMessage)msg).message().getClass().getName(), node);
 
             if (msgCls0 != null && msg instanceof GridIoMessage
-                && ((GridIoMessage)msg).message().getClass().equals(msgCls)) {
+                && msgCls0.isAssignableFrom(((GridIoMessage)msg).message().getClass()))
{
                 log.info("Block message: " + msg);
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index ad6c46f..8451a55 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -805,14 +807,20 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
                     ccfg.setWriteSynchronizationMode(syncMode);
 
                     if (syncMode != FULL_ASYNC) {
-                        Class<?> cls = (ccfg.getAtomicityMode() == ATOMIC) ?
-                            GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class;
-
                         log.info("Test cache put [atomicity=" + atomicityMode +
                             ", writeOrder=" + writeOrder +
                             ", syncMode=" + syncMode + ']');
 
-                        checkOperationInProgressFails(client, ccfg, cls, putOp);
+                        if (ccfg.getAtomicityMode() == ATOMIC) {
+                            Collection<Class> clss = new HashSet<>();
+
+                            clss.add(GridNearAtomicUpdateResponse.class);
+                            // TODO: Add single.
+
+                            checkOperationInProgressFails(client, ccfg, clss, putOp);
+                        }
+                        else
+                            checkOperationInProgressFails(client, ccfg, GridNearTxPrepareResponse.class,
putOp);
 
                         client.destroyCache(ccfg.getName());
                     }
@@ -1250,16 +1258,24 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
      */
     static class TestClass5 implements Serializable {}
 
+    private void checkOperationInProgressFails(IgniteEx client,
+        final CacheConfiguration<Object, Object> ccfg,
+        Class msgToBlock,
+        final IgniteInClosure<IgniteCache<Object, Object>> c)
+        throws Exception {
+        checkOperationInProgressFails(client, ccfg, Collections.singleton(msgToBlock), c);
+    }
+
     /**
      * @param client Client.
      * @param ccfg Cache configuration.
-     * @param msgToBlock Message to block.
+     * @param msgsToBlock Message to block.
      * @param c Cache operation closure.
      * @throws Exception If failed.
      */
     private void checkOperationInProgressFails(IgniteEx client,
         final CacheConfiguration<Object, Object> ccfg,
-        Class<?> msgToBlock,
+        Collection<Class> msgsToBlock,
         final IgniteInClosure<IgniteCache<Object, Object>> c)
         throws Exception
     {
@@ -1272,7 +1288,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
         for (int i = 0; i < SRV_CNT; i++) {
             TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi();
 
-            srvCommSpi.blockMessages(msgToBlock, client.localNode().id());
+            for (Class msgToBlock : msgsToBlock)
+                srvCommSpi.blockMessages(msgToBlock, client.localNode().id());
         }
 
         IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Object>()
{

http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index 8ee669c..e8b7d06 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteQueue;
 import org.apache.ignite.IgniteSet;
 import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -315,7 +316,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
         BlockTcpCommunicationSpi commSpi = commSpi(srv);
 
         if (colCfg.getAtomicityMode() == ATOMIC)
-            commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
+            commSpi.blockMessage(GridNearAtomicAbstractUpdateResponse.class);
         else
             commSpi.blockMessage(GridNearTxPrepareResponse.class);
 


Mime
View raw message