Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2F11318902 for ; Fri, 29 Apr 2016 10:55:41 +0000 (UTC) Received: (qmail 46371 invoked by uid 500); 29 Apr 2016 10:55:40 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 46274 invoked by uid 500); 29 Apr 2016 10:55:40 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 46251 invoked by uid 99); 29 Apr 2016 10:55:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2016 10:55:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50E9ADFB73; Fri, 29 Apr 2016 10:55:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Fri, 29 Apr 2016 10:55:42 -0000 Message-Id: <3fdb768924e14b24ac589a31029e7c12@git.apache.org> In-Reply-To: <0563feffb94248c89102d9340309ab79@git.apache.org> References: <0563feffb94248c89102d9340309ab79@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/20] ignite git commit: Refactoring: use abstract response instead of concrete. Refactoring: use abstract response instead of concrete. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/279ce729 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/279ce729 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/279ce729 Branch: refs/heads/ignite-2523-1 Commit: 279ce72971f8d8e3bb9a2d8691a480f20bc08541 Parents: 70d2dd7 Author: vozerov-gridgain Authored: Tue Apr 26 15:57:37 2016 +0300 Committer: vozerov-gridgain Committed: Tue Apr 26 15:57:37 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 5 +-- .../dht/atomic/GridDhtAtomicCache.java | 37 ++++++++++---------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 ++--- .../GridNearAtomicAbstractUpdateRequest.java | 4 +-- .../GridNearAtomicSingleUpdateFuture.java | 13 +++---- .../GridNearAtomicSingleUpdateRequest.java | 6 ++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 20 ++++++----- .../dht/atomic/GridNearAtomicUpdateRequest.java | 6 ++-- .../distributed/near/GridNearAtomicCache.java | 4 +-- .../IgniteClientReconnectAbstractTest.java | 3 +- .../IgniteClientReconnectCacheTest.java | 31 ++++++++++++---- .../IgniteClientReconnectCollectionsTest.java | 3 +- 12 files changed, 82 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 82b7604..4113749 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; @@ -429,7 +430,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 40: { GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg; - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, req.futureVersion(), @@ -607,7 +608,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case -23: { GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), nodeId, req.futureVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 37a5f45..36cd098 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -139,7 +139,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { /** Update reply closure. */ @GridToStringExclude - private CI2 updateReplyClos; + private CI2 updateReplyClos; /** Pending */ private ConcurrentMap pendingResponses = new ConcurrentHashMap8<>(); @@ -192,9 +192,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } }); - updateReplyClos = new CI2() { + updateReplyClos = new CI2() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, + GridNearAtomicAbstractUpdateResponse res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { assert req.writeSynchronizationMode() != FULL_ASYNC : req; @@ -1477,7 +1478,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { public void updateAllAsyncInternal( final UUID nodeId, final GridNearAtomicAbstractUpdateRequest req, - final CI2 completionCb + final CI2 completionCb ) { IgniteInternalFuture forceFut = preldr.request(req.keys(), req.topologyVersion()); @@ -1502,10 +1503,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { public void updateAllAsyncInternal0( UUID nodeId, GridNearAtomicAbstractUpdateRequest req, - CI2 completionCb + CI2 completionCb ) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), - ctx.deploymentEnabled()); + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, + req.futureVersion(), ctx.deploymentEnabled()); assert !req.returnValue() || (req.operation() == TRANSFORM || req.keysCount() == 1); @@ -1726,11 +1727,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { final ClusterNode node, final boolean hasNear, final GridNearAtomicAbstractUpdateRequest req, - final GridNearAtomicUpdateResponse res, + final GridNearAtomicAbstractUpdateResponse res, final List locked, final GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - final CI2 completionCb, + final CI2 completionCb, final boolean replicate, final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, @@ -2145,11 +2146,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { ClusterNode node, boolean hasNear, GridNearAtomicAbstractUpdateRequest req, - GridNearAtomicUpdateResponse res, + GridNearAtomicAbstractUpdateResponse res, List locked, GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2 completionCb, + CI2 completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2378,9 +2379,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable final Collection rmvKeys, @Nullable final Map> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, - final CI2 completionCb, + final CI2 completionCb, final GridNearAtomicAbstractUpdateRequest req, - final GridNearAtomicUpdateResponse res, + final GridNearAtomicAbstractUpdateResponse res, final boolean replicate, final UpdateBatchResult batchRes, final String taskName, @@ -2757,7 +2758,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @return {@code True} if filter evaluation succeeded. */ private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req, - GridNearAtomicUpdateResponse res) { + GridNearAtomicAbstractUpdateResponse res) { try { return ctx.isAllLocked(entry, req.filter()); } @@ -2851,8 +2852,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable private GridDhtAtomicUpdateFuture createDhtFuture( GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes, - CI2 completionCb, + GridNearAtomicAbstractUpdateResponse updateRes, + CI2 completionCb, boolean force ) { if (!force) { @@ -2897,7 +2898,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param res Near atomic update response. */ @SuppressWarnings("unchecked") - private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { + private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicAbstractUpdateResponse res) { if (log.isDebugEnabled()) log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res=" + res + ']'); @@ -3108,7 +3109,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param nodeId Originating node ID. * @param res Near update response. */ - private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { + private void sendNearUpdateReply(UUID nodeId, GridNearAtomicAbstractUpdateResponse res) { try { ctx.io().send(nodeId, res, ctx.ioPolicy()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 1fcac71..82238e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -79,7 +79,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter /** Completion callback. */ @GridToStringExclude - private final CI2 completionCb; + private final CI2 completionCb; /** Mappings. */ @GridToStringInclude @@ -92,7 +92,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter private final GridNearAtomicAbstractUpdateRequest updateReq; /** Update response. */ - private final GridNearAtomicUpdateResponse updateRes; + private final GridNearAtomicAbstractUpdateResponse updateRes; /** Future keys. */ private final Collection keys; @@ -115,10 +115,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter */ public GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2 completionCb, + CI2 completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes + GridNearAtomicAbstractUpdateResponse updateRes ) { this.cctx = cctx; this.writeVer = writeVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 6e868e4..7e3e2e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -215,12 +215,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa * @param res Response. * @return {@code True} if current response was {@code null}. */ - public abstract boolean onResponse(GridNearAtomicUpdateResponse res); + public abstract boolean onResponse(GridNearAtomicAbstractUpdateResponse res); /** * @return Response. */ - @Nullable public abstract GridNearAtomicUpdateResponse response(); + @Nullable public abstract GridNearAtomicAbstractUpdateResponse response(); /** * Cleanup values. http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 34399ac..064e067 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -146,7 +146,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; + GridNearAtomicAbstractUpdateResponse res = null; synchronized (mux) { GridNearAtomicAbstractUpdateRequest req = this.req != null && this.req.nodeId().equals(nodeId) ? @@ -212,7 +212,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param nodeErr {@code True} if response was created on node failure. */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + public void onResult(UUID nodeId, GridNearAtomicAbstractUpdateResponse res, boolean nodeErr) { GridNearAtomicAbstractUpdateRequest req; AffinityTopologyVersion remapTopVer = null; @@ -389,7 +389,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param req Update request. * @param res Update response. */ - private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { + private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicAbstractUpdateResponse res) { assert nearEnabled; if (res.remapKeysCount() > 0 || !req.hasPrimary()) @@ -463,8 +463,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda private void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2() { - @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { + new CI2() { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, + GridNearAtomicAbstractUpdateResponse res) { onResult(res.nodeId(), res, false); } }); @@ -491,7 +492,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda */ void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), req.futureVersion(), cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 13b990e..ef1f50a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -152,7 +152,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd /** */ @GridDirectTransient - private GridNearAtomicUpdateResponse res; + private GridNearAtomicAbstractUpdateResponse res; /** Target node ID. */ @GridDirectTransient @@ -427,7 +427,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd } /** {@inheritDoc} */ - @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { + @Override public boolean onResponse(GridNearAtomicAbstractUpdateResponse res) { if (this.res == null) { this.res = res; @@ -438,7 +438,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd } /** {@inheritDoc} */ - @Override @Nullable public GridNearAtomicUpdateResponse response() { + @Override @Nullable public GridNearAtomicAbstractUpdateResponse response() { return res; } http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index bad4647..07071f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -164,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { - GridNearAtomicUpdateResponse res = null; + GridNearAtomicAbstractUpdateResponse res = null; synchronized (mux) { GridNearAtomicUpdateRequest req; @@ -258,7 +258,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param nodeErr {@code True} if response was created on node failure. */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + public void onResult(UUID nodeId, GridNearAtomicAbstractUpdateResponse res, boolean nodeErr) { GridNearAtomicUpdateRequest req; AffinityTopologyVersion remapTopVer = null; @@ -409,7 +409,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (rcvAll && nearEnabled) { if (mappings != null) { for (GridNearAtomicUpdateRequest req0 : mappings.values()) { - GridNearAtomicUpdateResponse res0 = req0.response(); + GridNearAtomicAbstractUpdateResponse res0 = req0.response(); assert res0 != null : req0; @@ -484,7 +484,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param req Update request. * @param res Update response. */ - private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicAbstractUpdateResponse res) { assert nearEnabled; if (res.remapKeysCount() > 0 || !req.hasPrimary()) @@ -558,8 +558,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + new CI2() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicAbstractUpdateResponse res) { onResult(res.nodeId(), res, false); } }); @@ -613,8 +614,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (locUpdate != null) { cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + new CI2() { + @Override public void apply(GridNearAtomicUpdateRequest req, + GridNearAtomicAbstractUpdateResponse res) { onResult(res.nodeId(), res, false); } }); @@ -630,7 +632,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu */ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + GridNearAtomicAbstractUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), req.futureVersion(), cctx.deploymentEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index d458ac2..75c530d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -149,7 +149,7 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq /** */ @GridDirectTransient - private GridNearAtomicUpdateResponse res; + private GridNearAtomicAbstractUpdateResponse res; /** Target node ID. */ @GridDirectTransient @@ -506,7 +506,7 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq } /** {@inheritDoc} */ - @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { + @Override public boolean onResponse(GridNearAtomicAbstractUpdateResponse res) { if (this.res == null) { this.res = res; @@ -517,7 +517,7 @@ public class GridNearAtomicUpdateRequest extends GridNearAtomicAbstractUpdateReq } /** {@inheritDoc} */ - @Override @Nullable public GridNearAtomicUpdateResponse response() { + @Override @Nullable public GridNearAtomicAbstractUpdateResponse response() { return res; } http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index ac1ef70..995b948 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -127,7 +127,7 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { */ public void processNearAtomicUpdateResponse( GridNearAtomicAbstractUpdateRequest req, - GridNearAtomicUpdateResponse res + GridNearAtomicAbstractUpdateResponse res ) { if (res.failedKeysCount() == req.keysCount()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 4d49366..36e8aa2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -419,6 +419,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra private IgniteLogger log; /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) throws IgniteSpiException { Class msgCls0 = msgCls; @@ -427,7 +428,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra classes.put(((GridIoMessage)msg).message().getClass().getName(), node); if (msgCls0 != null && msg instanceof GridIoMessage - && ((GridIoMessage)msg).message().getClass().equals(msgCls)) { + && msgCls0.isAssignableFrom(((GridIoMessage)msg).message().getClass())) { log.info("Block message: " + msg); return; http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index ad6c46f..8451a55 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -805,14 +807,20 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac ccfg.setWriteSynchronizationMode(syncMode); if (syncMode != FULL_ASYNC) { - Class cls = (ccfg.getAtomicityMode() == ATOMIC) ? - GridNearAtomicUpdateResponse.class : GridNearTxPrepareResponse.class; - log.info("Test cache put [atomicity=" + atomicityMode + ", writeOrder=" + writeOrder + ", syncMode=" + syncMode + ']'); - checkOperationInProgressFails(client, ccfg, cls, putOp); + if (ccfg.getAtomicityMode() == ATOMIC) { + Collection clss = new HashSet<>(); + + clss.add(GridNearAtomicUpdateResponse.class); + // TODO: Add single. + + checkOperationInProgressFails(client, ccfg, clss, putOp); + } + else + checkOperationInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, putOp); client.destroyCache(ccfg.getName()); } @@ -1250,16 +1258,24 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac */ static class TestClass5 implements Serializable {} + private void checkOperationInProgressFails(IgniteEx client, + final CacheConfiguration ccfg, + Class msgToBlock, + final IgniteInClosure> c) + throws Exception { + checkOperationInProgressFails(client, ccfg, Collections.singleton(msgToBlock), c); + } + /** * @param client Client. * @param ccfg Cache configuration. - * @param msgToBlock Message to block. + * @param msgsToBlock Message to block. * @param c Cache operation closure. * @throws Exception If failed. */ private void checkOperationInProgressFails(IgniteEx client, final CacheConfiguration ccfg, - Class msgToBlock, + Collection msgsToBlock, final IgniteInClosure> c) throws Exception { @@ -1272,7 +1288,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac for (int i = 0; i < SRV_CNT; i++) { TestCommunicationSpi srvCommSpi = (TestCommunicationSpi)grid(i).configuration().getCommunicationSpi(); - srvCommSpi.blockMessages(msgToBlock, client.localNode().id()); + for (Class msgToBlock : msgsToBlock) + srvCommSpi.blockMessages(msgToBlock, client.localNode().id()); } IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { http://git-wip-us.apache.org/repos/asf/ignite/blob/279ce729/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index 8ee669c..e8b7d06 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSet; import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.testframework.GridTestUtils; @@ -315,7 +316,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA BlockTcpCommunicationSpi commSpi = commSpi(srv); if (colCfg.getAtomicityMode() == ATOMIC) - commSpi.blockMessage(GridNearAtomicUpdateResponse.class); + commSpi.blockMessage(GridNearAtomicAbstractUpdateResponse.class); else commSpi.blockMessage(GridNearTxPrepareResponse.class);