Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 47BD418F7F for ; Fri, 20 Nov 2015 13:33:31 +0000 (UTC) Received: (qmail 4952 invoked by uid 500); 20 Nov 2015 13:33:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 4870 invoked by uid 500); 20 Nov 2015 13:33:31 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 4004 invoked by uid 99); 20 Nov 2015 13:33:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2015 13:33:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B27A9E0B28; Fri, 20 Nov 2015 13:33:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 20 Nov 2015 13:33:42 -0000 Message-Id: <93305bea98f44fb3b2b4981dd2fb96b9@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [20/24] ignite git commit: IGNITE-426 Implemented failover for Continuous query. IGNITE-426 Implemented failover for Continuous query. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce636372 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce636372 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce636372 Branch: refs/heads/ignite-single-op-get Commit: ce6363729f3553c6854ed2e8cfc3b5c244678fcd Parents: 8728a5b Author: nikolay_tikhonov Authored: Thu Nov 19 19:50:58 2015 +0300 Committer: nikolay_tikhonov Committed: Thu Nov 19 19:50:58 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 22 +- .../internal/GridMessageListenHandler.java | 18 + .../communication/GridIoMessageFactory.java | 8 +- .../processors/cache/GridCacheEntryEx.java | 12 +- .../processors/cache/GridCacheMapEntry.java | 147 +- .../GridCachePartitionExchangeManager.java | 4 +- .../cache/GridCacheUpdateAtomicResult.java | 15 +- .../cache/GridCacheUpdateTxResult.java | 24 +- .../GridDistributedTxRemoteAdapter.java | 22 +- .../dht/GridClientPartitionTopology.java | 38 +- .../distributed/dht/GridDhtLocalPartition.java | 35 + .../dht/GridDhtPartitionTopology.java | 26 +- .../dht/GridDhtPartitionTopologyImpl.java | 112 +- .../distributed/dht/GridDhtTxFinishFuture.java | 14 +- .../distributed/dht/GridDhtTxFinishRequest.java | 112 +- .../dht/atomic/GridDhtAtomicCache.java | 79 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 75 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 140 +- .../GridDhtPartitionsExchangeFuture.java | 35 +- .../preloader/GridDhtPartitionsFullMessage.java | 64 +- .../GridDhtPartitionsSingleMessage.java | 56 +- .../distributed/near/GridNearAtomicCache.java | 10 +- .../distributed/near/GridNearTxRemote.java | 7 + .../CacheContinuousQueryBatchAck.java | 163 ++ .../continuous/CacheContinuousQueryEntry.java | 196 +- .../continuous/CacheContinuousQueryEvent.java | 3 +- .../continuous/CacheContinuousQueryHandler.java | 811 ++++++- .../CacheContinuousQueryListener.java | 35 + .../continuous/CacheContinuousQueryManager.java | 151 +- .../cache/transactions/IgniteTxEntry.java | 20 + .../cache/transactions/IgniteTxHandler.java | 3 + .../transactions/IgniteTxLocalAdapter.java | 18 +- .../cache/transactions/IgniteTxRemoteEx.java | 7 +- .../continuous/GridContinuousBatch.java | 44 + .../continuous/GridContinuousBatchAdapter.java | 46 + .../continuous/GridContinuousHandler.java | 22 + .../continuous/GridContinuousProcessor.java | 221 +- .../StartRoutineAckDiscoveryMessage.java | 14 +- .../StartRoutineDiscoveryMessage.java | 21 +- .../processors/cache/GridCacheTestEntryEx.java | 10 +- ...ContinuousQueryFailoverAbstractSelfTest.java | 2235 ++++++++++++++++++ ...ryFailoverAtomicNearEnabledSelfSelfTest.java | 46 + ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 44 + ...usQueryFailoverAtomicReplicatedSelfTest.java | 40 + ...inuousQueryFailoverTxReplicatedSelfTest.java | 32 + .../CacheContinuousQueryFailoverTxSelfTest.java | 39 + ...ridCacheContinuousQueryAbstractSelfTest.java | 153 +- .../GridCacheContinuousQueryTxSelfTest.java | 49 + ...CacheContinuousQueryClientReconnectTest.java | 187 ++ .../IgniteCacheContinuousQueryClientTest.java | 157 +- ...cheContinuousQueryClientTxReconnectTest.java | 32 + .../p2p/GridP2PSameClassLoaderSelfTest.java | 16 +- .../testframework/junits/GridAbstractTest.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 3 + .../IgniteCacheQuerySelfTestSuite.java | 16 +- .../yardstick/cache/CacheEntryEventProbe.java | 156 ++ 56 files changed, 5753 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index b4ce4ab..3918976 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -23,6 +23,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; import java.util.LinkedList; +import java.util.Map; import java.util.Queue; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -38,6 +39,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.util.typedef.F; @@ -127,6 +130,11 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateCounters(Map cntrs) { + // No-op. + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; @@ -213,8 +221,8 @@ class GridEventConsumeHandler implements GridContinuousHandler { } } - ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false, - false); + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, + false, false); } catch (ClusterTopologyCheckedException ignored) { // No-op. @@ -377,6 +385,16 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index ff38949..aa837b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -22,10 +22,13 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; import org.apache.ignite.internal.util.typedef.internal.S; @@ -100,6 +103,11 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateCounters(Map cntrs) { + // No-op. + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); @@ -167,6 +175,16 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3548aac..c671582 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -93,6 +93,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -708,7 +709,12 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..114] - this + case 118: + msg = new CacheContinuousQueryBatchAck(); + + break; + + // [-3..118] - this // [120..123] - DR // [-4..-22] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index af62e39..8d50616 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -360,6 +360,7 @@ public interface GridCacheEntryEx { * @param subjId Subject ID initiated this update. * @param taskName Task name. * @param dhtVer Dht version for near cache entry. + * @param updateCntr Update counter. * @return Tuple containing success flag and old value. If success is {@code false}, * then value is {@code null}. * @throws IgniteCheckedException If storing value failed. @@ -382,7 +383,8 @@ public interface GridCacheEntryEx { @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -417,7 +419,8 @@ public interface GridCacheEntryEx { @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -446,6 +449,7 @@ public interface GridCacheEntryEx { * @param intercept If {@code true} then calls cache interceptor. * @param subjId Subject ID initiated this update. * @param taskName Task name. + * @param updateCntr Update counter. * @return Tuple where first value is flag showing whether operation succeeded, * second value is old entry value if return value is requested, third is updated entry value, * fourth is the version to enqueue for deferred delete the fifth is DR conflict context @@ -478,7 +482,9 @@ public interface GridCacheEntryEx { boolean conflictResolve, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + @Nullable CacheObject prevVal, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 0786a50..8d363ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; @@ -1060,7 +1061,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject old; @@ -1077,6 +1079,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object key0 = null; Object val0 = null; + long updateCntr0; + synchronized (this) { checkObsolete(); @@ -1155,6 +1159,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme deletedUnlocked(false); } + updateCntr0 = nextPartCounter(topVer); + + if (updateCntr != null && updateCntr != 0) + updateCntr0 = updateCntr; + update(val, expireTime, ttl, newVer); drReplicate(drType, val, newVer); @@ -1180,8 +1189,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (cctx.isLocal() || cctx.isReplicated() || + (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()))) + cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(), + partition(), tx.local(), false, updateCntr0, topVer); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1197,7 +1208,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (intercept) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, val, val0)); - return valid ? new GridCacheUpdateTxResult(true, retval ? old : null) : + return valid ? new GridCacheUpdateTxResult(true, retval ? old : null, updateCntr0) : new GridCacheUpdateTxResult(false, null); } @@ -1223,7 +1234,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable GridCacheVersion explicitVer, @Nullable UUID subjId, String taskName, - @Nullable GridCacheVersion dhtVer + @Nullable GridCacheVersion dhtVer, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.transactional(); @@ -1245,6 +1257,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Cache.Entry entry0 = null; + Long updateCntr0; + boolean deferred; boolean marked = false; @@ -1261,7 +1275,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : - "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; + "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; boolean startVer = isStartVersion(); @@ -1318,6 +1332,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } + updateCntr0 = nextPartCounter(topVer); + + if (updateCntr != null && updateCntr != 0) + updateCntr0 = updateCntr; + drReplicate(drType, null, newVer); if (metrics && cctx.cache().configuration().isStatisticsEnabled()) @@ -1350,8 +1369,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, old, false); + if (cctx.isLocal() || cctx.isReplicated() || + (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()))) + cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal() + || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer); cctx.dataStructures().onEntryUpdated(key, true); @@ -1394,7 +1415,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else ret = old; - return new GridCacheUpdateTxResult(true, ret); + return new GridCacheUpdateTxResult(true, ret, updateCntr0); } else return new GridCacheUpdateTxResult(false, null); @@ -1686,7 +1707,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (!isNear()) { + long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE); + + cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(), + partition(), true, false, updateCntr, AffinityTopologyVersion.NONE); + } cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -1729,7 +1755,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean conflictResolve, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + @Nullable CacheObject prevVal, + @Nullable Long updateCntr ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -1755,6 +1783,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object key0 = null; Object updated0 = null; + Long updateCntr0 = null; + synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1862,7 +1892,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateCntr0 == null ? 0 : updateCntr0); } // Will update something. else { @@ -1911,6 +1942,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme "[entry=" + this + ", newVer=" + newVer + ']'); } + if (!cctx.isNear()) { + CacheObject evtVal; + + if (op == GridCacheOperation.TRANSFORM) { + EntryProcessor entryProcessor = + (EntryProcessor)writeObj; + + CacheInvokeEntry entry = + new CacheInvokeEntry<>(cctx, key, prevVal, version()); + + try { + entryProcessor.process(entry, invokeArgs); + + evtVal = entry.modified() ? + cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; + } + catch (Exception e) { + evtVal = prevVal; + } + } + else + evtVal = (CacheObject)writeObj; + + updateCntr0 = nextPartCounter(topVer); + + if (updateCntr != null) + updateCntr0 = updateCntr; + + cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal() + || !context().userCache(), partition(), primary, false, updateCntr0, topVer); + } + return new GridCacheUpdateAtomicResult(false, retval ? rawGetOrUnmarshalUnlocked(false) : null, null, @@ -1919,7 +1982,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateCntr0 == null ? 0 : updateCntr0); } } else @@ -1995,7 +2059,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateCntr0 == null ? 0 : updateCntr0); } } @@ -2042,7 +2107,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateCntr0 == null ? 0 : updateCntr); } } else @@ -2142,7 +2208,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateCntr0 == null ? 0 : updateCntr0); else if (interceptorVal != updated0) { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -2179,6 +2246,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme update(updated, newExpireTime, newTtl, newVer); + updateCntr0 = nextPartCounter(topVer); + + if (updateCntr != null) + updateCntr0 = updateCntr; + drReplicate(drType, updated, newVer); recordNodeId(affNodeId, topVer); @@ -2218,7 +2290,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateCntr0 == null ? 0 : updateCntr0); } if (writeThrough) @@ -2270,6 +2343,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme recordNodeId(affNodeId, topVer); + updateCntr0 = nextPartCounter(topVer); + + if (updateCntr != null) + updateCntr0 = updateCntr; + drReplicate(drType, null, newVer); if (evt) { @@ -2299,9 +2377,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false); - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); if (intercept) { @@ -2326,7 +2401,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme newSysExpireTime, enqueueVer, conflictCtx, - true); + true, + updateCntr0); } /** @@ -3146,11 +3222,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme else if (deletedUnlocked()) deletedUnlocked(false); + long updateCntr = 0; + + if (!preload) + updateCntr = nextPartCounter(topVer); + drReplicate(drType, val, ver); if (!skipQryNtf) { - if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) - cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload); + cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal() + || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer); cctx.dataStructures().onEntryUpdated(key, false); } @@ -3167,6 +3248,26 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } + /** + * @param topVer Topology version. + * @return Update counter. + */ + private long nextPartCounter(AffinityTopologyVersion topVer) { + long updateCntr; + + if (!cctx.isLocal() && !isNear()) { + GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false); + + assert locPart != null; + + updateCntr = locPart.nextUpdateCounter(); + } + else + updateCntr = 0; + + return updateCntr; + } + /** {@inheritDoc} */ @Override public synchronized boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws IgniteCheckedException, http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e19b310..cd89416 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -989,7 +989,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue()) != null; + updated |= top.update(null, entry.getValue(), null) != null; } if (!cctx.kernalContext().clientNode() && updated) @@ -1032,7 +1032,7 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue()) != null; + updated |= top.update(null, entry.getValue(), null) != null; } if (updated) http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 3674284..9df476e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult { /** Whether update should be propagated to DHT node. */ private final boolean sndToDht; + /** */ + private final Long updateCntr; + /** Value computed by entry processor. */ private IgniteBiTuple res; @@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult { * @param rmvVer Version for deferred delete. * @param conflictRes DR resolution result. * @param sndToDht Whether update should be propagated to DHT node. + * @param updateCntr Partition update counter. */ public GridCacheUpdateAtomicResult(boolean success, @Nullable CacheObject oldVal, @@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult { long conflictExpireTime, @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext conflictRes, - boolean sndToDht) { + boolean sndToDht, + long updateCntr) { this.success = success; this.oldVal = oldVal; this.newVal = newVal; @@ -91,6 +96,7 @@ public class GridCacheUpdateAtomicResult { this.rmvVer = rmvVer; this.conflictRes = conflictRes; this.sndToDht = sndToDht; + this.updateCntr = updateCntr; } /** @@ -129,6 +135,13 @@ public class GridCacheUpdateAtomicResult { } /** + * @return Partition update index. + */ + public Long updateCounter() { + return updateCntr; + } + + /** * @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time * value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java index ffda7a2..461baa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateTxResult.java @@ -32,6 +32,9 @@ public class GridCacheUpdateTxResult { @GridToStringInclude private final CacheObject oldVal; + /** Partition idx. */ + private long updateCntr; + /** * Constructor. * @@ -44,6 +47,25 @@ public class GridCacheUpdateTxResult { } /** + * Constructor. + * + * @param success Success flag. + * @param oldVal Old value (if any), + */ + GridCacheUpdateTxResult(boolean success, @Nullable CacheObject oldVal, long updateCntr) { + this.success = success; + this.oldVal = oldVal; + this.updateCntr = updateCntr; + } + + /** + * @return Partition idx. + */ + public long updatePartitionCounter() { + return updateCntr; + } + + /** * @return Success flag. */ public boolean success() { @@ -61,4 +83,4 @@ public class GridCacheUpdateTxResult { @Override public String toString() { return S.toString(GridCacheUpdateTxResult.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 0d49584..f9ac2df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -265,6 +265,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } + /** {@inheritDoc} */ + @Override public void setPartitionUpdateCounters(long[] cntrs) { + if (writeMap() != null && !writeMap().isEmpty() && cntrs != null && cntrs.length > 0) { + int i = 0; + + for (IgniteTxEntry txEntry : writeMap().values()) { + txEntry.updateCounter(cntrs[i]); + + ++i; + } + } + } + /** * Adds completed versions to an entry. * @@ -529,7 +542,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + txEntry.updateCounter()); else { cached.innerSet(this, eventNodeId(), @@ -547,7 +561,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + txEntry.updateCounter()); // Keep near entry up to date. if (nearCached != null) { @@ -575,7 +590,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName(), - dhtVer); + dhtVer, + txEntry.updateCounter()); // Keep near entry up to date. if (nearCached != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 162c116..b7169bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -94,6 +94,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** Lock. */ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** Partition update counters. */ + private Map cntrMap = new HashMap<>(); + /** * @param cctx Context. * @param cacheId Cache ID. @@ -527,7 +530,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionFullMap partMap) { + GridDhtPartitionFullMap partMap, + Map cntrMap) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@ -602,6 +606,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { part2node = p2n; + if (cntrMap != null) + this.cntrMap = new HashMap<>(cntrMap); + consistencyCheck(); if (log.isDebugEnabled()) @@ -617,7 +624,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts) { + GridDhtPartitionMap parts, + Map cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -698,6 +706,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } } + if (cntrMap != null) { + for (Map.Entry e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + } + consistencyCheck(); if (log.isDebugEnabled()) @@ -852,6 +869,23 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map updateCounters() { + lock.readLock().lock(); + + try { + return new HashMap<>(cntrMap); + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + return false; + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 1516ee4..63e2899 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicStampedReference; import java.util.concurrent.locks.ReentrantLock; @@ -114,6 +115,9 @@ public class GridDhtLocalPartition implements Comparable, /** Group reservations. */ private final CopyOnWriteArrayList reservations = new CopyOnWriteArrayList<>(); + /** Update counter. */ + private final AtomicLong cntr = new AtomicLong(); + /** * @param cctx Context. * @param id Partition ID. @@ -532,6 +536,8 @@ public class GridDhtLocalPartition implements Comparable, if (cctx.isDrEnabled()) cctx.dr().partitionEvicted(id); + cctx.continuousQueries().onPartitionEvicted(id); + cctx.dataStructures().onPartitionEvicted(id); rent.onDone(); @@ -599,6 +605,35 @@ public class GridDhtLocalPartition implements Comparable, } /** + * @return Next update index. + */ + public long nextUpdateCounter() { + return cntr.incrementAndGet(); + } + + /** + * @return Current update index. + */ + public long updateCounter() { + return cntr.get(); + } + + /** + * @param val Update index value. + */ + public void updateCounter(long val) { + while (true) { + long val0 = cntr.get(); + + if (val0 >= val) + break; + + if (cntr.compareAndSet(val0, val)) + break; + } + } + + /** * Clears values for this partition. */ private void clearAll() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index d642314..3ac2b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; @@ -51,6 +52,8 @@ public interface GridDhtPartitionTopology { * * @param exchId Exchange ID. * @param exchFut Exchange future. + * @param updateSeq Update sequence. + * @param stopping Stopping flag. * @throws IgniteInterruptedCheckedException If interrupted. */ public void updateTopologyVersion( @@ -193,17 +196,27 @@ public interface GridDhtPartitionTopology { /** * @param exchId Exchange ID. * @param partMap Update partition map. + * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ - public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap); + public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, + GridDhtPartitionFullMap partMap, + @Nullable Map cntrMap); /** * @param exchId Exchange ID. * @param parts Partitions. + * @param cntrMap Partition update counters. * @return Local partition map if there were evictions or {@code null} otherwise. */ @Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts); + GridDhtPartitionMap parts, + @Nullable Map cntrMap); + + /** + * @return Partition update counters. + */ + public Map updateCounters(); /** * @param part Partition to own. @@ -213,6 +226,7 @@ public interface GridDhtPartitionTopology { /** * @param part Evicted partition. + * @param updateSeq Update sequence increment flag. */ public void onEvicted(GridDhtLocalPartition part, boolean updateSeq); @@ -228,4 +242,10 @@ public interface GridDhtPartitionTopology { * @param threshold Threshold for number of entries. */ public void printMemoryStats(int threshold); -} \ No newline at end of file + + /** + * @param topVer Topology version. + * @return {@code True} if rebalance process finished. + */ + public boolean rebalanceFinished(AffinityTopologyVersion topVer); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 6bd283a..39c55db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -102,6 +102,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** Lock. */ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + /** Partition update counter. */ + private Map cntrMap = new HashMap<>(); + + /** */ + private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; + /** * @param cctx Context. */ @@ -131,6 +137,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { topReadyFut = null; topVer = AffinityTopologyVersion.NONE; + + rebalancedTopVer = AffinityTopologyVersion.NONE; } finally { lock.writeLock().unlock(); @@ -220,6 +228,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { updateSeq.setIfGreater(updSeq); topReadyFut = exchFut; + + rebalancedTopVer = AffinityTopologyVersion.NONE; } finally { lock.writeLock().unlock(); @@ -292,6 +302,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); + cntrMap.clear(); + // If this is the oldest node. if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) { if (node2part == null) { @@ -525,6 +537,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + updateRebalanceVersion(); + consistencyCheck(); } finally { @@ -732,7 +746,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param states Additional partition states. * @return List of nodes for the partition. */ - private List nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { + private List nodes(int p, + AffinityTopologyVersion topVer, + GridDhtPartitionState state, + GridDhtPartitionState... states) { Collection allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; lock.readLock().lock(); @@ -831,7 +848,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionFullMap partMap) { + GridDhtPartitionFullMap partMap, + @Nullable Map cntrMap) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@ -843,6 +861,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; + if (cntrMap != null) { + for (Map.Entry e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr = cntrMap.get(part.id()); + + if (cntr != null) + part.updateCounter(cntr); + } + } + if (exchId != null && lastExchangeId != null && lastExchangeId.compareTo(exchId) >= 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" + @@ -913,6 +947,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean changed = checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -928,7 +964,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts) { + GridDhtPartitionMap parts, @Nullable Map cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@ -946,6 +982,22 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return null; + if (cntrMap != null) { + for (Map.Entry e : cntrMap.entrySet()) { + Long cntr = this.cntrMap.get(e.getKey()); + + if (cntr == null || cntr < e.getValue()) + this.cntrMap.put(e.getKey(), e.getValue()); + } + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr = cntrMap.get(part.id()); + + if (cntr != null) + part.updateCounter(cntr); + } + } + if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) { if (log.isDebugEnabled()) log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" + @@ -1008,6 +1060,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { changed |= checkEvictions(updateSeq); + updateRebalanceVersion(); + consistencyCheck(); if (log.isDebugEnabled()) @@ -1254,6 +1308,33 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map updateCounters() { + lock.readLock().lock(); + + try { + Map res = new HashMap<>(cntrMap); + + for (GridDhtLocalPartition part : locParts.values()) { + Long cntr0 = res.get(part.id()); + Long cntr1 = part.updateCounter(); + + if (cntr0 == null || cntr1 > cntr0) + res.put(part.id(), cntr1); + } + + return res; + } + finally { + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { + return topVer.equals(rebalancedTopVer); + } + + /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']'); @@ -1266,6 +1347,31 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * + */ + private void updateRebalanceVersion() { + if (!rebalancedTopVer.equals(topVer)) { + for (int i = 0; i < cctx.affinity().partitions(); i++) { + List affNodes = cctx.affinity().nodes(i, topVer); + + // Topology doesn't contain server nodes (just clients). + if (affNodes.isEmpty() || (node2part != null && !node2part.valid())) + continue; + + List owners = owners(i); + + if (affNodes.size() != owners.size() || !owners.containsAll(affNodes)) + return; + } + + rebalancedTopVer = topVer; + + if (log.isDebugEnabled()) + log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']'); + } + } + + /** * @param p Partition. * @param nodeId Node ID. * @param match State to match. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index bb370a5..e8ef5d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.util.ArrayList; import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -31,7 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; @@ -42,9 +42,7 @@ import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; import static org.apache.ignite.transactions.TransactionState.COMMITTING; @@ -356,6 +354,11 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur add(fut); // Append new future. + Collection updCntrs = new ArrayList<>(dhtMapping.entries().size()); + + for (IgniteTxEntry e : dhtMapping.entries()) + updCntrs.add(e.updateCounter()); + GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( tx.nearNodeId(), futId, @@ -379,7 +382,8 @@ public final class GridDhtTxFinishFuture extends GridCompoundIdentityFutur tx.size(), tx.subjectId(), tx.taskNameHash(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + updCntrs); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index caa0aa5..65f1cb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -66,6 +67,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** Check comitted flag. */ private boolean checkCommitted; + /** Partition update counter. */ + @GridToStringInclude + @GridDirectCollection(Long.class) + private GridLongList partUpdateCnt; + /** One phase commit write version. */ private GridCacheVersion writeVer; @@ -163,6 +169,76 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } /** + * @param nearNodeId Near node ID. + * @param futId Future ID. + * @param miniId Mini future ID. + * @param topVer Topology version. + * @param xidVer Transaction ID. + * @param threadId Thread ID. + * @param commitVer Commit version. + * @param isolation Transaction isolation. + * @param commit Commit flag. + * @param invalidate Invalidate flag. + * @param sys System flag. + * @param sysInvalidate System invalidation flag. + * @param syncCommit Synchronous commit flag. + * @param syncRollback Synchronous rollback flag. + * @param baseVer Base version. + * @param committedVers Committed versions. + * @param rolledbackVers Rolled back versions. + * @param pendingVers Pending versions. + * @param txSize Expected transaction size. + * @param subjId Subject ID. + * @param taskNameHash Task name hash. + * @param updateIdxs Partition update idxs. + * @param addDepInfo Deployment info flag. + */ + public GridDhtTxFinishRequest( + UUID nearNodeId, + IgniteUuid futId, + IgniteUuid miniId, + @NotNull AffinityTopologyVersion topVer, + GridCacheVersion xidVer, + GridCacheVersion commitVer, + long threadId, + TransactionIsolation isolation, + boolean commit, + boolean invalidate, + boolean sys, + byte plc, + boolean sysInvalidate, + boolean syncCommit, + boolean syncRollback, + GridCacheVersion baseVer, + Collection committedVers, + Collection rolledbackVers, + Collection pendingVers, + int txSize, + @Nullable UUID subjId, + int taskNameHash, + boolean addDepInfo, + Collection updateIdxs + ) { + this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc, + sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize, + subjId, taskNameHash, addDepInfo); + + if (updateIdxs != null && !updateIdxs.isEmpty()) { + partUpdateCnt = new GridLongList(updateIdxs.size()); + + for (Long idx : updateIdxs) + partUpdateCnt.add(idx); + } + } + + /** + * @return Partition update counters. + */ + public GridLongList partUpdateCounters(){ + return partUpdateCnt; + } + + /** * @return Mini ID. */ public IgniteUuid miniId() { @@ -294,36 +370,42 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); case 22: - if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) return false; writer.incrementState(); case 23: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 24: - if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 25: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) return false; writer.incrementState(); case 26: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 27: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 28: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -382,7 +464,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 22: - pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); + partUpdateCnt = reader.readMessage("partUpdateCnt"); if (!reader.isLastRead()) return false; @@ -390,7 +472,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 23: - subjId = reader.readUuid("subjId"); + pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -398,7 +480,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 24: - sysInvalidate = reader.readBoolean("sysInvalidate"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -406,7 +488,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 25: - taskNameHash = reader.readInt("taskNameHash"); + sysInvalidate = reader.readBoolean("sysInvalidate"); if (!reader.isLastRead()) return false; @@ -414,7 +496,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 26: - topVer = reader.readMessage("topVer"); + taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) return false; @@ -422,6 +504,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 27: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 28: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -441,6 +531,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 28; + return 29; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 75f8c2f..3ee1048 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; @@ -1204,7 +1205,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { Collection> deleted = null; try { - topology().readLock(); + GridDhtPartitionTopology top = topology(); + + top.readLock(); try { if (topology().stopping()) { @@ -1221,7 +1224,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || - !needRemap(req.topologyVersion(), topology().topologyVersion())) { + !needRemap(req.topologyVersion(), top.topologyVersion())) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { @@ -1236,7 +1239,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (ver == null) { // Assign next version for update inside entries lock. - ver = ctx.versions().next(topology().topologyVersion()); + ver = ctx.versions().next(top.topologyVersion()); if (hasNear) res.nearVersion(ver); @@ -1248,6 +1251,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { log.debug("Using cache version for update request on primary node [ver=" + ver + ", req=" + req + ']'); + boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); + dhtFut = createDhtFuture(ver, req, res, completionCb, false); expiry = expiryPolicy(req.expiry()); @@ -1270,7 +1275,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { completionCb, ctx.isDrEnabled(), taskName, - expiry); + expiry, + sndPrevVal); deleted = updRes.deleted(); dhtFut = updRes.dhtFuture(); @@ -1289,7 +1295,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { completionCb, ctx.isDrEnabled(), taskName, - expiry); + expiry, + sndPrevVal); retVal = updRes.returnValue(); deleted = updRes.deleted(); @@ -1309,7 +1316,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { remap = true; } finally { - topology().readUnlock(); + top.readUnlock(); } } catch (GridCacheEntryRemovedException e) { @@ -1384,6 +1391,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param replicate Whether replication is enabled. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Deleted entries. * @throws GridCacheEntryRemovedException Should not be thrown. */ @@ -1399,7 +1407,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { CI2 completionCb, boolean replicate, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) throws GridCacheEntryRemovedException { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. @@ -1546,7 +1555,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); firstEntryIdx = i; @@ -1594,7 +1604,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); firstEntryIdx = i; @@ -1713,7 +1724,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); } else assert filtered.isEmpty(); @@ -1790,6 +1802,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param replicate Whether DR is enabled for that cache. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ @@ -1804,7 +1817,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { CI2 completionCb, boolean replicate, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) throws GridCacheEntryRemovedException { GridCacheReturn retVal = null; Collection> deleted = null; @@ -1861,7 +1875,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - req.returnValue(), + sndPrevVal || req.returnValue(), expiry, true, true, @@ -1876,7 +1890,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { true, intercept, req.subjectId(), - taskName); + taskName, + null, + null); if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -1901,7 +1917,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { entryProcessor, updRes.newTtl(), updRes.conflictExpireTime(), - newConflictVer); + newConflictVer, + sndPrevVal, + updRes.oldValue(), + updRes.updateCounter()); } if (!F.isEmpty(filteredReaders)) @@ -1918,6 +1937,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } + else if (!entry.isNear() && updRes.success()) { + ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(), + entry.isInternal() || !context().userCache(), entry.partition(), primary, false, + updRes.updateCounter(), topVer); + } if (hasNear) { if (primary && updRes.sendToDht()) { @@ -2008,6 +2032,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param batchRes Batch update result. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Deleted entries. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -2028,7 +2053,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { boolean replicate, UpdateBatchResult batchRes, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) { assert putMap == null ^ rmvKeys == null; @@ -2130,7 +2156,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { null, /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*retval*/sndPrevVal, expiry, /*event*/true, /*metrics*/true, @@ -2145,7 +2171,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { /*conflict resolve*/false, /*intercept*/false, req.subjectId(), - taskName); + taskName, + null, + null); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2184,7 +2212,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { entryProcessor, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE, - null); + null, + sndPrevVal, + updRes.oldValue(), + updRes.updateCounter()); if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, @@ -2573,7 +2604,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { entry = entryExx(key); CacheObject val = req.value(i); + CacheObject prevVal = req.previousValue(i); + EntryProcessor entryProcessor = req.entryProcessor(i); + Long updateIdx = req.updateCounter(i); GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; @@ -2605,11 +2639,18 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { false, intercept, req.subjectId(), - taskName); + taskName, + prevVal, + updateIdx); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); + if (updRes.success() && !entry.isNear()) + ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), + updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(), + false, false, updRes.updateCounter(), req.topologyVersion()); + entry.onUnlock(); break; // While. http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index c34dcfd..c73b3b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -215,13 +215,17 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param updateCntr Partition update counter. */ public void addWriteEntry(GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + @Nullable CacheObject prevVal, + @Nullable Long updateCntr) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); Collection dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); @@ -261,7 +265,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter entryProcessor, ttl, conflictExpireTime, - conflictVer); + conflictVer, + addPrevVal, + entry.partition(), + prevVal, + updateCntr); + } + else if (dhtNodes.size() == 1) { + try { + cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal, + entry.key().internal() || !cctx.userCache(), entry.partition(), true, false, + updateCntr, updateReq.topologyVersion()); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal=" + + val + ", err=" + e + "]"); + } } } } @@ -331,8 +350,56 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter cctx.mvcc().removeAtomicFuture(version()); if (err != null) { - for (KeyCacheObject key : keys) - updateRes.addFailedKey(key, err); + if (!mappings.isEmpty()) { + Collection hndKeys = new ArrayList<>(keys.size()); + + exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { + for (int i = 0; i < req.size(); i++) { + KeyCacheObject key = req.key(i); + + if (!hndKeys.contains(key)) { + updateRes.addFailedKey(key, err); + + cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i), + updateReq.topologyVersion()); + + hndKeys.add(key); + + if (hndKeys.size() == keys.size()) + break exit; + } + } + } + } + else + for (KeyCacheObject key : keys) + updateRes.addFailedKey(key, err); + } + else { + Collection hndKeys = new ArrayList<>(keys.size()); + + exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { + for (int i = 0; i < req.size(); i++) { + KeyCacheObject key = req.key(i); + + if (!hndKeys.contains(key)) { + try { + cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i), + key.internal() || !cctx.userCache(), req.partitionId(i), true, false, + req.updateCounter(i), updateReq.topologyVersion()); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal=" + + req.value(i) + ", err=" + e + "]"); + } + + hndKeys.add(key); + + if (hndKeys.size() == keys.size()) + break exit; + } + } + } } if (updateReq.writeSynchronizationMode() == FULL_SYNC)