Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 881AE200CE1 for ; Thu, 31 Aug 2017 16:48:06 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8480E16B3DD; Thu, 31 Aug 2017 14:48:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3437016B3DC for ; Thu, 31 Aug 2017 16:48:04 +0200 (CEST) Received: (qmail 76738 invoked by uid 500); 31 Aug 2017 14:48:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 76728 invoked by uid 99); 31 Aug 2017 14:48:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 Aug 2017 14:48:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ABFBFF556C; Thu, 31 Aug 2017 14:48:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 31 Aug 2017 14:48:03 -0000 Message-Id: <10ba303bb4a64789aff897fcd4fa9ede@git.apache.org> In-Reply-To: <7c174840881341199c59addd8b33e5f7@git.apache.org> References: <7c174840881341199c59addd8b33e5f7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] ignite git commit: ignite-3478 archived-at: Thu, 31 Aug 2017 14:48:06 -0000 ignite-3478 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08be7310 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08be7310 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08be7310 Branch: refs/heads/ignite-3478 Commit: 08be7310a93d3ce455215b97cf8ab1a2c3f0ab31 Parents: 855c2d4 Author: sboikov Authored: Thu Aug 31 12:52:23 2017 +0300 Committer: sboikov Committed: Thu Aug 31 16:59:15 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 1 - .../discovery/GridDiscoveryManager.java | 3 - .../processors/cache/GridCacheEntryEx.java | 7 +- .../processors/cache/GridCacheMapEntry.java | 7 +- .../GridDistributedTxRemoteAdapter.java | 12 +- .../distributed/dht/GridDhtTxFinishFuture.java | 12 +- .../distributed/dht/GridDhtTxFinishRequest.java | 49 +- .../cache/distributed/dht/GridDhtTxLocal.java | 47 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 487 ++++++++++--------- .../dht/GridDhtTxPrepareRequest.java | 63 ++- .../GridDhtPartitionsExchangeFuture.java | 2 + .../GridNearPessimisticTxPrepareFuture.java | 37 ++ .../near/GridNearTxFinishFuture.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 20 +- .../near/GridNearTxPrepareRequest.java | 17 + .../near/GridNearTxPrepareResponse.java | 18 + .../mvcc/CacheCoordinatorsSharedManager.java | 77 ++- .../mvcc/CoordinatorAssignmentHistory.java | 71 +++ .../cache/transactions/IgniteInternalTx.java | 5 + .../cache/transactions/IgniteTxAdapter.java | 36 ++ .../cache/transactions/IgniteTxHandler.java | 17 +- .../IgniteTxImplicitSingleStateImpl.java | 7 + .../transactions/IgniteTxLocalAdapter.java | 15 +- .../IgniteTxRemoteSingleStateImpl.java | 5 + .../transactions/IgniteTxRemoteStateImpl.java | 10 + .../cache/transactions/IgniteTxState.java | 6 + .../cache/transactions/IgniteTxStateImpl.java | 9 + .../processors/cache/GridCacheTestEntryEx.java | 8 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 2 +- 29 files changed, 714 insertions(+), 340 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 4b57eb8..95e855a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.managers.discovery; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 8e3f9fc..cbd2738 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -30,7 +30,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -43,9 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.zip.CRC32; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index b2cabac..5b97195 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -381,7 +382,8 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable TxMvccVersion mvccVer ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** @@ -423,7 +425,8 @@ public interface GridCacheEntryEx { @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable TxMvccVersion mvccVer ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 61f6fb4..5336b22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy; @@ -888,7 +889,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable TxMvccVersion mvccVer ) throws IgniteCheckedException, GridCacheEntryRemovedException { CacheObject old; @@ -1082,7 +1084,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable UUID subjId, String taskName, @Nullable GridCacheVersion dhtVer, - @Nullable Long updateCntr + @Nullable Long updateCntr, + @Nullable TxMvccVersion mvccVer ) throws IgniteCheckedException, GridCacheEntryRemovedException { assert cctx.transactional(); http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index ea6461d..db1e2dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -474,6 +475,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter cctx.database().checkpointReadLock(); try { + TxMvccVersion mvccVer = createMvccVersion(); + Collection entries = near() ? allEntries() : writeEntries(); List dataEntries = null; @@ -594,7 +597,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer, - txEntry.updateCounter()); + txEntry.updateCounter(), + mvccVer); else { assert val != null : txEntry; @@ -618,7 +622,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer, - txEntry.updateCounter()); + txEntry.updateCounter(), + mvccVer); // Keep near entry up to date. if (nearCached != null) { @@ -650,7 +655,8 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter CU.subjectId(this, cctx), resolveTaskName(), dhtVer, - txEntry.updateCounter()); + txEntry.updateCounter(), + mvccVer); // Keep near entry up to date. if (nearCached != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 5311ddc..9ca1412 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFutu import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -347,7 +348,8 @@ public final class GridDhtTxFinishFuture extends GridCacheCompoundIdentity tx.taskNameHash(), tx.activeCachesDeploymentEnabled(), false, - false); + false, + TxMvccVersion.COUNTER_NA); try { cctx.io().send(n, req, tx.ioPolicy()); @@ -395,6 +397,8 @@ public final class GridDhtTxFinishFuture extends GridCacheCompoundIdentity if (tx.onePhaseCommit()) return false; + assert !commit || !tx.txState().mvccEnabled(cctx) || tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA; + boolean sync = tx.syncMode() == FULL_SYNC; if (tx.explicitLock()) @@ -450,7 +454,8 @@ public final class GridDhtTxFinishFuture extends GridCacheCompoundIdentity tx.activeCachesDeploymentEnabled(), updCntrs, false, - false); + false, + tx.mvccCoordinatorCounter()); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); @@ -519,7 +524,8 @@ public final class GridDhtTxFinishFuture extends GridCacheCompoundIdentity tx.taskNameHash(), tx.activeCachesDeploymentEnabled(), false, - false); + false, + tx.mvccCoordinatorCounter()); req.writeVersion(tx.writeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 90f3687..976a534 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -25,6 +25,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -66,6 +67,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** One phase commit write version. */ private GridCacheVersion writeVer; + /** */ + private long mvccCrdCntr = TxMvccVersion.COUNTER_NA; + /** * Empty constructor required for {@link Externalizable}. */ @@ -121,7 +125,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { int taskNameHash, boolean addDepInfo, boolean retVal, - boolean waitRemoteTxs + boolean waitRemoteTxs, + long mvccCrdCntr ) { super( xidVer, @@ -150,6 +155,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { this.nearNodeId = nearNodeId; this.isolation = isolation; this.miniId = miniId; + this.mvccCrdCntr = mvccCrdCntr; needReturnValue(retVal); waitRemoteTransactions(waitRemoteTxs); @@ -206,7 +212,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { boolean addDepInfo, Collection updateIdxs, boolean retVal, - boolean waitRemoteTxs + boolean waitRemoteTxs, + long mvccCrdCntr ) { this(nearNodeId, futId, @@ -231,7 +238,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { taskNameHash, addDepInfo, retVal, - waitRemoteTxs); + waitRemoteTxs, + mvccCrdCntr); if (updateIdxs != null && !updateIdxs.isEmpty()) { partUpdateCnt = new GridLongList(updateIdxs.size()); @@ -242,6 +250,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } /** + * @return Counter. + */ + public long mvccCoordinatorCounter() { + return mvccCrdCntr; + } + + /** * @return Partition update counters. */ public GridLongList partUpdateCounters(){ @@ -367,24 +382,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { writer.incrementState(); case 23: - if (!writer.writeUuid("nearNodeId", nearNodeId)) + if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr)) return false; writer.incrementState(); case 24: - if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 25: - if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) return false; writer.incrementState(); case 26: + if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 27: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -427,7 +448,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 23: - nearNodeId = reader.readUuid("nearNodeId"); + mvccCrdCntr = reader.readLong("mvccCrdCntr"); if (!reader.isLastRead()) return false; @@ -435,7 +456,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 24: - partUpdateCnt = reader.readMessage("partUpdateCnt"); + nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) return false; @@ -443,7 +464,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 25: - pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); + partUpdateCnt = reader.readMessage("partUpdateCnt"); if (!reader.isLastRead()) return false; @@ -451,6 +472,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 26: + pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 27: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -470,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 27; + return 28; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 5b8a7b5..44e2a54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -313,24 +314,10 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa /** * Prepares next batch of entries in dht transaction. * - * @param reads Read entries. - * @param writes Write entries. - * @param verMap Version map. - * @param msgId Message ID. - * @param nearMiniId Near mini future ID. - * @param txNodes Transaction nodes mapping. - * @param last {@code True} if this is last prepare request. + * @param req Prepare request. * @return Future that will be completed when locks are acquired. */ - public final IgniteInternalFuture prepareAsync( - @Nullable Collection reads, - @Nullable Collection writes, - Map verMap, - long msgId, - int nearMiniId, - Map> txNodes, - boolean last - ) { + public final IgniteInternalFuture prepareAsync(GridNearTxPrepareRequest req) { // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut; @@ -344,14 +331,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa cctx, this, timeout, - nearMiniId, - verMap, - last, + req.miniId(), + req.dhtVersions(), + req.last(), needReturnValue()))) { GridDhtTxPrepareFuture f = prepFut; - assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + - "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; + assert f.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " + + "[futMiniId=" + f.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + f + ']'; if (timeout == -1) f.onError(timeoutException()); @@ -360,8 +347,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } else { - assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + - "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; + assert fut.nearMiniId() == req.miniId() : "Wrong near mini id on existing future " + + "[futMiniId=" + fut.nearMiniId() + ", miniId=" + req.miniId() + ", fut=" + fut + ']'; // Prepare was called explicitly. return chainOnePhasePrepare(fut); @@ -389,14 +376,14 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } try { - if (reads != null) { - for (IgniteTxEntry e : reads) - addEntry(msgId, e); + if (req.reads() != null) { + for (IgniteTxEntry e : req.reads()) + addEntry(req.messageId(), e); } - if (writes != null) { - for (IgniteTxEntry e : writes) - addEntry(msgId, e); + if (req.writes() != null) { + for (IgniteTxEntry e : req.writes()) + addEntry(req.messageId(), e); } userPrepare(null); @@ -407,7 +394,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (isSystemInvalidate()) fut.complete(); else - fut.prepare(reads, writes, txNodes); + fut.prepare(req); } catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) { fut.onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 03d99fc..a3d67d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -59,7 +59,9 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -84,6 +86,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFutureCancelledException; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -168,14 +171,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture reads; - - /** Prepare writes. */ - private Iterable writes; - - /** Tx nodes. */ - private Map> txNodes; + /** Prepare request. */ + private GridNearTxPrepareRequest req; /** Trackable flag. */ private boolean trackable = true; @@ -341,7 +338,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture reads, Collection writes, - Map> txNodes) { + public void prepare(GridNearTxPrepareRequest req) { + assert req != null; + if (tx.empty()) { tx.setRollbackOnly(); onDone((GridNearTxPrepareResponse)null); } - this.reads = reads; - this.writes = writes; - this.txNodes = txNodes; + this.req = req; boolean ser = tx.serializable() && tx.optimistic(); - if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) { + if (!F.isEmpty(req.writes()) || (ser && !F.isEmpty(req.reads()))) { Map> forceKeys = null; - for (IgniteTxEntry entry : writes) + for (IgniteTxEntry entry : req.writes()) forceKeys = checkNeedRebalanceKeys(entry, forceKeys); if (ser) { - for (IgniteTxEntry entry : reads) + for (IgniteTxEntry entry : req.reads()) forceKeys = checkNeedRebalanceKeys(entry, forceKeys); } @@ -1191,15 +1187,17 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture waitCoordCntrFut = null; + + if (req.requestMvccCounter()) { + assert tx.txState().mvccEnabled(cctx); + + ClusterNode crd = cctx.coordinators().coordinator(tx.topologyVersion()); + + assert crd != null : tx.topologyVersion(); + + if (crd.isLocal()) + tx.mvccCoordinatorCounter(cctx.coordinators().requestTxCounterOnCoordinator(tx.nearXidVersion())); + else { + IgniteInternalFuture coordCntrFut = cctx.coordinators().requestTxCounter(crd, tx); + + if (tx.onePhaseCommit()) + waitCoordCntrFut = coordCntrFut; + } + } + // We are holding transaction-level locks for entries here, so we can get next write version. onEntriesLocked(); // We are holding transaction-level locks for entries here, so we can get next write version. tx.writeVersion(cctx.versions().next(tx.topologyVersion())); - { - // Assign keys to primary nodes. - if (!F.isEmpty(writes)) { - for (IgniteTxEntry write : writes) - map(tx.entry(write.txKey())); - } + // Assign keys to primary nodes. + if (!F.isEmpty(req.writes())) { + for (IgniteTxEntry write : req.writes()) + map(tx.entry(write.txKey())); + } - if (!F.isEmpty(reads)) { - for (IgniteTxEntry read : reads) - map(tx.entry(read.txKey())); - } + if (!F.isEmpty(req.reads())) { + for (IgniteTxEntry read : req.reads()) + map(tx.entry(read.txKey())); } if (isDone()) return; if (last) { - if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { - for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { - tx.onePhaseCommit(false); + if (waitCoordCntrFut != null) { + skipInit = true; - break; - } - } - } - - int miniId = 0; + waitCoordCntrFut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); - assert tx.transactionNodes() != null; + sendPrepareRequests(); + } + catch (Throwable e) { + U.error(log, "Failed to get coordinator counter: " + e, e); - final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; + GridNearTxPrepareResponse res = createPrepareResponse(e); - // Create mini futures. - for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { - assert !dhtMapping.empty(); + onDone(res, res.error()); + } + finally { + markInitialized(); + } + } + }); + } + else + sendPrepareRequests(); + } + } + finally { + if (!skipInit) + markInitialized(); + } + } - ClusterNode n = dhtMapping.primary(); + /** + * + */ + private void sendPrepareRequests() { + if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { + tx.onePhaseCommit(false); - assert !n.isLocal(); + break; + } + } + } - GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); + assert !tx.txState().mvccEnabled(cctx) || !tx.onePhaseCommit() || tx.mvccCoordinatorCounter() != TxMvccVersion.COUNTER_NA; - Collection nearWrites = nearMapping == null ? null : nearMapping.writes(); + int miniId = 0; - Collection dhtWrites = dhtMapping.writes(); + assert tx.transactionNodes() != null; - if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) - continue; + final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; - if (tx.remainingTime() == -1) - return; + // Create mini futures. + for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { + assert !dhtMapping.empty(); - MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); + ClusterNode n = dhtMapping.primary(); - add(fut); // Append new future. + assert !n.isLocal(); - assert txNodes != null; + GridDistributedTxMapping nearMapping = tx.nearMap().get(n.id()); - GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - timeout, - dhtWrites, - nearWrites, - txNodes, - tx.nearXidVersion(), - true, - tx.onePhaseCommit(), - tx.subjectId(), - tx.taskNameHash(), - tx.activeCachesDeploymentEnabled(), - tx.storeWriteThrough(), - retVal); + Collection nearWrites = nearMapping == null ? null : nearMapping.writes(); - int idx = 0; + Collection dhtWrites = dhtMapping.writes(); - for (IgniteTxEntry entry : dhtWrites) { - try { - GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); + if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) + continue; - GridCacheContext cacheCtx = cached.context(); + if (tx.remainingTime() == -1) + return; - // Do not invalidate near entry on originating transaction node. - req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && - cached.readerId(n.id()) != null); + MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); + + add(fut); // Append new future. + + assert req.transactionNodes() != null; + + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + timeout, + dhtWrites, + nearWrites, + this.req.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash(), + tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), + retVal, + tx.mvccCoordinatorCounter()); + + int idx = 0; + + for (IgniteTxEntry entry : dhtWrites) { + try { + GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); - if (cached.isNewLocked()) { - List owners = cacheCtx.topology().owners(cached.partition(), - tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); + GridCacheContext cacheCtx = cached.context(); - // Do not preload if local node is a partition owner. - if (!owners.contains(cctx.localNode())) - req.markKeyForPreload(idx); - } + // Do not invalidate near entry on originating transaction node. + req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && + cached.readerId(n.id()) != null); - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + if (cached.isNewLocked()) { + List owners = cacheCtx.topology().owners(cached.partition(), + tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); - idx++; + // Do not preload if local node is a partition owner. + if (!owners.contains(cctx.localNode())) + req.markKeyForPreload(idx); } - if (!F.isEmpty(nearWrites)) { - for (IgniteTxEntry entry : nearWrites) { - try { - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; + } - assert added != null : "Missing candidate for cache entry:" + entry; - assert added.dhtLocal(); + idx++; + } - if (added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } + if (!F.isEmpty(nearWrites)) { + for (IgniteTxEntry entry : nearWrites) { + try { + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - break; - } - catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + assert added != null : "Missing candidate for cache entry:" + entry; + assert added.dhtLocal(); + + if (added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); } + + break; + } + catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; } + } + } - assert req.transactionNodes() != null; + assert req.transactionNodes() != null; - try { - cctx.io().send(n, req, tx.ioPolicy()); + try { + cctx.io().send(n, req, tx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + ']'); } - catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + ']'); - } - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); - } - } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request dht, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); } } + } + } - for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { - if (tx.remainingTime() == -1) - return; - - MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); - - add(fut); // Append new future. - - GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( - futId, - fut.futureId(), - tx.topologyVersion(), - tx, - timeout, - null, - nearMapping.writes(), - tx.transactionNodes(), - tx.nearXidVersion(), - true, - tx.onePhaseCommit(), - tx.subjectId(), - tx.taskNameHash(), - tx.activeCachesDeploymentEnabled(), - tx.storeWriteThrough(), - retVal); - - for (IgniteTxEntry entry : nearMapping.entries()) { - if (CU.writes().apply(entry)) { - try { - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = entry.cached().candidate(version()); + for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { + if (tx.remainingTime() == -1) + return; - assert added != null : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); + + add(fut); // Append new future. + + GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( + futId, + fut.futureId(), + tx.topologyVersion(), + tx, + timeout, + null, + nearMapping.writes(), + tx.transactionNodes(), + tx.nearXidVersion(), + true, + tx.onePhaseCommit(), + tx.subjectId(), + tx.taskNameHash(), + tx.activeCachesDeploymentEnabled(), + tx.storeWriteThrough(), + retVal, + tx.mvccCoordinatorCounter()); + + for (IgniteTxEntry entry : nearMapping.entries()) { + if (CU.writes().apply(entry)) { + try { + if (entry.explicitVersion() == null) { + GridCacheMvccCandidate added = entry.cached().candidate(version()); - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } + assert added != null : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; - break; - } catch (GridCacheEntryRemovedException ignore) { - assert false : "Got removed exception on entry with dht local candidate: " + entry; - } + if (added != null && added.ownerVersion() != null) + req.owned(entry.txKey(), added.ownerVersion()); } + + break; + } catch (GridCacheEntryRemovedException ignore) { + assert false : "Got removed exception on entry with dht local candidate: " + entry; } + } + } - assert req.transactionNodes() != null; + assert req.transactionNodes() != null; - try { - cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); + try { + cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - fut.onNodeLeft(); + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + fut.onNodeLeft(); + } + catch (IgniteCheckedException e) { + if (!cctx.kernalContext().isStopping()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + ']'); } - catch (IgniteCheckedException e) { - if (!cctx.kernalContext().isStopping()) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + ']'); - } - fut.onResult(e); - } - else { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.primary().id() + - ", err=" + e + ']'); - } - } + fut.onResult(e); + } + else { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nearMapping.primary().id() + + ", err=" + e + ']'); } } } } } - finally { - markInitialized(); - } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index d334850..805c34d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -103,6 +104,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { @GridDirectTransient private List nearWritesCacheMissed; + /** */ + private long mvccCrdCntr = TxMvccVersion.COUNTER_NA; + /** * Empty constructor required for {@link Externalizable}. */ @@ -141,7 +145,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { int taskNameHash, boolean addDepInfo, boolean storeWriteThrough, - boolean retVal) { + boolean retVal, + long mvccCrdCntr) { super(tx, timeout, null, @@ -169,6 +174,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size()); nearNodeId = tx.nearNodeId(); + this.mvccCrdCntr = mvccCrdCntr; + } + + /** + * @return Counter. + */ + public long mvccCoordinatorCounter() { + return mvccCrdCntr; } /** @@ -407,54 +420,60 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { writer.incrementState(); case 23: - if (!writer.writeUuid("nearNodeId", nearNodeId)) + if (!writer.writeLong("mvccCrdCntr", mvccCrdCntr)) return false; writer.incrementState(); case 24: - if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 25: - if (!writer.writeMessage("nearXidVer", nearXidVer)) + if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 26: - if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); case 27: - if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 28: - if (!writer.writeBitSet("preloadKeys", preloadKeys)) + if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 29: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); case 30: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 31: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 32: if (!writer.writeMessage("topVer", topVer)) return false; @@ -501,7 +520,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 23: - nearNodeId = reader.readUuid("nearNodeId"); + mvccCrdCntr = reader.readLong("mvccCrdCntr"); if (!reader.isLastRead()) return false; @@ -509,7 +528,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 24: - nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); + nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) return false; @@ -517,7 +536,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 25: - nearXidVer = reader.readMessage("nearXidVer"); + nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -525,7 +544,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 26: - ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); + nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) return false; @@ -533,7 +552,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 27: - ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); + ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -541,7 +560,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 28: - preloadKeys = reader.readBitSet("preloadKeys"); + ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -549,7 +568,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 29: - subjId = reader.readUuid("subjId"); + preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) return false; @@ -557,7 +576,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 30: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -565,6 +584,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 31: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 32: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -584,6 +611,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 32; + return 33; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 6d85222..82bd463 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1418,6 +1418,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (err == null) { + cctx.coordinators().assignCoordinator(exchCtx.events().discoveryCache()); + if (centralizedAff) { assert !exchCtx.mergeExchanges(); http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index d017d7d..0cccce3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -262,6 +263,18 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA AffinityTopologyVersion topVer = tx.topologyVersion(); + ClusterNode mvccCrd = null; + + if (tx.txState().mvccEnabled(cctx)) { + mvccCrd = cctx.coordinators().coordinator(topVer); + + if (mvccCrd == null) { + onDone(new ClusterTopologyCheckedException("Mvcc coordinator is not assigned: " + topVer)); + + return; + } + } + GridDhtTxMapping txMapping = new GridDhtTxMapping(); boolean hasNearCache = false; @@ -326,6 +339,16 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA for (final GridDistributedTxMapping m : mappings.values()) { final ClusterNode primary = m.primary(); + boolean needCntr = false; + + if (mvccCrd != null) { + if (tx.onePhaseCommit() || mvccCrd.equals(primary)) { + needCntr = true; + + mvccCrd = null; + } + } + if (primary.isLocal()) { if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) { GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(), @@ -334,6 +357,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA m.nearEntriesReads(), m.nearEntriesWrites()); + nearReq.requestMvccCounter(needCntr); + prepareLocal(nearReq, m, ++miniId, true); GridNearTxPrepareRequest colocatedReq = createRequest(txNodes, @@ -347,6 +372,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else { GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes()); + req.requestMvccCounter(needCntr); + prepareLocal(req, m, ++miniId, m.hasNearCacheEntries()); } } @@ -357,6 +384,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA m.reads(), m.writes()); + req.requestMvccCounter(needCntr); + final MiniFuture fut = new MiniFuture(m, ++miniId); req.miniId(fut.futureId()); @@ -389,6 +418,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } } + if (mvccCrd != null) { + assert !tx.onePhaseCommit(); + + IgniteInternalFuture cntrFut = cctx.coordinators().requestTxCounter(mvccCrd, tx); + + add((IgniteInternalFuture)cntrFut); + } + markInitialized(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index c45eb7b..e093eeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -847,7 +848,8 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit 0, tx.activeCachesDeploymentEnabled(), !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()), - waitRemoteTxs); + waitRemoteTxs, + TxMvccVersion.COUNTER_NA); finishReq.checkCommitted(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 55d6bdd..8ecf21f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -105,14 +105,12 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER; import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; -import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; -import static org.apache.ignite.transactions.TransactionState.SUSPENDED; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** @@ -3338,19 +3336,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** * Prepares next batch of entries in dht transaction. * - * @param reads Read entries. - * @param writes Write entries. - * @param txNodes Transaction nodes mapping. - * @param last {@code True} if this is last prepare request. + * @param req Prepare request. * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") - public IgniteInternalFuture prepareAsyncLocal( - @Nullable Collection reads, - @Nullable Collection writes, - Map> txNodes, - boolean last - ) { + public IgniteInternalFuture prepareAsyncLocal(GridNearTxPrepareRequest req) { long timeout = remainingTime(); if (state() != PREPARING) { @@ -3375,11 +3365,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea timeout, 0, Collections.emptyMap(), - last, + req.last(), needReturnValue() && implicit()); try { - userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes); + userPrepare((serializable() && optimistic()) ? F.concat(false, req.writes(), req.reads()) : req.writes()); // Make sure to add future before calling prepare on it. cctx.mvcc().addFuture(fut); @@ -3387,7 +3377,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (isSystemInvalidate()) fut.complete(); else - fut.prepare(reads, writes, txNodes); + fut.prepare(req); } catch (IgniteTxTimeoutCheckedException | IgniteTxOptimisticCheckedException e) { fut.onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index e352c87..e1c6636 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -57,6 +57,9 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { /** */ private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10; + /** */ + private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02; + /** Future ID. */ private IgniteUuid futId; @@ -149,6 +152,20 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { } /** + * @return {@code True} if need request MVCC counter on primary node on prepare step. + */ + public boolean requestMvccCounter() { + return isFlag(REQUEST_MVCC_CNTR_FLAG_MASK); + } + + /** + * @param val {@code True} if need request MVCC counter on primary node on prepare step. + */ + public void requestMvccCounter(boolean val) { + setFlag(val, REQUEST_MVCC_CNTR_FLAG_MASK); + } + + /** * @return {@code True} if it is safe for first client request to wait for topology future * completion. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 8162168..4233371 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -97,6 +98,9 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse /** Not {@code null} if client node should remap transaction. */ private AffinityTopologyVersion clientRemapVer; + /** */ + private long mvccCrdCntr = TxMvccVersion.COUNTER_NA; + /** * Empty constructor required by {@link Externalizable}. */ @@ -146,6 +150,20 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse } /** + * @param mvccCrdCntr Counter. + */ + public void mvccCoordinatorCounter(long mvccCrdCntr) { + this.mvccCrdCntr = mvccCrdCntr; + } + + /** + * @return Counter. + */ + public long mvccCoordinatorCounter() { + return mvccCrdCntr; + } + + /** * @return One-phase commit state on primary node. */ public boolean onePhaseCommit() { http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java index e5d07ea..ec29002 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsSharedManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.mvcc; -import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,9 +30,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -47,6 +49,9 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS */ public class CacheCoordinatorsSharedManager extends GridCacheSharedManagerAdapter { /** */ + private final CoordinatorAssignmentHistory assignHist = new CoordinatorAssignmentHistory(); + + /** */ private final AtomicLong mvccCntr = new AtomicLong(0L); /** */ @@ -74,20 +79,28 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager cctx.gridIO().addMessageListener(TOPIC_CACHE_COORDINATOR, new CoordinatorMessageListener()); } + public long requestTxCounterOnCoordinator(GridCacheVersion txVer) { + assert cctx.localNode().equals(assignHist.currentCoordinator()); + + return assignTxCounter(txVer); + } + /** * @param crd Coordinator. - * @param txId Transaction ID. + * @param tx Transaction. * @return Counter request future. */ - public IgniteInternalFuture requestTxCounter(ClusterNode crd, GridCacheVersion txId) { - MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd); + public IgniteInternalFuture requestTxCounter(ClusterNode crd, IgniteInternalTx tx) { + assert !crd.isLocal() : crd; + + MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, tx); cntrFuts.put(fut.id, fut); try { cctx.gridIO().sendToGridTopic(crd, TOPIC_CACHE_COORDINATOR, - new CoordinatorTxCounterRequest(fut.id, txId), + new CoordinatorTxCounterRequest(fut.id, tx.nearXidVersion()), SYSTEM_POOL); } catch (IgniteCheckedException e) { @@ -98,8 +111,12 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager return fut; } + /** + * @param crd Coordinator. + * @return Counter request future. + */ public IgniteInternalFuture requestQueryCounter(ClusterNode crd) { - MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd); + MvccCounterFuture fut = new MvccCounterFuture(futIdCntr.incrementAndGet(), crd, null); cntrFuts.put(fut.id, fut); @@ -118,6 +135,7 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager } /** + * @param crd Coordinator. * @param txId Transaction ID. * @return Acknowledge future. */ @@ -144,6 +162,10 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager return fut; } + /** + * @param crd Coordinator. + * @param txId Transaction ID. + */ public void ackTxRollback(ClusterNode crd, GridCacheVersion txId) { CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, txId); @@ -329,14 +351,32 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager } /** - * @param discoCache Cluster topology. - * @return Assigned coordinator. + * @param topVer Topology version. + * @return MVCC coordinator for given topology version. */ - @Nullable public ClusterNode assignCoordinator(DiscoCache discoCache) { - // TODO IGNITE-3478 - List srvNodes = discoCache.serverNodes(); + @Nullable public ClusterNode coordinator(AffinityTopologyVersion topVer) { + return assignHist.coordinator(topVer); + } + + /** + * @param discoCache Discovery snapshot. + */ + public void assignCoordinator(DiscoCache discoCache) { + ClusterNode curCrd = assignHist.currentCoordinator(); + + if (curCrd == null || !discoCache.allNodes().contains(curCrd)) { + ClusterNode newCrd = null; + + if (!discoCache.serverNodes().isEmpty()) + newCrd = discoCache.serverNodes().get(0); - return srvNodes.isEmpty() ? null : srvNodes.get(0); + if (!F.eq(curCrd, newCrd)) { + assignHist.addAssignment(discoCache.version(), newCrd); + + log.info("Assigned mvcc coordinator [topVer=" + discoCache.version() + + ", crd=" + newCrd + ']'); + } + } } /** @@ -347,21 +387,30 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager private final Long id; /** */ + private IgniteInternalTx tx; + + /** */ private final ClusterNode crd; /** * @param id Future ID. * @param crd Coordinator. */ - MvccCounterFuture(Long id, ClusterNode crd) { + MvccCounterFuture(Long id, ClusterNode crd, IgniteInternalTx tx) { this.id = id; this.crd = crd; + this.tx = tx; } /** * @param cntr Counter. */ void onResponse(long cntr) { + assert cntr != TxMvccVersion.COUNTER_NA; + + if (tx != null) + tx.mvccCoordinatorCounter(cntr); + onDone(cntr); } @@ -448,7 +497,7 @@ public class CacheCoordinatorsSharedManager extends GridCacheSharedManager else if (msg instanceof CoordinatorQueryCounterRequest) processCoordinatorQueryStateRequest(nodeId, (CoordinatorQueryCounterRequest)msg); else - U.warn(log, "Unexpected message received: " + msg); + U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']'); } } }