Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 20980200C5C for ; Thu, 20 Apr 2017 10:39:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1F469160BB0; Thu, 20 Apr 2017 08:39:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0C884160BE4 for ; Thu, 20 Apr 2017 10:39:14 +0200 (CEST) Received: (qmail 7007 invoked by uid 500); 20 Apr 2017 08:39:14 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 3530 invoked by uid 99); 20 Apr 2017 08:39:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 08:39:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 84DB0E00C4; Thu, 20 Apr 2017 08:39:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Thu, 20 Apr 2017 08:40:13 -0000 Message-Id: In-Reply-To: <2e8d7ef222ea405998aa3e7df9518002@git.apache.org> References: <2e8d7ef222ea405998aa3e7df9518002@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [66/71] [abbrv] ignite git commit: ignite-1561 Fixed tx prepare for cross cache tx with near + colocated cache archived-at: Thu, 20 Apr 2017 08:39:21 -0000 ignite-1561 Fixed tx prepare for cross cache tx with near + colocated cache Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5088265 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5088265 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5088265 Branch: refs/heads/ignite-4535 Commit: a5088265d7927aab702425249b3f0d6996cb989e Parents: badf49c Author: sboikov Authored: Thu Apr 20 08:29:42 2017 +0300 Committer: sboikov Committed: Thu Apr 20 08:29:42 2017 +0300 ---------------------------------------------------------------------- .../cache/GridCacheSharedContext.java | 6 +- .../processors/cache/GridCacheUtils.java | 83 +++++-- .../distributed/GridDistributedTxMapping.java | 68 +++++- .../GridDistributedTxRemoteAdapter.java | 2 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 2 + ...arOptimisticSerializableTxPrepareFuture.java | 218 ++++++++++++------- .../near/GridNearOptimisticTxPrepareFuture.java | 95 ++++++-- .../GridNearPessimisticTxPrepareFuture.java | 186 ++++++++++------ .../near/GridNearTxFinishFuture.java | 6 +- .../cache/distributed/near/GridNearTxLocal.java | 30 +-- .../near/GridNearTxPrepareFutureAdapter.java | 30 +-- .../cache/transactions/IgniteTxHandler.java | 49 +---- .../IgniteTxImplicitSingleStateImpl.java | 6 + .../transactions/IgniteTxLocalAdapter.java | 7 +- .../cache/transactions/IgniteTxLocalState.java | 10 + .../cache/transactions/IgniteTxManager.java | 7 +- .../cache/transactions/IgniteTxState.java | 2 +- .../cache/transactions/IgniteTxStateImpl.java | 52 +++-- .../lang/gridfunc/PredicateCollectionView.java | 7 +- .../util/lang/gridfunc/PredicateMapView.java | 6 - .../util/lang/gridfunc/PredicateSetView.java | 6 - .../lang/gridfunc/ReadOnlyCollectionView.java | 6 - .../lang/gridfunc/ReadOnlyCollectionView2X.java | 6 - .../lang/gridfunc/TransformCollectionView.java | 7 +- .../util/lang/gridfunc/TransformMapView.java | 6 - ...sCacheTxNearEnabledRandomOperationsTest.java | 28 +++ .../cache/CrossCacheTxRandomOperationsTest.java | 23 +- .../dht/GridNearCacheTxNodeFailureSelfTest.java | 31 --- .../dht/IgniteCrossCacheTxSelfTest.java | 8 + .../IgniteCacheFailoverTestSuite.java | 2 - .../testsuites/IgniteCacheTestSuite2.java | 2 + 32 files changed, 631 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 34bb321..79083e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -52,7 +52,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TransactionMetri import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; -import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -742,7 +742,7 @@ public class GridCacheSharedContext { * @param cacheCtx Cache context. * @return Error message if transactions are incompatible. */ - @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridLongList activeCacheIds, + @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridIntList activeCacheIds, GridCacheContext cacheCtx) { if (cacheCtx.systemTx() && !tx.system()) return "system cache can be enlisted only in system transaction"; @@ -751,7 +751,7 @@ public class GridCacheSharedContext { return "non-system cache can't be enlisted in system transaction"; for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext activeCacheCtx = cacheContext(cacheId); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index d20a782..51a95a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -211,14 +211,6 @@ public class GridCacheUtils { /** Default transaction config. */ private static final TransactionConfiguration DEFAULT_TX_CFG = new TransactionConfiguration(); - /** Partition to state transformer. */ - private static final IgniteClosure PART2STATE = - new C1() { - @Override public GridDhtPartitionState apply(GridDhtLocalPartition p) { - return p.state(); - } - }; - /** Empty predicate array. */ private static final IgnitePredicate[] EMPTY_FILTER = new IgnitePredicate[0]; @@ -247,24 +239,79 @@ public class GridCacheUtils { private static final CacheEntryPredicate[] ALWAYS_FALSE0_ARR = new CacheEntryPredicate[] {ALWAYS_FALSE0}; /** Read filter. */ - private static final IgnitePredicate READ_FILTER = new P1() { - @Override public boolean apply(Object e) { - return ((IgniteTxEntry)e).op() == READ; + public static final IgnitePredicate READ_FILTER = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.op() == READ; + } + + @Override public String toString() { + return "READ_FILTER"; + } + }; + + /** Read filter. */ + public static final IgnitePredicate READ_FILTER_NEAR = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.op() == READ && e.context().isNear(); + } + + @Override public String toString() { + return "READ_FILTER_NEAR"; + } + }; + + /** Read filter. */ + public static final IgnitePredicate READ_FILTER_COLOCATED = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.op() == READ && !e.context().isNear(); + } + + @Override public String toString() { + return "READ_FILTER_COLOCATED"; + } + }; + + /** Write filter. */ + public static final IgnitePredicate WRITE_FILTER = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.op() != READ; + } + + @Override public String toString() { + return "WRITE_FILTER"; + } + }; + + /** Write filter. */ + public static final IgnitePredicate WRITE_FILTER_NEAR = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.op() != READ && e.context().isNear(); + } + + @Override public String toString() { + return "WRITE_FILTER_NEAR"; + } + }; + + /** Write filter. */ + public static final IgnitePredicate WRITE_FILTER_COLOCATED = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.op() != READ && !e.context().isNear(); } @Override public String toString() { - return "Cache transaction read filter"; + return "WRITE_FILTER_COLOCATED"; } }; /** Write filter. */ - private static final IgnitePredicate WRITE_FILTER = new P1() { - @Override public boolean apply(Object e) { - return ((IgniteTxEntry)e).op() != READ; + public static final IgnitePredicate FILTER_NEAR_CACHE_ENTRY = new P1() { + @Override public boolean apply(IgniteTxEntry e) { + return e.context().isNear(); } @Override public String toString() { - return "Cache transaction write filter"; + return "FILTER_NEAR_CACHE_ENTRY"; } }; @@ -613,7 +660,7 @@ public class GridCacheUtils { * @return Filter for transaction reads. */ @SuppressWarnings({"unchecked"}) - public static IgnitePredicate reads() { + public static IgnitePredicate reads() { return READ_FILTER; } @@ -621,7 +668,7 @@ public class GridCacheUtils { * @return Filter for transaction writes. */ @SuppressWarnings({"unchecked"}) - public static IgnitePredicate writes() { + public static IgnitePredicate writes() { return WRITE_FILTER; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index f8cec50..45903aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -52,8 +52,8 @@ public class GridDistributedTxMapping { /** {@code True} if this is last mapping for node. */ private boolean last; - /** {@code True} if mapping is for near caches, {@code false} otherwise. */ - private boolean near; + /** Near cache entries count. */ + private int nearEntries; /** {@code True} if this is first mapping for optimistic tx on client node. */ private boolean clientFirst; @@ -96,17 +96,17 @@ public class GridDistributedTxMapping { } /** - * @return {@code True} if mapping is for near caches, {@code false} otherwise. + * @return {@code True} if has colocated cache entries. */ - public boolean near() { - return near; + public boolean hasColocatedCacheEntries() { + return entries.size() > nearEntries; } /** - * @param near {@code True} if mapping is for near caches, {@code false} otherwise. + * @return {@code True} if has near cache entries. */ - public void near(boolean near) { - this.near = near; + public boolean hasNearCacheEntries() { + return nearEntries > 0; } /** @@ -124,6 +124,15 @@ public class GridDistributedTxMapping { } /** + * @return Near cache entries. + */ + @Nullable public Collection nearCacheEntries() { + assert nearEntries > 0; + + return F.view(entries, CU.FILTER_NEAR_CACHE_ENTRY); + } + + /** * @return {@code True} if lock is explicit. */ public boolean explicitLock() { @@ -159,21 +168,58 @@ public class GridDistributedTxMapping { * @return Reads. */ public Collection reads() { - return F.view(entries, CU.reads()); + return F.view(entries, CU.READ_FILTER); } /** * @return Writes. */ public Collection writes() { - return F.view(entries, CU.writes()); + return F.view(entries, CU.WRITE_FILTER); + } + + /** + * @return Near cache reads. + */ + public Collection nearEntriesReads() { + assert hasNearCacheEntries(); + + return F.view(entries, CU.READ_FILTER_NEAR); + } + + /** + * @return Near cache writes. + */ + public Collection nearEntriesWrites() { + assert hasNearCacheEntries(); + + return F.view(entries, CU.WRITE_FILTER_NEAR); + } + + /** + * @return Colocated cache reads. + */ + public Collection colocatedEntriesReads() { + assert hasColocatedCacheEntries(); + + return F.view(entries, CU.READ_FILTER_COLOCATED); + } + + /** + * @return Colocated cache writes. + */ + public Collection colocatedEntriesWrites() { + assert hasColocatedCacheEntries(); + + return F.view(entries, CU.WRITE_FILTER_COLOCATED); } /** * @param entry Adds entry. */ public void add(IgniteTxEntry entry) { - entries.add(entry); + if (entries.add(entry) && entry.context().isNear()) + nearEntries++; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index de8b29e..9cb04d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -387,7 +387,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } try { - cctx.tm().prepareTx(this); + cctx.tm().prepareTx(this, null); if (pessimistic() || isSystemInvalidate()) state(PREPARED); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index b1c7e5b..26f08fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -399,7 +399,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa addEntry(msgId, e); } - userPrepare(); + userPrepare(null); // Make sure to add future before calling prepare on it. cctx.mvcc().addFuture(fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 6e7b324..464df6e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -920,6 +920,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture, GridDistributedTxMapping> mappings = new HashMap<>(); + Map mappings = new HashMap<>(); - for (IgniteTxEntry write : writes) + boolean hasNearCache = false; + + for (IgniteTxEntry write : writes) { map(write, topVer, mappings, txMapping, remap, topLocked); + if (write.context().isNear()) + hasNearCache = true; + } + for (IgniteTxEntry read : reads) map(read, topVer, mappings, txMapping, remap, topLocked); @@ -363,12 +368,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(txMapping); + if (!hasNearCache) + checkOnePhase(txMapping); + MiniFuture locNearEntriesFut = null; + + // Create futures in advance to have all futures when process {@link GridNearTxPrepareResponse#clientRemapVersion}. for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); - add(new MiniFuture(this, m, ++miniId)); + MiniFuture fut = new MiniFuture(this, m, ++miniId); + + add(fut); + + if (m.primary().isLocal() && m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) { + assert locNearEntriesFut == null; + + locNearEntriesFut = fut; + + add(new MiniFuture(this, m, ++miniId)); + } } Collection> futs = (Collection)futures(); @@ -383,7 +402,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture fut = (MiniFuture)fut0; - IgniteCheckedException err = prepare(fut, txMapping); + IgniteCheckedException err = prepare(fut, txMapping.transactionNodes(), locNearEntriesFut); if (err != null) { while (it.hasNext()) { @@ -417,9 +436,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * @param fut Mini future. + * @param txNodes Tx nodes. + * @param locNearEntriesFut Local future for near cache entries prepare. * @return Prepare error if any. */ - @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) { + @Nullable private IgniteCheckedException prepare(final MiniFuture fut, + Map> txNodes, + @Nullable MiniFuture locNearEntriesFut) { GridDistributedTxMapping m = fut.mapping(); final ClusterNode primary = m.primary(); @@ -434,36 +457,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim return err; } - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - timeout, - m.reads(), - m.writes(), - m.near(), - txMapping.transactionNodes(), - m.last(), - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash(), - m.clientFirst(), - tx.activeCachesDeploymentEnabled()); - - for (IgniteTxEntry txEntry : m.entries()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); - } - // Must lock near entries separately. - if (m.near()) { + if (m.hasNearCacheEntries()) { try { - tx.optimisticLockEntries(m.entries()); - - tx.userPrepare(); + cctx.tm().prepareTx(tx, m.nearCacheEntries()); } catch (IgniteCheckedException e) { fut.onResult(e); @@ -472,27 +469,36 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } } - req.miniId(fut.futureId()); - - // If this is the primary node for the keys. if (primary.isLocal()) { - IgniteInternalFuture prepFut = cctx.tm().txHandler().prepareTx(primary.id(), - tx, - req); - - prepFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture prepFut) { - try { - fut.onResult(prepFut.get()); - } - catch (IgniteCheckedException e) { - fut.onResult(e); - } - } - }); + if (locNearEntriesFut != null) { + boolean nearEntries = fut == locNearEntriesFut; + + GridNearTxPrepareRequest req = createRequest(txNodes, + fut, + timeout, + nearEntries ? m.nearEntriesReads() : m.colocatedEntriesReads(), + nearEntries ? m.nearEntriesWrites() : m.colocatedEntriesWrites()); + + prepareLocal(req, fut, nearEntries); + } + else { + GridNearTxPrepareRequest req = createRequest(txNodes, + fut, + timeout, + m.reads(), + m.writes()); + + prepareLocal(req, fut, m.hasNearCacheEntries()); + } } else { try { + GridNearTxPrepareRequest req = createRequest(txNodes, + fut, + timeout, + m.reads(), + m.writes()); + cctx.io().send(primary, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException e) { @@ -513,16 +519,86 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } /** + * @param txNodes Tx nodes. + * @param fut Future. + * @param timeout Timeout. + * @param reads Read entries. + * @param writes Write entries. + * @return Request. + */ + private GridNearTxPrepareRequest createRequest( + Map> txNodes, + MiniFuture fut, + long timeout, + Collection reads, + Collection writes) { + GridDistributedTxMapping m = fut.mapping(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + timeout, + reads, + writes, + m.hasNearCacheEntries(), + txNodes, + m.last(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + m.clientFirst(), + tx.activeCachesDeploymentEnabled()); + + for (IgniteTxEntry txEntry : writes) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + req.miniId(fut.futureId()); + + return req; + } + + /** + * @param req Request. + * @param fut Future. + * @param nearEntries {@code True} if prepare near cache entries. + */ + private void prepareLocal(GridNearTxPrepareRequest req, + final MiniFuture fut, + final boolean nearEntries) { + IgniteInternalFuture prepFut = nearEntries ? + cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) : + cctx.tm().txHandler().prepareColocatedTx(tx, req); + + prepFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture prepFut) { + try { + fut.onResult(prepFut.get(), nearEntries); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + }); + } + + /** * @param entry Transaction entry. * @param topVer Topology version. * @param curMapping Current mapping. + * @param txMapping Mapping. * @param remap Remap flag. * @param topLocked Topology locked flag. */ private void map( IgniteTxEntry entry, AffinityTopologyVersion topVer, - Map, GridDistributedTxMapping> curMapping, + Map curMapping, GridDhtTxMapping txMapping, boolean remap, boolean topLocked @@ -565,30 +641,25 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } } - IgniteBiTuple key = F.t(primary, cacheCtx.isNear()); - - GridDistributedTxMapping cur = curMapping.get(key); + GridDistributedTxMapping cur = curMapping.get(primary.id()); if (cur == null) { cur = new GridDistributedTxMapping(primary); - curMapping.put(key, cur); - - if (primary.isLocal()) { - if (entry.context().isNear()) - tx.nearLocallyMapped(true); - else if (entry.context().isColocated()) - tx.colocatedLocallyMapped(true); - } - - // Initialize near flag right away. - cur.near(cacheCtx.isNear()); + curMapping.put(primary.id(), cur); cur.clientFirst(!topLocked && cctx.kernalContext().clientNode()); cur.last(true); } + if (primary.isLocal()) { + if (cacheCtx.isNear()) + tx.nearLocallyMapped(true); + else if (cacheCtx.isColocated()) + tx.colocatedLocallyMapped(true); + } + cur.add(entry); if (entry.explicitVersion() != null) { @@ -683,9 +754,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * */ private static class MiniFuture extends GridFutureAdapter { - /** */ - private static final long serialVersionUID = 0L; - /** Receive result flag updater. */ private static AtomicIntegerFieldUpdater RCV_RES_UPD = AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); @@ -773,9 +841,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * @param res Result callback. + * @param updateMapping Update mapping flag. */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - void onResult(final GridNearTxPrepareResponse res) { + void onResult(final GridNearTxPrepareResponse res, boolean updateMapping) { if (isDone()) return; @@ -878,7 +947,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim onDone(res); } else { - parent.onPrepareResponse(m, res); + parent.onPrepareResponse(m, res, updateMapping); // Finish this mini future (need result only on client node). onDone(parent.cctx.kernalContext().clientNode() ? res : null); @@ -892,8 +961,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim */ private void remap(final GridNearTxPrepareResponse res) { parent.prepareOnTopology(true, new Runnable() { - @Override - public void run() { + @Override public void run() { onDone(res); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index bc47c13..f4ce1ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -345,6 +345,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param write Write. * @param topLocked {@code True} if thread already acquired lock preventing topology change. + * @param remap Remap flag. */ private void prepareSingle(IgniteTxEntry write, boolean topLocked, boolean remap) { write.clearEntryReadVersion(); @@ -382,7 +383,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(txMapping); + if (!write.context().isNear()) + checkOnePhase(txMapping); + + assert !(mapping.hasColocatedCacheEntries() && mapping.hasNearCacheEntries()) : mapping; proceedPrepare(mapping, null); } @@ -390,6 +394,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param writes Write entries. * @param topLocked {@code True} if thread already acquired lock preventing topology change. + * @param remap Remap flag. */ private void prepare( Iterable writes, @@ -402,24 +407,37 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa txMapping = new GridDhtTxMapping(); - Map map = new HashMap<>(); + Map map = new HashMap<>(); // Assign keys to primary nodes. GridDistributedTxMapping cur = null; Queue mappings = new ArrayDeque<>(); + boolean hasNearCache = false; + for (IgniteTxEntry write : writes) { write.clearEntryReadVersion(); GridDistributedTxMapping updated = map(write, topVer, cur, topLocked, remap); + if (write.context().isNear()) + hasNearCache = true; + if (cur != updated) { mappings.offer(updated); updated.last(true); - GridDistributedTxMapping prev = map.put(updated.primary().id(), updated); + ClusterNode primary = updated.primary(); + + assert !primary.isLocal() || !cctx.kernalContext().clientNode(); + + // Minor optimization to not create MappingKey: on client node can not have mapping for local node. + Object key = cctx.kernalContext().clientNode() ? primary.id() : + new MappingKey(primary.id(), primary.isLocal() && updated.hasNearCacheEntries()); + + GridDistributedTxMapping prev = map.put(key, updated); if (prev != null) prev.last(false); @@ -451,7 +469,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(txMapping); + if (!hasNearCache) + checkOnePhase(txMapping); proceedPrepare(mappings); } @@ -497,7 +516,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa timeout, null, m.writes(), - m.near(), + m.hasNearCacheEntries(), txMapping.transactionNodes(), m.last(), tx.onePhaseCommit(), @@ -515,14 +534,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } // Must lock near entries separately. - if (m.near()) { + if (m.hasNearCacheEntries()) { try { - tx.optimisticLockEntries(req.writes()); - - tx.userPrepare(); + cctx.tm().prepareTx(tx, m.nearCacheEntries()); } catch (IgniteCheckedException e) { onError(e, false); + + return; } } @@ -532,13 +551,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa add(fut); // Append new future. - // If this is the primary node for the keys. if (n.isLocal()) { - // At this point, if any new node joined, then it is - // waiting for this transaction to complete, so - // partition reassignments are not possible here. + assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries()) : m; + IgniteInternalFuture prepFut = - cctx.tm().txHandler().prepareTx(n.id(), tx, req); + m.hasNearCacheEntries() ? cctx.tm().txHandler().prepareNearTx(n.id(), req, true) + : cctx.tm().txHandler().prepareColocatedTx(tx, req); prepFut.listen(new CI1>() { @Override public void apply(IgniteInternalFuture prepFut) { @@ -590,6 +608,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param topVer Topology version. * @param cur Current mapping. * @param topLocked {@code True} if thread already acquired lock preventing topology change. + * @param remap Remap flag. * @return Mapping. */ private GridDistributedTxMapping map( @@ -644,14 +663,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + if (cur == null || !cur.primary().id().equals(primary.id()) || + (primary.isLocal() && cur.hasNearCacheEntries() != cacheCtx.isNear())) { boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode(); cur = new GridDistributedTxMapping(primary); - // Initialize near flag right away. - cur.near(cacheCtx.isNear()); - cur.clientFirst(clientFirst); } @@ -908,7 +925,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa remap(); } else { - parent.onPrepareResponse(m, res); + parent.onPrepareResponse(m, res, m.hasNearCacheEntries()); // Proceed prepare before finishing mini future. if (mappings != null) @@ -937,4 +954,44 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); } } + + /** + * + */ + private static class MappingKey { + /** */ + private final UUID nodeId; + + /** */ + private final boolean nearEntries; + + /** + * @param nodeId Node ID. + * @param nearEntries Near cache entries flag (should be true only for local node). + */ + MappingKey(UUID nodeId, boolean nearEntries) { + this.nodeId = nodeId; + this.nearEntries = nearEntries; + } + + /** {@inheritDoc} */ + @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") + @Override public boolean equals(Object o) { + MappingKey that = (MappingKey) o; + + return nearEntries == that.nearEntries && nodeId.equals(that.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + res = 31 * res + (nearEntries ? 1 : 0); + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MappingKey.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index cb15bca..e934319 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,7 +45,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -102,7 +102,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (f != null) { assert f.primary().id().equals(nodeId); - f.onResult(res); + f.onResult(res, true); } else { if (msgLog.isDebugEnabled()) { @@ -169,7 +169,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } try { - tx.userPrepare(); + tx.userPrepare(Collections.emptyList()); cctx.mvcc().addFuture(this); @@ -181,20 +181,97 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } /** + * @param txNodes Tx nodes. + * @param m Mapping. + * @param timeout Timeout. + * @param reads Reads. + * @param writes Writes. + * @return Request. + */ + private GridNearTxPrepareRequest createRequest(Map> txNodes, + GridDistributedTxMapping m, + long timeout, + Collection reads, + Collection writes) { + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + timeout, + reads, + writes, + m.hasNearCacheEntries(), + txNodes, + true, + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + false, + tx.activeCachesDeploymentEnabled()); + + for (IgniteTxEntry txEntry : writes) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + return req; + } + + /** + * @param req Request. + * @param m Mapping. + * @param miniId Mini future ID. + * @param nearEntries {@code True} if prepare near cache entries. + */ + private void prepareLocal(GridNearTxPrepareRequest req, + GridDistributedTxMapping m, + int miniId, + final boolean nearEntries) { + final MiniFuture fut = new MiniFuture(m, miniId); + + req.miniId(fut.futureId()); + + add(fut); + + IgniteInternalFuture prepFut = nearEntries ? + cctx.tm().txHandler().prepareNearTx(cctx.localNodeId(), req, true) : + cctx.tm().txHandler().prepareColocatedTx(tx, req); + + prepFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture prepFut) { + try { + fut.onResult(prepFut.get(), nearEntries); + } + catch (IgniteCheckedException e) { + fut.onError(e); + } + } + }); + } + + /** * */ private void preparePessimistic() { - Map, GridDistributedTxMapping> mappings = new HashMap<>(); + Map mappings = new HashMap<>(); AffinityTopologyVersion topVer = tx.topologyVersion(); GridDhtTxMapping txMapping = new GridDhtTxMapping(); + boolean hasNearCache = false; + for (IgniteTxEntry txEntry : tx.allEntries()) { txEntry.clearEntryReadVersion(); GridCacheContext cacheCtx = txEntry.context(); + if (cacheCtx.isNear()) + hasNearCache = true; + List nodes; if (!cacheCtx.isLocal()) { @@ -205,21 +282,14 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA else nodes = cacheCtx.affinity().nodesByKey(txEntry.key(), topVer); - ClusterNode primary = F.first(nodes); - - boolean near = cacheCtx.isNear(); + assert !nodes.isEmpty(); - IgniteBiTuple key = F.t(primary, near); + ClusterNode primary = nodes.get(0); - GridDistributedTxMapping nodeMapping = mappings.get(key); + GridDistributedTxMapping nodeMapping = mappings.get(primary.id()); - if (nodeMapping == null) { - nodeMapping = new GridDistributedTxMapping(primary); - - nodeMapping.near(cacheCtx.isNear()); - - mappings.put(key, nodeMapping); - } + if (nodeMapping == null) + mappings.put(primary.id(), nodeMapping = new GridDistributedTxMapping(primary)); txEntry.nodeId(primary.id()); @@ -230,7 +300,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(txMapping); + if (!hasNearCache) + checkOnePhase(txMapping); long timeout = tx.remainingTime(); @@ -242,56 +313,48 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA int miniId = 0; + Map> txNodes = txMapping.transactionNodes(); + for (final GridDistributedTxMapping m : mappings.values()) { final ClusterNode primary = m.primary(); - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - timeout, - m.reads(), - m.writes(), - m.near(), - txMapping.transactionNodes(), - true, - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash(), - false, - tx.activeCachesDeploymentEnabled()); - - for (IgniteTxEntry txEntry : m.entries()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); + if (primary.isLocal()) { + if (m.hasNearCacheEntries() && m.hasColocatedCacheEntries()) { + GridNearTxPrepareRequest nearReq = createRequest(txMapping.transactionNodes(), + m, + timeout, + m.nearEntriesReads(), + m.nearEntriesWrites()); + + prepareLocal(nearReq, m, ++miniId, true); + + GridNearTxPrepareRequest colocatedReq = createRequest(txNodes, + m, + timeout, + m.colocatedEntriesReads(), + m.colocatedEntriesWrites()); + + prepareLocal(colocatedReq, m, ++miniId, false); + } + else { + GridNearTxPrepareRequest req = createRequest(txNodes, m, timeout, m.reads(), m.writes()); + + prepareLocal(req, m, ++miniId, m.hasNearCacheEntries()); + } } + else { + GridNearTxPrepareRequest req = createRequest(txNodes, + m, + timeout, + m.reads(), + m.writes()); - final MiniFuture fut = new MiniFuture(m, ++miniId); + final MiniFuture fut = new MiniFuture(m, ++miniId); - req.miniId(fut.futureId()); + req.miniId(fut.futureId()); - add(fut); + add(fut); - if (primary.isLocal()) { - IgniteInternalFuture prepFut = cctx.tm().txHandler().prepareTx(primary.id(), - tx, - req); - - prepFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture prepFut) { - try { - fut.onResult(prepFut.get()); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } - } - }); - } - else { try { cctx.io().send(primary, req, tx.ioPolicy()); @@ -395,12 +458,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA /** * @param res Response. + * @param updateMapping Update mapping flag. */ - void onResult(GridNearTxPrepareResponse res) { + void onResult(GridNearTxPrepareResponse res, boolean updateMapping) { if (res.error() != null) onError(res.error()); else { - onPrepareResponse(m, res); + onPrepareResponse(m, res, updateMapping); onDone(res); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 37be0fb..89874ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -642,7 +642,7 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit * @param mapping Mapping to finish. */ private void readyNearMappingFromBackup(GridDistributedTxMapping mapping) { - if (mapping.near()) { + if (mapping.hasNearCacheEntries()) { GridCacheVersion xidVer = tx.xidVersion(); mapping.dhtVersion(xidVer, xidVer); @@ -676,7 +676,7 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit private void finish(int miniId, GridDistributedTxMapping m, boolean commit) { ClusterNode n = m.primary(); - assert !m.empty(); + assert !m.empty() : m; CacheWriteSynchronizationMode syncMode = tx.syncMode(); @@ -698,7 +698,7 @@ public final class GridNearTxFinishFuture extends GridCacheCompoundIdentit m.explicitLock(), tx.storeEnabled(), tx.topologyVersion(), - completedVer, // Reuse 'baseVersion' to do not add new fields in message. + completedVer, // Reuse 'baseVersion' to do not add new fields in message. null, null, tx.size(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 62af536..f795ddc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -151,9 +151,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea @GridToStringExclude private volatile GridNearTxFinishFuture rollbackFut; - /** Entries to lock on next step of prepare stage. */ - private Collection optimisticLockEntries = Collections.emptyList(); - /** True if transaction contains near cache entries mapped to local node. */ private boolean nearLocallyMapped; @@ -2425,14 +2422,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** {@inheritDoc} */ @Override public Collection optimisticLockEntries() { - return optimisticLockEntries; - } + assert false : "Should not be called"; - /** - * @param optimisticLockEntries Optimistic lock entries. - */ - void optimisticLockEntries(Collection optimisticLockEntries) { - this.optimisticLockEntries = optimisticLockEntries; + throw new UnsupportedOperationException(); } /** @@ -2862,8 +2854,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea if (m == null) { mappings.put(m = new GridDistributedTxMapping(primary)); - m.near(map.near()); - if (map.explicitLock()) m.markExplicitLock(); } @@ -2889,8 +2879,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea mappings.put(m); - m.near(map.near()); - if (map.explicitLock()) m.markExplicitLock(); @@ -2933,14 +2921,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea Collection committedVers, Collection rolledbackVers) { + assert mapping.hasNearCacheEntries() : mapping; + // Process writes, then reads. for (IgniteTxEntry txEntry : mapping.entries()) { - if (CU.writes().apply(txEntry)) + if (CU.WRITE_FILTER_NEAR.apply(txEntry)) readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers); } for (IgniteTxEntry txEntry : mapping.entries()) { - if (CU.reads().apply(txEntry)) + if (CU.READ_FILTER_NEAR.apply(txEntry)) readyNearLock(txEntry, mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers); } } @@ -2952,7 +2942,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. */ - void readyNearLock(IgniteTxEntry txEntry, + private void readyNearLock(IgniteTxEntry txEntry, GridCacheVersion dhtVer, Collection pendingVers, Collection committedVers, @@ -3333,11 +3323,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea needReturnValue() && implicit()); try { - // At this point all the entries passed in must be enlisted in transaction because this is an - // optimistic transaction. - optimisticLockEntries = (serializable() && optimistic()) ? F.concat(false, writes, reads) : writes; - - userPrepare(); + userPrepare((serializable() && optimistic()) ? F.concat(false, writes, reads) : writes); // Make sure to add future before calling prepare on it. cctx.mvcc().addFuture(fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index 7f1f5a2..004e4da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -172,7 +172,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends Collection backups = entry.getValue(); - if (backups.size() <= 1) + if (backups.size() <= 1 && !tx.txState().hasNearCacheConfigured(cctx, tx.topologyVersion())) tx.onePhaseCommit(true); } } @@ -180,9 +180,12 @@ public abstract class GridNearTxPrepareFutureAdapter extends /** * @param m Mapping. * @param res Response. + * @param updateMapping Update mapping flag. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - final void onPrepareResponse(GridDistributedTxMapping m, GridNearTxPrepareResponse res) { + final void onPrepareResponse(GridDistributedTxMapping m, + GridNearTxPrepareResponse res, + boolean updateMapping) { if (res == null) return; @@ -245,24 +248,25 @@ public abstract class GridNearTxPrepareFutureAdapter extends } if (!m.empty()) { - GridCacheVersion writeVer = res.writeVersion(); - - if (writeVer == null) - writeVer = res.dhtVersion(); - // This step is very important as near and DHT versions grow separately. cctx.versions().onReceived(nodeId, res.dhtVersion()); - // Register DHT version. - m.dhtVersion(res.dhtVersion(), writeVer); + if (updateMapping && m.hasNearCacheEntries()) { + GridCacheVersion writeVer = res.writeVersion(); + + if (writeVer == null) + writeVer = res.dhtVersion(); - GridDistributedTxMapping map = tx.mappings().get(nodeId); + // Register DHT version. + m.dhtVersion(res.dhtVersion(), writeVer); - if (map != null) - map.dhtVersion(res.dhtVersion(), writeVer); + GridDistributedTxMapping map = tx.mappings().get(nodeId); + + if (map != null) + map.dhtVersion(res.dhtVersion(), writeVer); - if (m.near()) tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions()); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 153ad04..a591517 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -111,20 +111,17 @@ public class IgniteTxHandler { /** * @param nearNodeId Node ID. * @param req Request. - * @return Prepare future. */ - private IgniteInternalFuture processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { + private void processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + ", node=" + nearNodeId + ']'); } - IgniteInternalFuture fut = prepareTx(nearNodeId, null, req); + IgniteInternalFuture fut = prepareNearTx(nearNodeId, req, false); assert req.txState() != null || fut == null || fut.error() != null || (ctx.tm().tx(req.version()) == null && ctx.tm().nearTx(req.version()) == null); - - return fut; } /** @@ -209,41 +206,13 @@ public class IgniteTxHandler { } /** - * @param nearNodeId Near node ID that initiated transaction. - * @param locTx Optional local transaction. - * @param req Near prepare request. - * @return Future for transaction. - */ - public IgniteInternalFuture prepareTx( - UUID nearNodeId, - @Nullable GridNearTxLocal locTx, - GridNearTxPrepareRequest req - ) { - assert nearNodeId != null; - assert req != null; - - if (locTx != null) { - if (req.near()) { - // Make sure not to provide Near entries to DHT cache. - req.cloneEntries(); - - return prepareNearTx(nearNodeId, req); - } - else - return prepareColocatedTx(locTx, req); - } - else - return prepareNearTx(nearNodeId, req); - } - - /** * Prepares local colocated tx. * * @param locTx Local transaction. * @param req Near prepare request. * @return Prepare future. */ - private IgniteInternalFuture prepareColocatedTx( + public IgniteInternalFuture prepareColocatedTx( final GridNearTxLocal locTx, final GridNearTxPrepareRequest req ) { @@ -308,16 +277,20 @@ public class IgniteTxHandler { } /** - * Prepares near transaction. - * * @param nearNodeId Near node ID that initiated transaction. * @param req Near prepare request. + * @param locReq Local request flag. * @return Prepare future. */ - private IgniteInternalFuture prepareNearTx( + public IgniteInternalFuture prepareNearTx( final UUID nearNodeId, - final GridNearTxPrepareRequest req + final GridNearTxPrepareRequest req, + boolean locReq ) { + // Make sure not to provide Near entries to DHT cache. + if (locReq) + req.cloneEntries(); + ClusterNode nearNode = ctx.node(nearNodeId); if (nearNode == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index 5743bfb..36f5f2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -293,6 +294,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ + @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) { + return cacheCtx != null ? ctx.discovery().hasNearCache(cacheCtx.cacheId(), topVer) : false; + } + + /** {@inheritDoc} */ public String toString() { return S.toString(IgniteTxImplicitSingleStateImpl.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index a59ff51..5a708d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -225,7 +225,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** {@inheritDoc} */ - @Override public IgniteTxState txState() { + @Override public IgniteTxLocalState txState() { return txState; } @@ -401,10 +401,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig } /** + * @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}. * @throws IgniteCheckedException If prepare step failed. */ @SuppressWarnings({"CatchGenericClass"}) - public void userPrepare() throws IgniteCheckedException { + public void userPrepare(@Nullable Collection entries) throws IgniteCheckedException { if (state() != PREPARING) { if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); @@ -420,7 +421,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig checkValid(); try { - cctx.tm().prepareTx(this); + cctx.tm().prepareTx(this, entries); } catch (IgniteCheckedException e) { throw e; http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java index 123d396..fe9fcbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache.transactions; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; + /** * */ @@ -41,4 +44,11 @@ public interface IgniteTxLocalState extends IgniteTxState { * */ public void seal(); + + /** + * @param ctx Context. + * @param topVer Topology version. + * @return {@code True} if tx has cache with created near cache. + */ + public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index da49c06..2da8dee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -774,12 +774,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Handles prepare stage of 2PC. + * Handles prepare stage. * * @param tx Transaction to prepare. + * @param entries Entries to lock or {@code null} if use default {@link IgniteInternalTx#optimisticLockEntries()}. * @throws IgniteCheckedException If preparation failed. */ - public void prepareTx(IgniteInternalTx tx) throws IgniteCheckedException { + public void prepareTx(IgniteInternalTx tx, @Nullable Collection entries) throws IgniteCheckedException { if (tx.state() == MARKED_ROLLBACK) { if (tx.remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); @@ -799,7 +800,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // Optimistic. assert tx.optimistic() || !tx.local(); - if (!lockMultiple(tx, tx.optimisticLockEntries())) { + if (!lockMultiple(tx, entries != null ? entries : tx.optimisticLockEntries())) { tx.setRollbackOnly(); throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction (lock conflict): " + tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java index 7a45b6e..ed2526e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -85,7 +85,7 @@ public interface IgniteTxState { public boolean hasNearCache(GridCacheSharedContext cctx); /** - * @param cacheCtx Ccntext. + * @param cacheCtx Context. * @param tx Transaction. * @throws IgniteCheckedException If cache check failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 304473e..3679208 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -28,12 +28,14 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheInterceptor; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; -import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.GridIntList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -52,7 +54,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC */ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** Active cache IDs. */ - private GridLongList activeCacheIds = new GridLongList(); + private GridIntList activeCacheIds = new GridIntList(); /** Per-transaction read map. */ @GridToStringInclude @@ -77,13 +79,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Nullable @Override public Integer firstCacheId() { - return activeCacheIds.isEmpty() ? null : (int)activeCacheIds.get(0); + return activeCacheIds.isEmpty() ? null : activeCacheIds.get(0); } /** {@inheritDoc} */ @Override public void unwindEvicts(GridCacheSharedContext cctx) { for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int) activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext ctx = cctx.cacheContext(cacheId); @@ -95,7 +97,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) { if (activeCacheIds.size() == 1) { - int cacheId = (int)activeCacheIds.get(0); + int cacheId = activeCacheIds.get(0); return cctx.cacheContext(cacheId); } @@ -106,7 +108,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public void awaitLastFut(GridCacheSharedContext cctx) { for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); cctx.cacheContext(cacheId).cache().awaitLastFut(); } @@ -157,7 +159,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext cacheCtx = cctx.cacheContext(cacheId); @@ -175,7 +177,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { CacheWriteSynchronizationMode syncMode = CacheWriteSynchronizationMode.FULL_ASYNC; for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); CacheWriteSynchronizationMode cacheSyncMode = cctx.cacheContext(cacheId).config().getWriteSynchronizationMode(); @@ -202,7 +204,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public boolean hasNearCache(GridCacheSharedContext cctx) { for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext cacheCtx = cctx.cacheContext(cacheId); @@ -236,7 +238,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { int idx = 0; for (int i = 0; i < activeCacheIds.size(); i++) { - int activeCacheId = (int)activeCacheIds.get(i); + int activeCacheId = activeCacheIds.get(i); cacheNames.append(cctx.cacheContext(activeCacheId).name()); @@ -267,7 +269,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { GridCacheContext nonLocCtx = null; for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext cacheCtx = cctx.cacheContext(cacheId); @@ -299,7 +301,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { GridCacheContext nonLocCtx = null; for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext cacheCtx = cctx.cacheContext(cacheId); @@ -319,7 +321,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { if (!activeCacheIds.isEmpty()) { for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); CacheStoreManager store = cctx.cacheContext(cacheId).store(); @@ -334,7 +336,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public boolean hasInterceptor(GridCacheSharedContext cctx) { for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); CacheInterceptor interceptor = cctx.cacheContext(cacheId).config().getInterceptor(); @@ -347,13 +349,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public Collection stores(GridCacheSharedContext cctx) { - GridLongList cacheIds = activeCacheIds; + GridIntList cacheIds = activeCacheIds; if (!cacheIds.isEmpty()) { Collection stores = new ArrayList<>(cacheIds.size()); for (int i = 0; i < cacheIds.size(); i++) { - int cacheId = (int)cacheIds.get(i); + int cacheId = cacheIds.get(i); CacheStoreManager store = cctx.cacheContext(cacheId).store(); @@ -370,7 +372,7 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { /** {@inheritDoc} */ @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { for (int i = 0; i < activeCacheIds.size(); i++) { - int cacheId = (int)activeCacheIds.get(i); + int cacheId = activeCacheIds.get(i); GridCacheContext cacheCtx = cctx.cacheContext(cacheId); @@ -474,6 +476,22 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ + @Override public boolean hasNearCacheConfigured(GridCacheSharedContext ctx, AffinityTopologyVersion topVer) { + DiscoCache discoCache = ctx.discovery().discoCache(topVer); + + assert discoCache != null : topVer; + + for (int i = 0; i < activeCacheIds.size(); i++) { + int cacheId = activeCacheIds.get(i); + + if (discoCache.hasNearCache(cacheId)) + return true; + } + + return false; + } + + /** {@inheritDoc} */ public String toString() { return S.toString(IgniteTxStateImpl.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java index b4785a7..ff0105b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateCollectionView.java @@ -22,7 +22,6 @@ import java.util.Iterator; import org.apache.ignite.internal.util.GridSerializableCollection; import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.NotNull; @@ -45,6 +44,7 @@ public class PredicateCollectionView extends GridSerializableCollection { * @param col Input col that serves as a base for the view. * @param preds Optional preds. If preds are not provided - all elements will be in the view. */ + @SafeVarargs public PredicateCollectionView(Collection col, IgnitePredicate... preds) { this.col = col; this.preds = preds; @@ -70,9 +70,4 @@ public class PredicateCollectionView extends GridSerializableCollection { @Override public boolean isEmpty() { return F.isEmpty(preds) ? col.isEmpty() : !iterator().hasNext(); } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PredicateCollectionView.class, this); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java index d5b97a6..01e6d8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateMapView.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.util.GridSerializableMap; import org.apache.ignite.internal.util.GridSerializableSet; import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -113,9 +112,4 @@ public class PredicateMapView extends GridSerializableMap { @Override public boolean containsKey(Object key) { return GridFunc.isAll((K)key, preds) && map.containsKey(key); } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PredicateMapView.class, this); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java index 99fc2fd..8937107 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/PredicateSetView.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.GridSerializableMap; import org.apache.ignite.internal.util.GridSerializableSet; import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.NotNull; @@ -145,9 +144,4 @@ public class PredicateSetView extends GridSerializableMap { @Override public boolean containsKey(Object key) { return GridFunc.isAll((K)key, preds) && set.contains(key); } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(PredicateSetView.class, this); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java index 8186914..d8aa1d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView.java @@ -22,7 +22,6 @@ import java.util.Iterator; import org.apache.ignite.internal.util.GridSerializableCollection; import org.apache.ignite.internal.util.GridSerializableIterator; import org.apache.ignite.internal.util.lang.GridFunc; -import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -87,9 +86,4 @@ public class ReadOnlyCollectionView extends GridSerializableCollection { @Override public boolean equals(Object obj) { return obj instanceof Collection && GridFunc.eqNotOrdered(this, (Collection)obj); } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ReadOnlyCollectionView.class, this); - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a5088265/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java index 82ec651..7a60e17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/gridfunc/ReadOnlyCollectionView2X.java @@ -22,7 +22,6 @@ import java.util.Iterator; import org.apache.ignite.internal.util.GridSerializableCollection; import org.apache.ignite.internal.util.GridSerializableIterator; import org.apache.ignite.internal.util.lang.GridFunc; -import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.NotNull; /** @@ -92,9 +91,4 @@ public class ReadOnlyCollectionView2X extends GridSerializableCollection { @Override public boolean equals(Object obj) { return obj instanceof Collection && GridFunc.eqNotOrdered(this, (Collection)obj); } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ReadOnlyCollectionView2X.class, this); - } }