Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 06145189B2 for ; Thu, 25 Feb 2016 12:31:18 +0000 (UTC) Received: (qmail 99493 invoked by uid 500); 25 Feb 2016 12:31:02 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 99312 invoked by uid 500); 25 Feb 2016 12:31:02 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 98158 invoked by uid 99); 25 Feb 2016 12:31:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Feb 2016 12:31:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DE23BE8EFA; Thu, 25 Feb 2016 12:31:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 25 Feb 2016 12:31:22 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [23/51] [abbrv] ignite git commit: ignite-2523 : Generalized usage of GridNearAtomicUpdateRequest/Response. ignite-2523 : Generalized usage of GridNearAtomicUpdateRequest/Response. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb5bdb3f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb5bdb3f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb5bdb3f Branch: refs/heads/ignite-2523 Commit: cb5bdb3f19cefe14380ac169d49e6e86bde1899a Parents: 3c8d02a Author: Ilya Lantukh Authored: Mon Feb 8 18:51:55 2016 +0300 Committer: Ilya Lantukh Committed: Mon Feb 8 18:51:55 2016 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 45 +++++++++++--------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 ++-- 2 files changed, 30 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cb5bdb3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index b0504db..05205e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; import 
org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; @@ -139,7 +140,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500); /** Update reply closure. */ - private CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> updateReplyClos; + private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; /** Pending */ private ConcurrentMap pendingResponses = new ConcurrentHashMap8<>(); @@ -192,9 +193,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } }); - updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse>() { + updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { assert req.writeSynchronizationMode() != FULL_ASYNC : req; @@ -1311,7 +1312,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { public void updateAllAsyncInternal( final UUID nodeId, final GridNearAtomicUpdateRequest req, - final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb + final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb ) { IgniteInternalFuture forceFut = preldr.request(req.keys(), req.topologyVersion()); @@ -1336,10 +1337,16 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { public void updateAllAsyncInternal0( UUID nodeId, GridNearAtomicUpdateRequest req, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb ) { - GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), - ctx.deploymentEnabled()); + GridNearAtomicUpdateResponse res; + + if (req instanceof GridNearAtomicSingleUpdateRequest) + res = new GridNearAtomicSingleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), + ctx.deploymentEnabled()); + else + res = new
GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), + ctx.deploymentEnabled()); List keys = req.keys(); @@ -1424,7 +1431,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { UpdateBatchResult updRes = updateWithBatch(node, hasNear, req, - res, + (GridNearAtomicMultipleUpdateResponse) res, locked, ver, dhtFut, @@ -1563,7 +1570,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { List locked, GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -1797,7 +1804,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (intercept) { CacheObject old = entry.innerGet( - null, + null, /*read swap*/true, /*read through*/ctx.loadPreviousValue(), /*fail fast*/false, @@ -1812,7 +1819,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.keepBinary()); Object val = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(ctx, entry.key(), - old, req.keepBinary()), + old, req.keepBinary()), updated.value(ctx.cacheObjectContext(), false)); if (val == null) @@ -1975,11 +1982,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { ClusterNode node, boolean hasNear, GridNearAtomicUpdateRequest req, - GridNearAtomicMultipleUpdateResponse res, + GridNearAtomicUpdateResponse res, List locked, GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2214,7 +2221,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable Collection rmvKeys, @Nullable Map> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, final GridNearAtomicUpdateRequest req, final GridNearAtomicMultipleUpdateResponse res, boolean replicate, @@ -2688,8 +2695,8 @@ public class
GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable private GridDhtAtomicUpdateFuture createDhtFuture( GridCacheVersion writeVer, GridNearAtomicUpdateRequest updateReq, - GridNearAtomicMultipleUpdateResponse updateRes, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb, + GridNearAtomicUpdateResponse updateRes, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean force ) { if (!force) { @@ -2944,9 +2951,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param nodeId Originating node ID. * @param res Near update response. */ - private void sendNearUpdateReply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) { + private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { try { - ctx.io().send(nodeId, res, ctx.ioPolicy()); + ctx.io().send(nodeId, (GridCacheMessage) res, ctx.ioPolicy()); } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send near update reply to node because it left grid: " + @@ -3188,7 +3195,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { respVers.add(ver); - if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) + if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) snd = true; } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/cb5bdb3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 68c639d..3a31700 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -77,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement /** Completion callback. */ @GridToStringExclude - private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb; + private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb; /** Mappings. */ @GridToStringInclude @@ -90,7 +90,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement private final GridNearAtomicUpdateRequest updateReq; /** Update response. */ - private final GridNearAtomicMultipleUpdateResponse updateRes; + private final GridNearAtomicUpdateResponse updateRes; /** Future keys. */ private final Collection keys; @@ -110,10 +110,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement */ public GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb, + CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, GridCacheVersion writeVer, GridNearAtomicUpdateRequest updateReq, - GridNearAtomicMultipleUpdateResponse updateRes + GridNearAtomicUpdateResponse updateRes ) { this.cctx = cctx; this.writeVer = writeVer;