http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 6ec02a6..cbaaed4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityAssignment;
+import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -43,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAssignmentFetchFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -62,7 +63,6 @@ import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
@@ -112,10 +112,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Demand lock. */
private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
- /** Pending affinity assignment futures. */
- private ConcurrentMap<AffinityTopologyVersion, GridDhtAssignmentFetchFuture> pendingAssignmentFetchFuts =
- new ConcurrentHashMap8<>();
-
/** */
private final ConcurrentLinkedDeque8<GridDhtLocalPartition> partsToEvict = new ConcurrentLinkedDeque8<>();
@@ -145,11 +141,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " +
"order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']';
- if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) {
- for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
- fut.onNodeLeft(e.eventNode().id());
- }
-
if (!initRebalanceFut.isDone()) {
startFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
@@ -199,19 +190,16 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
});
- cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class,
- new MessageHandler<GridDhtAffinityAssignmentRequest>() {
- @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) {
- processAffinityAssignmentRequest(node, msg);
- }
- });
+ if (!cctx.kernalContext().clientNode()) {
+ cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentRequest.class,
+ new MessageHandler<GridDhtAffinityAssignmentRequest>() {
+ @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentRequest msg) {
+ processAffinityAssignmentRequest(node, msg);
+ }
+ });
+ }
- cctx.io().addHandler(cctx.cacheId(), GridDhtAffinityAssignmentResponse.class,
- new MessageHandler<GridDhtAffinityAssignmentResponse>() {
- @Override protected void onMessage(ClusterNode node, GridDhtAffinityAssignmentResponse msg) {
- processAffinityAssignmentResponse(node, msg);
- }
- });
+ cctx.shared().affinity().onCacheCreated(cctx);
supplier = new GridDhtPartitionSupplier(cctx);
demander = new GridDhtPartitionDemander(cctx, demandLock);
@@ -267,13 +255,10 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
- @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
- demander.updateLastExchangeFuture(lastFut);
- }
+ @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
+ supplier.onTopologyChanged(lastFut.topologyVersion());
- /** {@inheritDoc} */
- @Override public void onTopologyChanged(AffinityTopologyVersion topVer) {
- supplier.onTopologyChanged(topVer);
+ demander.updateLastExchangeFuture(lastFut);
}
/** {@inheritDoc} */
@@ -289,7 +274,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
assert exchFut.forcePreload() || exchFut.dummyReassign() ||
exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
"Topology version mismatch [exchId=" + exchFut.exchangeId() +
- ", topVer=" + top.topologyVersion() + ']';
+ ", cache=" + cctx.name() +
+ ", topVer=" + top.topologyVersion() + ']';
GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
@@ -458,26 +444,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/**
- * @param topVer Requested topology version.
- * @param fut Future to add.
- */
- public void addDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) {
- GridDhtAssignmentFetchFuture old = pendingAssignmentFetchFuts.putIfAbsent(topVer, fut);
-
- assert old == null : "More than one thread is trying to fetch partition assignments: " + topVer;
- }
-
- /**
- * @param topVer Requested topology version.
- * @param fut Future to remove.
- */
- public void removeDhtAssignmentFetchFuture(AffinityTopologyVersion topVer, GridDhtAssignmentFetchFuture fut) {
- boolean rmv = pendingAssignmentFetchFuts.remove(topVer, fut);
-
- assert rmv : "Failed to remove assignment fetch future: " + topVer;
- }
-
- /**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
@@ -644,11 +610,23 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
log.debug("Affinity is ready for topology version, will send response [topVer=" + topVer +
", node=" + node + ']');
- List<List<ClusterNode>> assignment = cctx.affinity().assignments(topVer);
+ GridAffinityAssignment assignment = cctx.affinity().assignment(topVer);
+
+ boolean newAffMode = node.version().compareTo(CacheAffinitySharedManager.LATE_AFF_ASSIGN_SINCE) >= 0;
+
+ GridDhtAffinityAssignmentResponse res = new GridDhtAffinityAssignmentResponse(cctx.cacheId(),
+ topVer,
+ assignment.assignment(),
+ newAffMode);
+
+ if (newAffMode && cctx.affinity().affinityCache().centralizedAffinityFunction()) {
+ assert assignment.idealAssignment() != null;
+
+ res.idealAffinityAssignment(assignment.idealAssignment());
+ }
try {
- cctx.io().send(node,
- new GridDhtAffinityAssignmentResponse(cctx.cacheId(), topVer, assignment), AFFINITY_POOL);
+ cctx.io().send(node, res, AFFINITY_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send affinity assignment response to remote node [node=" + node + ']', e);
@@ -658,18 +636,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/**
- * @param node Node.
- * @param res Response.
- */
- private void processAffinityAssignmentResponse(ClusterNode node, GridDhtAffinityAssignmentResponse res) {
- if (log.isDebugEnabled())
- log.debug("Processing affinity assignment response [node=" + node + ", res=" + res + ']');
-
- for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
- fut.onResponse(node, res);
- }
-
- /**
* Resends partitions on partition evict within configured timeout.
*
* @param part Evicted partition.
@@ -833,13 +799,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
U.warn(log, ">>> " + fut);
}
- if (!pendingAssignmentFetchFuts.isEmpty()) {
- U.warn(log, "Pending assignment fetch futures [cache=" + cctx.name() +"]:");
-
- for (GridDhtAssignmentFetchFuture fut : pendingAssignmentFetchFuts.values())
- U.warn(log, ">>> " + fut);
- }
-
supplier.dumpDebugInfo();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 8483cb1..4b876b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -267,7 +267,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
false);
// init() will register future for responses if future has remote mappings.
- fut.init();
+ fut.init(null);
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 943a91a..d495f83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -48,7 +48,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
private static final int NEAR_SIZE_OVERHEAD = 36 + 16;
/** Topology version at the moment when value was initialized from primary node. */
- private volatile long topVer = -1L;
+ private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** DHT version which caused the last update. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -96,50 +96,34 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
@Override public boolean valid(AffinityTopologyVersion topVer) {
assert topVer.topologyVersion() > 0 : "Topology version is invalid: " + topVer;
- long topVer0 = this.topVer;
+ AffinityTopologyVersion topVer0 = this.topVer;
- if (topVer0 == topVer.topologyVersion())
+ if (topVer0.equals(topVer))
return true;
- if (topVer0 == -1L || topVer.topologyVersion() < topVer0)
+ if (topVer0.equals(AffinityTopologyVersion.NONE) || topVer.compareTo(topVer0) < 0)
return false;
try {
- ClusterNode primary = null;
+ if (cctx.affinity().primaryChanged(partition(), topVer0, topVer)) {
+ this.topVer = AffinityTopologyVersion.NONE;
- for (long ver = topVer0; ver <= topVer.topologyVersion(); ver++) {
- ClusterNode primary0 = cctx.affinity().primary(part, new AffinityTopologyVersion(ver));
-
- if (primary0 == null) {
- this.topVer = -1L;
-
- return false;
- }
-
- if (primary == null)
- primary = primary0;
- else {
- if (!primary.equals(primary0)) {
- this.topVer = -1L;
-
- return false;
- }
- }
+ return false;
}
if (cctx.affinity().backup(cctx.localNode(), part, topVer)) {
- this.topVer = -1L;
+ this.topVer = AffinityTopologyVersion.NONE;
return false;
}
- this.topVer = topVer.topologyVersion();
+ this.topVer = topVer;
return true;
}
catch (IllegalStateException ignore) {
// Do not have affinity history.
- this.topVer = -1L;
+ this.topVer = AffinityTopologyVersion.NONE;
return false;
}
@@ -147,61 +131,52 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/**
* @param topVer Topology version.
- * @return {@code True} if this entry was initialized by this call.
* @throws GridCacheEntryRemovedException If this entry is obsolete.
*/
- public boolean initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
- while (true) {
- GridDhtCacheEntry entry = cctx.near().dht().peekExx(key);
+ public void initializeFromDht(AffinityTopologyVersion topVer) throws GridCacheEntryRemovedException {
+ GridDhtCacheEntry entry = cctx.near().dht().peekExx(key);
- if (entry != null) {
- GridCacheEntryInfo e = entry.info();
+ if (entry != null) {
+ GridCacheEntryInfo e = entry.info();
- if (e != null) {
- GridCacheVersion enqueueVer = null;
+ if (e != null) {
+ GridCacheVersion enqueueVer = null;
- try {
- synchronized (this) {
- checkObsolete();
+ try {
+ synchronized (this) {
+ checkObsolete();
- if (isNew() || !valid(topVer)) {
- // Version does not change for load ops.
- update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
+ if (isNew() || !valid(topVer)) {
+ // Version does not change for load ops.
+ update(e.value(), e.expireTime(), e.ttl(), e.isNew() ? ver : e.version(), true);
- if (cctx.deferredDelete() && !isNew() && !isInternal()) {
- boolean deleted = val == null;
+ if (cctx.deferredDelete() && !isNew() && !isInternal()) {
+ boolean deleted = val == null;
- if (deleted != deletedUnlocked()) {
- deletedUnlocked(deleted);
+ if (deleted != deletedUnlocked()) {
+ deletedUnlocked(deleted);
- if (deleted)
- enqueueVer = e.version();
- }
+ if (deleted)
+ enqueueVer = e.version();
}
+ }
- ClusterNode primaryNode = cctx.affinity().primary(key, topVer);
-
- if (primaryNode == null)
- this.topVer = -1L;
- else
- recordNodeId(primaryNode.id(), topVer);
-
- dhtVer = e.isNew() || e.isDeleted() ? null : e.version();
+ ClusterNode primaryNode = cctx.affinity().primary(key, topVer);
- return true;
- }
+ if (primaryNode == null)
+ this.topVer = AffinityTopologyVersion.NONE;
+ else
+ recordNodeId(primaryNode.id(), topVer);
- return false;
+ dhtVer = e.isNew() || e.isDeleted() ? null : e.version();
}
}
- finally {
- if (enqueueVer != null)
- cctx.onDeferredDelete(this, enqueueVer);
- }
+ }
+ finally {
+ if (enqueueVer != null)
+ cctx.onDeferredDelete(this, enqueueVer);
}
}
- else
- return false;
}
}
@@ -318,10 +293,15 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
@Override protected void recordNodeId(UUID primaryNodeId, AffinityTopologyVersion topVer) {
assert Thread.holdsLock(this);
+ assert topVer.compareTo(cctx.affinity().affinityTopologyVersion()) <= 0 : "Affinity not ready [" +
+ "topVer=" + topVer +
+ ", readyVer=" + cctx.affinity().affinityTopologyVersion() +
+ ", cache=" + cctx.name() + ']';
+
primaryNode(primaryNodeId, topVer);
}
- /*
+ /**
* @param dhtVer DHT version to record.
* @return {@code False} if given version is lower then existing version.
*/
@@ -661,7 +641,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
/** {@inheritDoc} */
@Override protected void onInvalidate() {
- topVer = -1L;
+ topVer = AffinityTopologyVersion.NONE;
dhtVer = null;
}
@@ -709,13 +689,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
}
if (primary == null || !nodeId.equals(primary.id())) {
- this.topVer = -1L;
+ this.topVer = AffinityTopologyVersion.NONE;
return;
}
- if (topVer.topologyVersion() > this.topVer)
- this.topVer = topVer.topologyVersion();
+ if (topVer.compareTo(this.topVer) > 0)
+ this.topVer = topVer;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 06fc0a5..c3d75c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -141,8 +141,10 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
/**
* Initializes future.
+ *
+ * @param topVer Topology version.
*/
- public void init() {
+ public void init(@Nullable AffinityTopologyVersion topVer) {
AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
if (lockedTopVer != null) {
@@ -151,11 +153,15 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer);
}
else {
- AffinityTopologyVersion topVer = tx == null ?
- (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
- tx.topologyVersion();
+ AffinityTopologyVersion mapTopVer = topVer;
+
+ if (mapTopVer == null) {
+ mapTopVer = tx == null ?
+ (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
+ }
- map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), mapTopVer);
}
markInitialized();
@@ -982,18 +988,18 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
}
// Need to wait for next topology version to remap.
- IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+ IgniteInternalFuture<AffinityTopologyVersion> topFut = cctx.affinity().affinityReadyFuture(rmtTopVer);
- topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
- @Override public void applyx(IgniteInternalFuture<Long> fut) throws IgniteCheckedException {
- long readyTopVer = fut.get();
+ topFut.listen(new CIX1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void applyx(IgniteInternalFuture<AffinityTopologyVersion> fut) throws IgniteCheckedException {
+ AffinityTopologyVersion readyTopVer = fut.get();
// This will append new futures to compound list.
map(F.view(keys.keySet(), new P1<KeyCacheObject>() {
@Override public boolean apply(KeyCacheObject key) {
return invalidParts.contains(cctx.affinity().partition(key));
}
- }), F.t(node, keys), new AffinityTopologyVersion(readyTopVer));
+ }), F.t(node, keys), readyTopVer);
// It is critical to call onDone after adding futures to compound list.
onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedEntries, topVer));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index d5483cd..6515140 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
@@ -127,7 +126,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
break;
}
catch (GridCacheEntryRemovedException e) {
- entry = ctx.cache().entryEx(entry.key());
+ entry = ctx.cache().entryEx(entry.key(), tx.topologyVersion());
txEntry.cached(entry);
}
@@ -601,7 +600,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
break;
}
catch (GridCacheEntryRemovedException ignore) {
- entry.cached(cacheCtx.near().entryEx(entry.key()));
+ entry.cached(cacheCtx.near().entryEx(entry.key(), topVer));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index f146071..5d3f604 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -109,7 +109,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
- f.onResult(e);
+ f.onNodeLeft(e, true);
found = true;
}
@@ -121,13 +121,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param e Error.
+ * @param discoThread {@code True} if executed from discovery thread.
*/
- void onError(Throwable e) {
+ void onError(Throwable e, boolean discoThread) {
if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) {
if (tx.onePhaseCommit()) {
tx.markForBackupCheck();
- onComplete();
+ onComplete(discoThread);
return;
}
@@ -147,7 +148,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
}
}
- onComplete();
+ onComplete(discoThread);
}
}
@@ -202,7 +203,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
ERR_UPD.compareAndSet(this, null, err);
- return onComplete();
+ return onComplete(false);
}
/**
@@ -216,9 +217,10 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* Completeness callback.
*
+ * @param discoThread {@code True} if executed from discovery thread.
* @return {@code True} if future was finished by this call.
*/
- private boolean onComplete() {
+ private boolean onComplete(boolean discoThread) {
Throwable err0 = err;
if (err0 == null || tx.needCheckBackup())
@@ -248,14 +250,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (tx.setRollbackOnly()) {
if (tx.timedOut())
onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " +
- "was rolled back: " + this));
+ "was rolled back: " + this), false);
else
onError(new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
+ "[state=" + tx.state() + ", tx=" + this + ']'), false);
}
else
onError(new IgniteTxRollbackCheckedException("Invalid transaction state for " +
- "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+ "prepare [state=" + tx.state() + ", tx=" + this + ']'), false);
return;
}
@@ -270,7 +272,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
markInitialized();
}
catch (TransactionTimeoutException e) {
- onError(e);
+ onError(e, false);
}
}
@@ -450,7 +452,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
tx.userPrepare();
}
catch (IgniteCheckedException e) {
- onError(e);
+ onError(e, false);
}
}
@@ -485,7 +487,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
catch (ClusterTopologyCheckedException e) {
e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
- fut.onResult(e);
+ fut.onNodeLeft(e, false);
}
catch (IgniteCheckedException e) {
fut.onResult(e);
@@ -586,7 +588,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
break;
}
catch (GridCacheEntryRemovedException ignore) {
- entry.cached(cacheCtx.near().entryEx(entry.key()));
+ entry.cached(cacheCtx.near().entryEx(entry.key(), topVer));
}
}
}
@@ -695,7 +697,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param e Node failure.
*/
- void onResult(ClusterTopologyCheckedException e) {
+ void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) {
if (isDone())
return;
@@ -705,7 +707,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
// Fail the whole future (make sure not to remap on different primary node
// to prevent multiple lock coordinators).
- parent.onError(e);
+ parent.onError(e, discoThread);
}
}
@@ -721,7 +723,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (RCV_RES_UPD.compareAndSet(this, 0, 1)) {
if (res.error() != null) {
// Fail the whole compound future.
- parent.onError(res.error());
+ parent.onError(res.error(), false);
}
else {
if (res.clientRemapVersion() != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 7132567..4d77a3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -52,22 +52,16 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
// Obtain the topology version to use.
long threadId = Thread.currentThread().getId();
- AffinityTopologyVersion topVer = null;
+ AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
- if (tx.system()) {
- topVer = tx.topologyVersionSnapshot();
+ // If there is another system transaction in progress, use it's topology version to prevent deadlock.
+ if (topVer == null && tx.system()) {
+ topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
if (topVer == null)
- topVer = cctx.exchange().readyAffinityVersion();
+ topVer = tx.topologyVersionSnapshot();
}
- if (topVer == null)
- topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
-
- // If there is another system transaction in progress, use it's topology version to prevent deadlock.
- if (topVer == null && tx.system())
- topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
-
if (topVer != null) {
tx.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index a3130cd..e954a7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -141,8 +141,9 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
- @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
+ readyTopVer,
ctx.cacheKeysView(keys),
deserializeBinary,
skipVals,
@@ -179,6 +180,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
* @return Future.
*/
IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
+ AffinityTopologyVersion topVer,
@Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean deserializeBinary,
@@ -202,7 +204,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
/*keepCacheObjects*/true);
// init() will register future for responses if it has remote mappings.
- fut.init();
+ fut.init(topVer);
return fut;
}
@@ -314,6 +316,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx == null) {
tx = new GridNearTxRemote(
ctx.shared(),
+ req.topologyVersion(),
nodeId,
req.nearNodeId(),
req.nearXidVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 078e322..5c4aca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -136,7 +136,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (isMini(fut)) {
MinFuture f = (MinFuture)fut;
- if (f.onNodeLeft(nodeId)) {
+ if (f.onNodeLeft(nodeId, true)) {
// Remove previous mapping.
mappings.remove(nodeId);
@@ -211,7 +211,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
if (f.futureId().equals(res.miniId()))
- f.onDhtFinishResponse(nodeId);
+ f.onDhtFinishResponse(nodeId, false);
}
}
}
@@ -486,7 +486,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
catch (ClusterTopologyCheckedException e) {
- mini.onNodeLeft(backupId);
+ mini.onNodeLeft(backupId, false);
}
catch (IgniteCheckedException e) {
mini.onDone(e);
@@ -628,7 +628,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Remove previous mapping.
mappings.remove(m.node().id());
- fut.onNodeLeft(n.id());
+ fut.onNodeLeft(n.id(), false);
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
@@ -652,9 +652,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
", loc=" + node.isLocal() +
", done=" + fut.isDone() + ']';
}
- else {
+ else
return "FinishFuture[node=null, done=" + fut.isDone() + ']';
- }
}
else if (f.getClass() == CheckBackupMiniFuture.class) {
CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
@@ -728,10 +727,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
private final IgniteUuid futId = IgniteUuid.randomUuid();
/**
- * @param nodeId Node ID.
+ * @param nodeId Node ID.
+ * @param discoThread {@code True} if executed from discovery thread.
* @return {@code True} if future processed node failure.
*/
- abstract boolean onNodeLeft(UUID nodeId);
+ abstract boolean onNodeLeft(UUID nodeId, boolean discoThread);
/**
* @return Future ID.
@@ -774,10 +774,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
return m;
}
- /**
- * @param nodeId Failed node ID.
- */
- boolean onNodeLeft(UUID nodeId) {
+ /** {@inheritDoc} */
+ boolean onNodeLeft(UUID nodeId, boolean discoThread) {
if (nodeId.equals(m.node().id())) {
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply: " + this);
@@ -806,7 +804,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- mini.onDhtFinishResponse(cctx.localNodeId());
+ mini.onDhtFinishResponse(cctx.localNodeId(), true);
}
});
}
@@ -815,7 +813,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
cctx.io().send(backup, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException e) {
- mini.onNodeLeft(backupId);
+ mini.onNodeLeft(backupId, discoThread);
}
catch (IgniteCheckedException e) {
mini.onDone(e);
@@ -823,7 +821,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else
- mini.onDhtFinishResponse(backupId);
+ mini.onDhtFinishResponse(backupId, true);
}
}
}
@@ -881,7 +879,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override boolean onNodeLeft(UUID nodeId) {
+ @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
if (nodeId.equals(backup.id())) {
readyNearMappingFromBackup(m);
@@ -942,22 +940,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
- @Override boolean onNodeLeft(UUID nodeId) {
- return onResponse(nodeId);
+ @Override boolean onNodeLeft(UUID nodeId, boolean discoThread) {
+ return onResponse(nodeId, discoThread);
}
/**
* @param nodeId Node ID.
+ * @param discoThread {@code True} if executed from discovery thread.
*/
- void onDhtFinishResponse(UUID nodeId) {
- onResponse(nodeId);
+ void onDhtFinishResponse(UUID nodeId, boolean discoThread) {
+ onResponse(nodeId, discoThread);
}
/**
* @param nodeId Node ID.
+ * @param discoThread {@code True} if executed from discovery thread.
* @return {@code True} if processed node response.
*/
- private boolean onResponse(UUID nodeId) {
+ private boolean onResponse(UUID nodeId, boolean discoThread) {
boolean done;
boolean ret;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a70fb3a..f7c330e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -344,6 +345,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
+ AffinityTopologyVersion topVer,
boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
@@ -354,6 +356,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
) {
if (cacheCtx.isNear()) {
return cacheCtx.nearTx().txLoadAsync(this,
+ topVer,
keys,
readThrough,
/*deserializeBinary*/false,
@@ -384,7 +387,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
key,
readThrough,
/*force primary*/needVer,
- topologyVersion(),
+ topVer,
CU.subjectId(this, cctx),
resolveTaskName(),
/*deserializeBinary*/false,
@@ -415,7 +418,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
keys,
readThrough,
/*force primary*/needVer,
- topologyVersion(),
+ topVer,
CU.subjectId(this, cctx),
resolveTaskName(),
/*deserializeBinary*/false,
@@ -445,7 +448,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
else {
assert cacheCtx.isLocal();
- return super.loadMissing(cacheCtx, readThrough, async, keys, skipVals, keepBinary, needVer, c);
+ return super.loadMissing(cacheCtx,
+ topVer,
+ readThrough,
+ async,
+ keys,
+ skipVals,
+ keepBinary,
+ needVer,
+ c);
}
}
@@ -512,7 +523,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
txEntry.explicitVersion(candVer);
- if (candVer.isLess(minVer))
+ if (candVer.compareTo(minVer) < 0)
minVer = candVer;
}
}
@@ -715,7 +726,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
", tx=" + this + ']');
// Replace the entry.
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
+ txEntry.cached(txEntry.context().cache().entryEx(txEntry.key(), topologyVersion()));
}
}
}
@@ -1338,6 +1349,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearTxLocal.class, this, "mappings", mappings, "super", super.toString());
+ return S.toString(GridNearTxLocal.class, this,
+ "thread", IgniteUtils.threadName(threadId),
+ "mappings", mappings,
+ "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 2acc139..03e1034 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -209,7 +209,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
}
catch (GridCacheEntryRemovedException ignored) {
// Retry.
- txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), tx.topologyVersion()));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 9c52c80..6b17d5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -25,6 +25,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -72,12 +73,14 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
/**
* This constructor is meant for optimistic transactions.
*
+ * @param topVer Transaction topology version.
* @param ldr Class loader.
* @param nodeId Node ID.
* @param nearNodeId Near node ID.
* @param xidVer XID version.
* @param commitVer Commit version.
* @param sys System flag.
+ * @param plc IO policy.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -85,10 +88,13 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
* @param writeEntries Write entries.
* @param ctx Cache registry.
* @param txSize Expected transaction size.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
* @throws IgniteCheckedException If unmarshalling failed.
*/
public GridNearTxRemote(
GridCacheSharedContext ctx,
+ AffinityTopologyVersion topVer,
ClassLoader ldr,
UUID nodeId,
UUID nearNodeId,
@@ -137,26 +143,35 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
addEntry(entry);
}
}
+
+ assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
+ topologyVersion(topVer);
}
/**
* This constructor is meant for pessimistic transactions.
*
+ * @param topVer Transaction topology version.
* @param nodeId Node ID.
* @param nearNodeId Near node ID.
* @param nearXidVer Near transaction ID.
* @param xidVer XID version.
* @param commitVer Commit version.
* @param sys System flag.
+ * @param plc IO policy.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
* @param timeout Timeout.
* @param ctx Cache registry.
* @param txSize Expected transaction size.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
*/
public GridNearTxRemote(
GridCacheSharedContext ctx,
+ AffinityTopologyVersion topVer,
UUID nodeId,
UUID nearNodeId,
GridCacheVersion nearXidVer,
@@ -195,6 +210,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
txState = new IgniteTxRemoteStateImpl(U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(1),
U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(txSize));
+
+ assert topVer != null && topVer.topologyVersion() > 0 : topVer;
+
+ topologyVersion(topVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
index 358f90c..f2a4b30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
@@ -56,13 +56,13 @@ public interface GridCacheDrManager extends GridCacheManager {
AffinityTopologyVersion topVer)throws IgniteCheckedException;
/**
- * Process partitions "before exchange" event.
+ * Process partitions exchange event.
*
* @param topVer Topology version.
* @param left {@code True} if exchange has been caused by node leave.
* @throws IgniteCheckedException If failed.
*/
- public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException;
+ public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException;
/**
* @return {@code True} is DR is enabled.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
index 825769f..a82adf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java
@@ -78,7 +78,7 @@ public class GridOsCacheDrManager implements GridCacheDrManager {
}
/** {@inheritDoc} */
- @Override public void beforeExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
+ @Override public void onExchange(AffinityTopologyVersion topVer, boolean left) throws IgniteCheckedException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
index 6ddd2e5..10fa116 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java
@@ -176,33 +176,6 @@ public class GridLocalCacheEntry extends GridCacheMapEntry {
return owner;
}
- /**
- *
- * @param ver Candidate version.
- * @return Current owner.
- */
- @Nullable public GridCacheMvccCandidate readyLocal(GridCacheVersion ver) {
- GridCacheMvccCandidate prev = null;
- GridCacheMvccCandidate owner = null;
-
- synchronized (this) {
- GridCacheMvcc mvcc = mvccExtras();
-
- if (mvcc != null) {
- prev = mvcc.localOwner();
-
- owner = mvcc.readyLocal(ver);
-
- if (mvcc.isEmpty())
- mvccExtras(null);
- }
- }
-
- checkOwnerChanged(prev, owner);
-
- return owner;
- }
-
/** {@inheritDoc} */
@Override public boolean tmLock(IgniteInternalTx tx,
long timeout,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 2e41f2a..c69759b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -174,20 +173,6 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
}
/**
- * @return Lock version.
- */
- GridCacheVersion lockVersion() {
- return lockVer;
- }
-
- /**
- * @return Entries.
- */
- List<GridLocalCacheEntry> entries() {
- return entries;
- }
-
- /**
* @return {@code True} if transaction is not {@code null}.
*/
private boolean inTx() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 401b61b..8bcf564 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -2485,11 +2485,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
* @return Expire time.
*/
abstract long expireTime();
-
- /**
- * @return Version.
- */
- abstract GridCacheVersion version();
}
/**
@@ -2528,11 +2523,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
@Override long expireTime() {
return GridCacheSwapEntryImpl.expireTime(e.getValue());
}
-
- /** {@inheritDoc} */
- @Override GridCacheVersion version() {
- return GridCacheSwapEntryImpl.version(e.getValue());
- }
}
/**
@@ -2578,11 +2568,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
@Override long expireTime() {
return GridCacheOffheapSwapEntry.expireTime(valPtr.get1());
}
-
- /** {@inheritDoc} */
- @Override GridCacheVersion version() {
- return GridCacheOffheapSwapEntry.version(valPtr.get1());
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 55bcf45..77f3765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -465,21 +465,29 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
@Override public AffinityTopologyVersion topologyVersion() {
AffinityTopologyVersion res = topVer;
- if (res.equals(AffinityTopologyVersion.NONE))
+ if (res.equals(AffinityTopologyVersion.NONE)) {
+ if (system()) {
+ AffinityTopologyVersion topVer = cctx.tm().lockedTopologyVersion(Thread.currentThread().getId(), this);
+
+ if (topVer != null)
+ return topVer;
+ }
+
return cctx.exchange().topologyVersion();
+ }
return res;
}
/** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersionSnapshot() {
+ @Override public final AffinityTopologyVersion topologyVersionSnapshot() {
AffinityTopologyVersion ret = topVer;
return AffinityTopologyVersion.NONE.equals(ret) ? null : ret;
}
/** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
+ @Override public final AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) {
AffinityTopologyVersion topVer0 = this.topVer;
if (!AffinityTopologyVersion.NONE.equals(topVer0))
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 9060fa7..e75ce91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -920,13 +921,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
- * @return Read version for serializable transaction.
- */
- @Nullable public GridCacheVersion serializableReadVersion() {
- return serReadVer;
- }
-
- /**
* Gets stored entry version. Version is stored for all entries in serializable transaction or
* when value is read using {@link IgniteCache#getEntry(Object)} method.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 547c018..41dc43f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.Collection;
+import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -349,6 +350,7 @@ public class IgniteTxHandler {
tx = new GridDhtTxLocal(
ctx,
+ req.topologyVersion(),
nearNode.id(),
req.version(),
req.futureId(),
@@ -464,8 +466,19 @@ public class IgniteTxHandler {
Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer);
Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer);
- if (!cacheNodes0.equals(cacheNodes1))
+ if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().compareTo(curVer) < 0)
+ return true;
+
+ try {
+ List<List<ClusterNode>> aff1 = ctx.affinity().assignments(expVer);
+ List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer);
+
+ if (!aff1.equals(aff2))
+ return true;
+ }
+ catch (IllegalStateException err) {
return true;
+ }
}
return false;
@@ -1247,6 +1260,7 @@ public class IgniteTxHandler {
if (tx == null) {
tx = new GridNearTxRemote(
ctx,
+ req.topologyVersion(),
ldr,
nodeId,
req.nearNodeId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 11a35cb..b1e150b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -343,11 +343,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
txState.seal();
}
- /** {@inheritDoc} */
- @Override public GridCacheReturn implicitSingleResult() {
- return implicitRes;
- }
-
/**
* @param ret Result.
*/
@@ -412,6 +407,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
+ AffinityTopologyVersion topVer,
final boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
@@ -472,7 +468,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
log.debug("Got removed entry, will retry: " + key);
if (txEntry != null)
- txEntry.cached(cacheCtx.cache().entryEx(key));
+ txEntry.cached(cacheCtx.cache().entryEx(key, topologyVersion()));
}
}
}
@@ -1137,7 +1133,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (log.isDebugEnabled())
log.debug("Got removed entry during transaction commit (will retry): " + txEntry);
- txEntry.cached(entryEx(cacheCtx, txEntry.txKey()));
+ txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion()));
}
}
}
@@ -1334,9 +1330,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * Checks if there is a cached or swapped value for
- * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method.
- *
* @param cacheCtx Cache context.
* @param keys Key to enlist.
* @param expiryPlc Explicitly specified expiry policy for entry.
@@ -1353,6 +1346,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings({"RedundantTypeArguments"})
private <K, V> Collection<KeyCacheObject> enlistRead(
final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Collection<KeyCacheObject> keys,
@Nullable ExpiryPolicy expiryPlc,
Map<K, V> map,
@@ -1373,7 +1367,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
Collection<KeyCacheObject> lockKeys = null;
- AffinityTopologyVersion topVer = topologyVersion();
+ AffinityTopologyVersion topVer = entryTopVer != null ? entryTopVer : topologyVersion();
boolean needReadVer = (serializable() && optimistic()) || needVer;
@@ -1653,10 +1647,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
/**
- * Loads all missed keys for
- * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean, boolean)} method.
- *
* @param cacheCtx Cache context.
+ * @param topVer Topology version.
* @param map Return map.
* @param missedMap Missed keys.
* @param deserializeBinary Deserialize binary flag.
@@ -1667,6 +1659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
*/
private <K, V> IgniteInternalFuture<Map<K, V>> checkMissed(
final GridCacheContext cacheCtx,
+ final AffinityTopologyVersion topVer,
final Map<K, V> map,
final Map<KeyCacheObject, GridCacheVersion> missedMap,
final boolean deserializeBinary,
@@ -1695,6 +1688,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
},
loadMissing(
cacheCtx,
+ topVer,
!skipStore,
false,
missedMap.keySet(),
@@ -1776,6 +1770,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings("unchecked")
@Override public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
final GridCacheContext cacheCtx,
+ @Nullable final AffinityTopologyVersion entryTopVer,
Collection<KeyCacheObject> keys,
final boolean deserializeBinary,
final boolean skipVals,
@@ -1803,6 +1798,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
ExpiryPolicy expiryPlc = opCtx != null ? opCtx.expiry() : null;
final Collection<KeyCacheObject> lockKeys = enlistRead(cacheCtx,
+ entryTopVer,
keys,
expiryPlc,
retMap,
@@ -1934,13 +1930,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
log.debug("Got removed exception in get postLock (will retry): " +
cached);
- txEntry.cached(entryEx(cacheCtx, txKey));
+ txEntry.cached(entryEx(cacheCtx, txKey, topologyVersion()));
}
}
}
if (!missed.isEmpty() && cacheCtx.isLocal()) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
retMap,
missed,
deserializeBinary,
@@ -2007,7 +2009,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (missed.isEmpty())
return new GridFinishedFuture<>(retMap);
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
return checkMissed(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
retMap,
missed,
deserializeBinary,
@@ -2031,10 +2039,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings("unchecked")
@Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Map<? extends K, ? extends V> map,
boolean retval
) {
return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
map,
null,
null,
@@ -2045,19 +2055,35 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** {@inheritDoc} */
@Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
K key,
V val,
boolean retval,
CacheEntryPredicate filter) {
- return putAsync0(cacheCtx, key, val, null, null, retval, filter);
+ return putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ val,
+ null,
+ null,
+ retval,
+ filter);
}
/** {@inheritDoc} */
@Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
K key,
EntryProcessor<K, V, Object> entryProcessor,
Object... invokeArgs) {
- return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null);
+ return (IgniteInternalFuture)putAsync0(cacheCtx,
+ entryTopVer,
+ key,
+ null,
+ entryProcessor,
+ invokeArgs,
+ true,
+ null);
}
/** {@inheritDoc} */
@@ -2072,6 +2098,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
});
return this.<Object, Object>putAllAsync0(cacheCtx,
+ null,
map,
null,
null,
@@ -2083,10 +2110,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings("unchecked")
@Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
@Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
Object... invokeArgs
) {
return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx,
+ entryTopVer,
null,
map,
invokeArgs,
@@ -2099,7 +2128,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
GridCacheContext cacheCtx,
Map<KeyCacheObject, GridCacheVersion> drMap
) {
- return removeAllAsync0(cacheCtx, null, drMap, false, null, false);
+ return removeAllAsync0(cacheCtx, null, null, drMap, false, null, false);
}
/**
@@ -2136,6 +2165,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
*/
private <K, V> IgniteInternalFuture<Void> enlistWrite(
final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
KeyCacheObject cacheKey,
Object val,
@Nullable ExpiryPolicy expiryPlc,
@@ -2162,6 +2192,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
GridCacheVersion drVer = dataCenterId != null ? cctx.versions().next(dataCenterId) : null;
boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
cacheKey,
val,
entryProcessor,
@@ -2183,7 +2214,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
keepBinary);
if (loadMissed) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
Collections.singleton(cacheKey),
filter,
ret,
@@ -2226,6 +2263,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
*/
private <K, V> IgniteInternalFuture<Void> enlistWrite(
final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Collection<?> keys,
@Nullable ExpiryPolicy expiryPlc,
@Nullable Map<?, ?> lookup,
@@ -2315,6 +2353,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
boolean loadMissed = enlistWriteEntry(cacheCtx,
+ entryTopVer,
cacheKey,
val,
entryProcessor,
@@ -2344,7 +2383,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
}
if (missedForLoad != null) {
+ AffinityTopologyVersion topVer = topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = entryTopVer;
+
return loadMissing(cacheCtx,
+ topVer != null ? topVer : topologyVersion(),
missedForLoad,
filter,
ret,
@@ -2377,6 +2422,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
*/
private IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
+ final AffinityTopologyVersion topVer,
final Set<KeyCacheObject> keys,
final CacheEntryPredicate[] filter,
final GridCacheReturn ret,
@@ -2441,6 +2487,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
return loadMissing(
cacheCtx,
+ topVer,
/*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore,
/*async*/true,
keys,
@@ -2474,6 +2521,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
* @throws IgniteCheckedException If failed.
*/
private boolean enlistWriteEntry(GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
final KeyCacheObject cacheKey,
@Nullable final Object val,
@Nullable final EntryProcessor<?, ?, ?> entryProcessor,
@@ -2505,7 +2553,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
// First time access.
if (txEntry == null) {
while (true) {
- GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion());
+ GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());
try {
entry.unswap(false);
@@ -2936,7 +2984,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
if (log.isDebugEnabled())
log.debug("Got removed entry in putAllAsync method (will retry): " + cached);
- txEntry.cached(entryEx(cached.context(), txEntry.txKey()));
+ txEntry.cached(entryEx(cached.context(), txEntry.txKey(), topologyVersion()));
}
}
}
@@ -3024,6 +3072,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
*/
private <K, V> IgniteInternalFuture putAsync0(
final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
K key,
@Nullable V val,
@Nullable EntryProcessor<K, V, Object> entryProcessor,
@@ -3050,6 +3099,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final IgniteInternalFuture<Void> loadFut = enlistWrite(
cacheCtx,
+ entryTopVer,
cacheKey,
val,
opCtx != null ? opCtx.expiry() : null,
@@ -3156,6 +3206,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings("unchecked")
private <K, V> IgniteInternalFuture putAllAsync0(
final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
@Nullable Map<? extends K, ? extends V> map,
@Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
@Nullable final Object[] invokeArgs,
@@ -3214,6 +3265,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final IgniteInternalFuture<Void> loadFut = enlistWrite(
cacheCtx,
+ entryTopVer,
keySet,
opCtx != null ? opCtx.expiry() : null,
map0,
@@ -3386,12 +3438,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** {@inheritDoc} */
@Override public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Collection<? extends K> keys,
boolean retval,
CacheEntryPredicate filter,
boolean singleRmv
) {
- return removeAllAsync0(cacheCtx, keys, null, retval, filter, singleRmv);
+ return removeAllAsync0(cacheCtx, entryTopVer, keys, null, retval, filter, singleRmv);
}
/**
@@ -3406,6 +3459,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
@SuppressWarnings("unchecked")
private <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync0(
final GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
@Nullable final Collection<? extends K> keys,
@Nullable Map<KeyCacheObject, GridCacheVersion> drMap,
final boolean retval,
@@ -3491,6 +3545,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
final IgniteInternalFuture<Void> loadFut = enlistWrite(
cacheCtx,
+ entryTopVer,
keys0,
plc,
/** lookup map */null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 5911e89..6741885 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -23,6 +23,7 @@ import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -72,6 +73,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Collection<KeyCacheObject> keys,
boolean deserializeBinary,
boolean skipVals,
@@ -87,6 +89,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Map<? extends K, ? extends V> map,
boolean retval);
@@ -100,6 +103,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
K key,
V val,
boolean retval,
@@ -114,6 +118,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
K key,
EntryProcessor<K, V, Object> entryProcessor,
Object... invokeArgs);
@@ -126,6 +131,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Map<? extends K, ? extends EntryProcessor<K, V, Object>> map,
Object... invokeArgs);
@@ -139,6 +145,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync(
GridCacheContext cacheCtx,
+ @Nullable AffinityTopologyVersion entryTopVer,
Collection<? extends K> keys,
boolean retval,
CacheEntryPredicate filter,
@@ -163,11 +170,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
Map<KeyCacheObject, GridCacheVersion> drMap);
/**
- * @return Return value for
- */
- public GridCacheReturn implicitSingleResult();
-
- /**
* Finishes transaction (either commit or rollback).
*
* @param commit {@code True} if commit, {@code false} if rollback.
@@ -188,6 +190,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
*/
public IgniteInternalFuture<Void> loadMissing(
GridCacheContext cacheCtx,
+ AffinityTopologyVersion topVer,
boolean readThrough,
boolean async,
Collection<KeyCacheObject> keys,
|