Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DD031200C21 for ; Mon, 20 Feb 2017 12:33:46 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id DBA12160B62; Mon, 20 Feb 2017 11:33:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 68022160B79 for ; Mon, 20 Feb 2017 12:33:45 +0100 (CET) Received: (qmail 54496 invoked by uid 500); 20 Feb 2017 11:33:44 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 54431 invoked by uid 99); 20 Feb 2017 11:33:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Feb 2017 11:33:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 27F47DFF0F; Mon, 20 Feb 2017 11:33:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 20 Feb 2017 11:33:48 -0000 Message-Id: In-Reply-To: <832cdf80fd2b4a86b019de0551793536@git.apache.org> References: <832cdf80fd2b4a86b019de0551793536@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] ignite git commit: ignite-4705 archived-at: Mon, 20 Feb 2017 11:33:47 -0000 ignite-4705 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b979880 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b979880 Diff: 
http://git-wip-us.apache.org/repos/asf/ignite/diff/7b979880 Branch: refs/heads/ignite-4705 Commit: 7b979880cd1f987d9e707627339f3cb8890ceb30 Parents: 743f8b4 Author: sboikov Authored: Mon Feb 20 12:43:47 2017 +0300 Committer: sboikov Committed: Mon Feb 20 14:33:40 2017 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 2 + .../apache/ignite/internal/IgniteKernal.java | 1 + .../communication/GridIoMessageFactory.java | 6 + .../processors/cache/GridCacheIoManager.java | 3 - .../processors/cache/GridCacheMessage.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 9 +- .../GridDhtAtomicAbstractUpdateFuture.java | 31 ++- .../dht/atomic/GridDhtAtomicCache.java | 76 ++++++-- .../GridDhtAtomicDeferredUpdateResponse.java | 6 + .../atomic/GridDhtAtomicSingleUpdateFuture.java | 3 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 3 +- .../GridNearAtomicAbstractUpdateFuture.java | 6 +- .../atomic/GridNearAtomicMappingResponse.java | 193 +++++++++++++++++++ .../GridNearAtomicSingleUpdateFuture.java | 34 +++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 15 +- 15 files changed, 352 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 6636bf2..50876dd 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; @@ -196,6 +197,7 @@ public class MessageCodeGenerator { gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class); gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class); gen.generateAndWrite(GridNearAtomicUpdateResponse.class); + gen.generateAndWrite(GridNearAtomicMappingResponse.class); // gen.generateAndWrite(GridMessageCollection.class); // gen.generateAndWrite(DataStreamerEntry.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 750c316..9f01615 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1151,6 +1151,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { "Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL + " ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL 
+ " ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL + + " ^-- Futures [futs=" + ctx.cache().context().mvcc().atomicFuturesCount() + "]" + NL + " ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" + dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL + " ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" + http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 5ed46ff..908d1d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; @@ -175,6 +176,11 @@ public class GridIoMessageFactory 
implements MessageFactory { Message msg = null; switch (type) { + case -47: + msg = new GridNearAtomicMappingResponse(); + + break; + case -46: msg = new UpdateErrors(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 0f7371d..4d3ab1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -362,9 +362,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (depEnabled) cctx.deploy().ignoreOwnership(true); - if (!cacheMsg.partitionExchangeMessage()) - log.info("Cache message: " + cacheMsg); - unmarshall(nodeId, cacheMsg); if (cacheMsg.classError() != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 3ec5323..f3acf66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message { private static final long serialVersionUID = 0L; /** Maximum number of cache lookup indexes. 
*/ - public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 6; + public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7; /** Cache message index field name. */ public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX"; http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index e8a5c8d..7f9f18c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -107,7 +107,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap>> mvccFuts = newMap(); /** Pending atomic futures. */ - private final ConcurrentMap> atomicFuts = new ConcurrentHashMap8<>(); + private final ConcurrentHashMap8> atomicFuts = new ConcurrentHashMap8<>(); /** Pending data streamer futures. */ private final GridConcurrentHashSet dataStreamerFuts = new GridConcurrentHashSet<>(); @@ -453,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Collection of pending atomic futures. + */ + public int atomicFuturesCount() { + return atomicFuts.size(); + } + + /** * @return Collection of pending data streamer futures. 
*/ public Collection dataStreamerFutures() { http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index da6616b..b512bdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -81,7 +82,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Completion callback. */ @GridToStringExclude - private final CI2 completionCb; + private final GridDhtAtomicCache.UpdateReplyClosure completionCb; /** Update request. 
*/ protected final GridNearAtomicAbstractUpdateRequest updateReq; @@ -108,7 +109,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte */ protected GridDhtAtomicAbstractUpdateFuture( GridCacheContext cctx, - CI2 completionCb, + GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes) { @@ -367,9 +368,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte List dhtNodes = null; if (fullSync) { - dhtNodes = new ArrayList<>(mappings.size()); + if (!F.isEmpty(mappings)) { + dhtNodes = new ArrayList<>(mappings.size()); - dhtNodes.addAll(mappings.keySet()); + dhtNodes.addAll(mappings.keySet()); + } + else + dhtNodes = Collections.emptyList(); if (primaryReplyToNear) updateRes.mapping(dhtNodes); @@ -380,9 +385,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte if (primaryReplyToNear) completionCb.apply(updateReq, updateRes); + else { + if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) { + GridNearAtomicMappingResponse mappingRes = new GridNearAtomicMappingResponse( + cctx.cacheId(), + updateReq.partition(), + updateReq.futureId(), + dhtNodes); + + try { + cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(msgLog, "Failed to send mapping response [futId=" + futId + + ", writeVer=" + writeVer + ", node=" + updateRes.nodeId() + ']'); + } + } + } } else { - // Reply. 
completionCb.apply(updateReq, updateRes); onDone(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d402c86..73a5acb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -35,6 +35,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; @@ -110,7 +111,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -137,9 +137,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { private static final int 
DEFERRED_UPDATE_RESPONSE_TIMEOUT = Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500); + /** */ + static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR = IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false); + /** Update reply closure. */ @GridToStringExclude - private CI2 updateReplyClos; + private UpdateReplyClosure updateReplyClos; /** Pending */ private GridDeferredAckMessageSender deferredUpdateMsgSnd; @@ -208,7 +211,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Override protected void init() { super.init(); - updateReplyClos = new CI2() { + updateReplyClos = new UpdateReplyClosure() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { if (req.writeSynchronizationMode() != FULL_ASYNC) @@ -231,6 +234,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Override public void start() throws IgniteCheckedException { super.start(); + // TODO IGNITE-4705. 
+ log.info("Atomic cache start [name=" + name() + + ", mode=" + configuration().getWriteSynchronizationMode() + + ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR + ']'); + deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { @Override public int getTimeout() { return DEFERRED_UPDATE_RESPONSE_TIMEOUT; @@ -423,6 +431,19 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } }); + ctx.io().addHandler(ctx.cacheId(), + GridNearAtomicMappingResponse.class, + new CI2() { + @Override public void apply(UUID uuid, GridNearAtomicMappingResponse msg) { + processDhtAtomicNearMappingResponse(uuid, msg); + } + + @Override public String toString() { + return "GridNearAtomicMappingResponse handler " + + "[msgIdx=" + GridNearAtomicMappingResponse.CACHE_MSG_IDX + ']'; + } + }); + if (near == null) { ctx.io().addHandler( ctx.cacheId(), @@ -1697,10 +1718,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param req Update request. * @param completionCb Completion callback. 
*/ - public void updateAllAsyncInternal( + void updateAllAsyncInternal( final UUID nodeId, final GridNearAtomicAbstractUpdateRequest req, - final CI2 completionCb + final UpdateReplyClosure completionCb ) { IgniteInternalFuture forceFut = preldr.request(req, req.topologyVersion()); @@ -1749,7 +1770,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { */ private void onForceKeysError(final UUID nodeId, final GridNearAtomicAbstractUpdateRequest req, - final CI2 completionCb, + final UpdateReplyClosure completionCb, IgniteCheckedException e ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), @@ -1772,7 +1793,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { private void updateAllAsyncInternal0( UUID nodeId, GridNearAtomicAbstractUpdateRequest req, - CI2 completionCb + UpdateReplyClosure completionCb ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(), ctx.deploymentEnabled()); @@ -1997,7 +2018,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { final List locked, final GridCacheVersion ver, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - final CI2 completionCb, + final GridDhtAtomicCache.UpdateReplyClosure completionCb, final boolean replicate, final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, @@ -2420,7 +2441,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { List locked, GridCacheVersion ver, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - CI2 completionCb, + GridDhtAtomicCache.UpdateReplyClosure completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2651,7 +2672,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable final Collection rmvKeys, @Nullable final Map> entryProcessorMap, @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, - final CI2 completionCb, + final GridDhtAtomicCache.UpdateReplyClosure completionCb, 
final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final boolean replicate, @@ -3133,7 +3154,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes, - CI2 completionCb, + GridDhtAtomicCache.UpdateReplyClosure completionCb, boolean force ) { if (updateReq.size() == 1) @@ -3370,6 +3391,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } /** + * @param primaryId Primary node ID. * @param req Request. * @param nearRes Response to send. */ @@ -3424,6 +3446,31 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param nodeId Node ID. * @param res Response. */ + private void processDhtAtomicNearMappingResponse(UUID nodeId, GridNearAtomicMappingResponse res) { + GridNearAtomicAbstractUpdateFuture updateFut = + (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); + + if (updateFut != null) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Received near mapping response [futId=" + res.futureId() + + ", node=" + nodeId + ']'); + } + + updateFut.onMappingReceived(nodeId, res); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to find future for near mapping response [futId=" + res.futureId() + + ", node=" + nodeId + + ", res=" + res + ']'); + } + } + } + + /** + * @param nodeId Node ID. + * @param res Response. + */ private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) { GridNearAtomicAbstractUpdateFuture updateFut = (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); @@ -3663,4 +3710,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { return Collections.emptyList(); } } + + /** + * + */ + static interface UpdateReplyClosure extends CI2 { + // No-op. 
+ } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index bd2bae0..671d516 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -147,4 +148,9 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem @Override public byte fieldsCount() { return 4; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAtomicDeferredUpdateResponse.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 4ee90a0..1e63165 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -58,7 +57,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture */ GridDhtAtomicSingleUpdateFuture( GridCacheContext cctx, - CI2 completionCb, + GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 
20c3d4f..dece1d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -61,7 +60,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { */ GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2 completionCb, + GridDhtAtomicCache.UpdateReplyClosure completionCb, GridCacheVersion writeVer, GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 82f171d..15075c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -197,7 +197,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** * Performs future mapping. */ - public void map() { + public final void map() { AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); if (topVer == null) @@ -257,7 +257,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2() { + new GridDhtAtomicCache.UpdateReplyClosure() { @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { onResult(res.nodeId(), res, false); } @@ -300,6 +300,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt public abstract void onResult(UUID nodeId, GridDhtAtomicNearResponse res); + public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res); + /** * @param req Request. * @param e Error. 
/*
 * Provenance (diff header as it appeared in the commit e-mail):
 *
 * http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
 * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
 * new file mode 100644
 * index 0000000..be1f0f5
 * --- /dev/null
 * +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
 * @@ -0,0 +1,193 @@
 */

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;

/**
 * Cache message that carries a partition, a future ID and a list of node IDs (the "mapping").
 * <p>
 * {@code GridNearAtomicSingleUpdateFuture.onMappingReceived(..)} consumes it: the future ignores the
 * message unless {@link #futureId()} equals its own future ID, and otherwise initializes its mapping
 * from {@link #mapping()}.
 * <p>
 * NOTE(review): {@link #writeTo} and {@link #readFrom} follow the layout of generated message code;
 * presumably they are produced by {@code MessageCodeGenerator} and should be regenerated rather than
 * edited by hand - confirm.
 */
public class GridNearAtomicMappingResponse extends GridCacheMessage {
    /** */
    private static final long serialVersionUID = 0L;

    /** Message index. */
    public static final int CACHE_MSG_IDX = nextIndexId();

    /** Partition, returned by {@link #partition()}. */
    private int part;

    /** Node IDs that make up the mapping; written and read as a collection of {@link UUID}s. */
    @GridDirectCollection(UUID.class)
    private List<UUID> mapping;

    /** ID of the future this message refers to. */
    private long futId;

    /**
     * Empty constructor.
     * NOTE(review): presumably required so that an instance can be created before {@link #readFrom}
     * fills it in - confirm.
     */
    public GridNearAtomicMappingResponse() {
        // No-op.
    }

    /**
     * @param cacheId Cache ID.
     * @param part Partition, must be non-negative.
     * @param futId Future ID.
     * @param mapping Mapping (node IDs). The list is stored by reference, not copied.
     */
    GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping) {
        assert part >= 0 : part;

        this.cacheId = cacheId;
        this.part = part;
        this.futId = futId;
        this.mapping = mapping;
    }

    /** {@inheritDoc} */
    @Override public int partition() {
        return part;
    }

    /**
     * @return Mapping (node IDs) exactly as passed to the constructor or read from the wire.
     */
    public List<UUID> mapping() {
        return mapping;
    }

    /**
     * @return Future ID.
     */
    public long futureId() {
        return futId;
    }

    /** {@inheritDoc} */
    @Override public int lookupIndex() {
        return CACHE_MSG_IDX;
    }

    /**
     * {@inheritDoc}
     * <p>
     * NOTE(review): the value presumably has to match the entry registered for this class in
     * {@code GridIoMessageFactory} - confirm.
     */
    @Override public byte directType() {
        return -47;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Three fields are written by this class (states 3, 4 and 5).
     * NOTE(review): the other three presumably belong to {@link GridCacheMessage} - confirm.
     */
    @Override public byte fieldsCount() {
        return 6;
    }

    /**
     * {@inheritDoc}
     *
     * @return Always {@code false}.
     */
    @Override public boolean addDeploymentInfo() {
        return false;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Delegates to the superclass first, writes the header if the writer reports it has not been
     * written yet, then writes {@code futId}, {@code mapping} and {@code part} at writer states 3, 4
     * and 5. Returns {@code false} as soon as any step returns {@code false}.
     */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        writer.setBuffer(buf);

        if (!super.writeTo(buf, writer))
            return false;

        if (!writer.isHeaderWritten()) {
            if (!writer.writeHeader(directType(), fieldsCount()))
                return false;

            writer.onHeaderWritten();
        }

        // Case labels fall through on purpose: starting from the writer's current state, every
        // remaining field is written in order.
        switch (writer.state()) {
            case 3:
                if (!writer.writeLong("futId", futId))
                    return false;

                writer.incrementState();

            case 4:
                if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
                    return false;

                writer.incrementState();

            case 5:
                if (!writer.writeInt("part", part))
                    return false;

                writer.incrementState();
        }

        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Mirror of {@link #writeTo}: after the superclass has read its part, reads {@code futId},
     * {@code mapping} and {@code part} at reader states 3, 4 and 5. Returns {@code false} as soon as
     * the reader reports that the last read did not complete.
     */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        reader.setBuffer(buf);

        if (!reader.beforeMessageRead())
            return false;

        if (!super.readFrom(buf, reader))
            return false;

        // Case labels fall through on purpose: starting from the reader's current state, every
        // remaining field is read in order.
        switch (reader.state()) {
            case 3:
                futId = reader.readLong("futId");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 4:
                mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 5:
                part = reader.readInt("part");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();
        }

        return reader.afterMessageRead(GridNearAtomicMappingResponse.class);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridNearAtomicMappingResponse.class, this);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 2016c98..8bfbe72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -217,6 +217,29 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (cctx.discovery().node(dhtNodeId) != null) mapping.add(dhtNodeId); } + + if (rcvd != null) + mapping.removeAll(rcvd); + } + + /** {@inheritDoc} */ + @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) { + GridCacheReturn opRes0 = null; + + synchronized (mux) { + if (futId == null || futId != res.futureId()) + return; + + if (mapping == null) { + initMapping(res.mapping()); + + if (mapping.isEmpty() && opRes != null) + opRes0 = opRes; + } + } + + if (opRes0 != null) + onDone(opRes0); } /** {@inheritDoc} */ @@ -251,11 +274,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (opRes == null && res.hasResult()) opRes = res.result(); - if (mapping.isEmpty() && opRes != null) { + if (mapping.isEmpty() && opRes != null) opRes0 = opRes; - - futId = null; - } } if (opRes0 != null) @@ -324,12 +344,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda else opRes = ret; - if (res.mapping() != null) { + if (res.mapping() != null) 
initMapping(res.mapping()); - - if (rcvd != null) - mapping.removeAll(rcvd); - } else mapping = Collections.emptySet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 7b573b1..e135000 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -256,6 +256,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ + @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) { throw new UnsupportedOperationException(); } @@ -591,8 +596,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (locUpdate != null) { cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, - new CI2() { - @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) { + new GridDhtAtomicCache.UpdateReplyClosure() { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { onResult(res.nodeId(), res, false); } }); @@ -615,6 +620,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu void map(AffinityTopologyVersion topVer, Long 
futId, @Nullable Collection remapKeys) { + if (true) { + onDone(new IgniteCheckedException("Failed")); + + return; + } + Collection topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) {