Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 30FC0109CB for ; Wed, 20 May 2015 18:27:31 +0000 (UTC) Received: (qmail 93949 invoked by uid 500); 20 May 2015 18:27:31 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 93917 invoked by uid 500); 20 May 2015 18:27:31 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 93907 invoked by uid 99); 20 May 2015 18:27:31 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 May 2015 18:27:31 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id A2256C11B9 for ; Wed, 20 May 2015 18:27:30 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id SDRIu-0w-5at for ; Wed, 20 May 2015 18:27:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 2F85E48484 for ; Wed, 20 May 2015 18:27:27 +0000 (UTC) Received: (qmail 93784 invoked by uid 99); 20 May 2015 18:27:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 20 May 2015 18:27:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 49FB1DFBDC; Wed, 20 May 2015 18:27:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 20 May 2015 18:27:26 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/24] incubator-ignite git commit: IGNITE-80 - Porting changes to a separate branch. Repository: incubator-ignite Updated Branches: refs/heads/ignite-916 86ed8b825 -> b4aa05d8f (forced update) IGNITE-80 - Porting changes to a separate branch. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dcda61b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dcda61b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dcda61b4 Branch: refs/heads/ignite-916 Commit: dcda61b4fe2be3005544a3fc915b19ac3e4c9598 Parents: 1e53395 Author: Alexey Goncharuk Authored: Wed Apr 29 14:08:05 2015 -0700 Committer: Alexey Goncharuk Committed: Wed Apr 29 14:08:05 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 5 +-- .../GridCachePartitionExchangeManager.java | 4 +- .../distributed/dht/GridDhtCacheAdapter.java | 6 ++- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 42 +++++++++++++++----- .../dht/atomic/GridNearAtomicUpdateRequest.java | 36 ++++++++++++++--- .../colocated/GridDhtColocatedLockFuture.java | 4 +- .../cache/transactions/IgniteTxManager.java | 24 +++++++++++ 8 files changed, 101 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b8668e6..112330a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -146,9 +146,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { c = clsHandlers.get(new ListenerKey(cacheMsg.cacheId(), cacheMsg.getClass())); if (c == null) { - if (log.isDebugEnabled()) - log.debug("Received message without registered handler (will ignore) [msg=" + cacheMsg + - ", nodeId=" + nodeId + ']'); + U.warn(log, "Received message without registered handler (will ignore) [msg=" + cacheMsg + + ", nodeId=" + nodeId + ']'); return; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5f82ae2..e61168e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -409,10 +409,10 @@ public class GridCachePartitionExchangeManager extends GridCacheSharedMana * @param ver Topology version. * @return Future or {@code null} is future is already completed. */ - public @Nullable IgniteInternalFuture affinityReadyFuture(AffinityTopologyVersion ver) { + @Nullable public IgniteInternalFuture affinityReadyFuture(AffinityTopologyVersion ver) { GridDhtPartitionsExchangeFuture lastInitializedFut0 = lastInitializedFut; - if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) >= 0) { + if (lastInitializedFut0 != null && lastInitializedFut0.topologyVersion().compareTo(ver) == 0) { if (log.isDebugEnabled()) log.debug("Return lastInitializedFut for topology ready future " + "[ver=" + ver + ", fut=" + lastInitializedFut0 + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 1c46fd0..4d1db85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -645,8 +645,10 @@ public abstract class GridDhtCacheAdapter extends GridDistributedCacheAdap res.error(e); } - res.invalidPartitions(fut.invalidPartitions(), - new AffinityTopologyVersion(ctx.discovery().topologyVersion())); + if (!F.isEmpty(fut.invalidPartitions())) + res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().readyAffinityVersion()); + else + res.invalidPartitions(fut.invalidPartitions(), req.topologyVersion()); try { ctx.io().send(nodeId, res, ctx.ioPolicy()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 905f7bf..a30f211 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1041,7 +1041,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { // Do not check topology version for CLOCK versioning since // partition exchange will wait for near update future. - if (topology().topologyVersion().equals(req.topologyVersion()) || + // Also do not check topology version if topology was locked on near node by + // external transaction or explicit lock. + if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() || ctx.config().getAtomicWriteOrderMode() == CLOCK) { ClusterNode node = ctx.discovery().node(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 072ab52..3dc89f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.cache.dr.*; +import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; @@ -136,6 +137,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** Task name hash. */ private final int taskNameHash; + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + private boolean topLocked; + /** Skip store flag. */ private final boolean skipStore; @@ -289,7 +293,23 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter * @param waitTopFut Whether to wait for topology future. */ public void map(boolean waitTopFut) { - mapOnTopology(keys, false, null, waitTopFut); + AffinityTopologyVersion topVer = null; + + IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); + + if (tx != null && tx.topologyVersionSnapshot() != null) + topVer = tx.topologyVersionSnapshot(); + + if (topVer == null) + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer == null) + mapOnTopology(keys, false, null, waitTopFut); + else { + topLocked = true; + + map0(topVer, keys, false, null); + } } /** {@inheritDoc} */ @@ -430,15 +450,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter } topVer = fut.topologyVersion(); - - if (futVer == null) - // Assign future version in topology read lock before first exception may be thrown. - futVer = cctx.versions().next(topVer); } else { if (waitTopFut) { fut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture t) { + @Override + public void apply(IgniteInternalFuture t) { mapOnTopology(keys, remap, oldNodeId, waitTopFut); } }); @@ -448,9 +465,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter return; } - - if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) - cctx.mvcc().addAtomicFuture(version(), this); } finally { cache.topology().readUnlock(); @@ -474,6 +488,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter } /** + * @param topVer Topology version. * @param keys Keys to map. * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. @@ -494,6 +509,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter return; } + if (futVer == null) + // Assign future version in topology read lock before first exception may be thrown. + futVer = cctx.versions().next(topVer); + + if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC)) + cctx.mvcc().addAtomicFuture(version(), this); + CacheConfiguration ccfg = cctx.config(); // Assign version on near node in CLOCK ordering mode even if fastMap is false. @@ -579,6 +601,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter fastMap, updVer, topVer, + topLocked, syncMode, op, retval, @@ -716,6 +739,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter fastMap, updVer, topVer, + topLocked, syncMode, op, retval, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index e0e3e26..a96a666 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -64,6 +64,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** Topology version. */ private AffinityTopologyVersion topVer; + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private boolean topLocked; + /** Write synchronization mode. */ private CacheWriteSynchronizationMode syncMode; @@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri boolean fastMap, @Nullable GridCacheVersion updateVer, @NotNull AffinityTopologyVersion topVer, + boolean topLocked, CacheWriteSynchronizationMode syncMode, GridCacheOperation op, boolean retval, @@ -179,6 +183,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.updateVer = updateVer; this.topVer = topVer; + this.topLocked = topLocked; this.syncMode = syncMode; this.op = op; this.retval = retval; @@ -254,6 +259,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } /** + * @return Topology locked flag. + */ + public boolean topologyLocked() { + return topLocked; + } + + /** * @return Cache write synchronization mode. */ public CacheWriteSynchronizationMode writeSynchronizationMode() { @@ -664,18 +676,24 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 20: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("topLocked", topLocked)) return false; writer.incrementState(); case 21: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 23: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -842,7 +860,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 20: - topVer = reader.readMessage("topVer"); + topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) return false; @@ -850,7 +868,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 21: - updateVer = reader.readMessage("updateVer"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -858,6 +876,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 22: + updateVer = reader.readMessage("updateVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -877,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 24; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 5b74b31..6292f2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -292,7 +292,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity false, false); - cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion())); + cand.topologyVersion(topVer.get()); } } else { @@ -311,7 +311,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity false, false); - cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion())); + cand.topologyVersion(topVer.get()); } else cand = cand.reenter(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dcda61b4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c494602..874e640 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -639,6 +639,30 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @return Any transaction associated with the current thread. + */ + public IgniteInternalTx anyActiveThreadTx() { + long threadId = Thread.currentThread().getId(); + + IgniteInternalTx tx = threadMap.get(threadId); + + if (tx != null && tx.topologyVersionSnapshot() != null) + return tx; + + for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) { + if (!cacheCtx.systemTx()) + continue; + + tx = sysThreadMap.get(new TxThreadKey(threadId, cacheCtx.cacheId())); + + if (tx != null && tx.topologyVersionSnapshot() != null) + return tx; + } + + return null; + } + + /** * @return Local transaction. */ @Nullable public IgniteInternalTx localTxx() {