Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 328B61872A for ; Fri, 30 Oct 2015 15:52:01 +0000 (UTC) Received: (qmail 52862 invoked by uid 500); 30 Oct 2015 15:51:57 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 52733 invoked by uid 500); 30 Oct 2015 15:51:57 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 51993 invoked by uid 99); 30 Oct 2015 15:51:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Oct 2015 15:51:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 69501E02FF; Fri, 30 Oct 2015 15:51:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Fri, 30 Oct 2015 15:52:09 -0000 Message-Id: <64680580ae1b47a48aa4528318998b1f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/52] [abbrv] ignite git commit: ignite-1607 Implemented deadlock-free optimistic serializable tx mode http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index fe519a7..3c3527a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -221,34 +221,9 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda return true; } - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked", "RedundantCast"}) - @Override public IgniteInternalFuture readThroughAllAsync( - Collection keys, - boolean reload, - boolean skipVals, - IgniteInternalTx tx, - @Nullable UUID subjId, - String taskName, - IgniteBiInClosure vis - ) { - return (IgniteInternalFuture)loadAsync(tx, - keys, - reload, - /*force primary*/false, - subjId, - taskName, - /*deserialize portable*/true, - /*expiry policy*/null, - skipVals, - /*skip store*/false, - /*can remap*/true); - } - /** * @param tx Transaction. * @param keys Keys to load. - * @param reload Reload flag. * @param forcePrimary Force primary flag. * @param subjId Subject ID. * @param taskName Task name. @@ -256,11 +231,11 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda * @param expiryPlc Expiry policy. * @param skipVal Skip value flag. * @param skipStore Skip store flag. + * @param canRemap Can remap flag. * @return Loaded values. */ public IgniteInternalFuture> loadAsync(@Nullable IgniteInternalTx tx, @Nullable Collection keys, - boolean reload, boolean forcePrimary, @Nullable UUID subjId, String taskName, @@ -280,7 +255,6 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda GridNearGetFuture fut = new GridNearGetFuture<>(ctx, keys, !skipStore, - reload, forcePrimary, txx, subjId, @@ -288,7 +262,9 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda deserializePortable, expiry, skipVal, - canRemap); + canRemap, + false, + false); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 2ae03d3..d558cc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -46,7 +46,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; @SuppressWarnings({"NonPrivateFieldAccessedInSynchronizedContext", "TooBroadScope"}) public class GridNearCacheEntry extends GridDistributedCacheEntry { /** */ - private static final int NEAR_SIZE_OVERHEAD = 36; + private static final int NEAR_SIZE_OVERHEAD = 36 + 16; /** Topology version at the moment when value was initialized from primary node. */ private volatile long topVer = -1L; @@ -58,6 +58,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** Partition. */ private int part; + /** */ + private short evictReservations; + /** * @param ctx Cache context. * @param key Cache key. @@ -316,15 +319,21 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { primaryNode(primaryNodeId, topVer); } - /** - * This method should be called only when committing optimistic transactions. - * + /* * @param dhtVer DHT version to record. + * @return {@code False} if given version is lower then existing version. */ - public synchronized void recordDhtVersion(GridCacheVersion dhtVer) { - // Version manager must be updated separately, when adding DHT version - // to transaction entries. - this.dhtVer = dhtVer; + public final boolean recordDhtVersion(GridCacheVersion dhtVer) { + assert dhtVer != null; + assert Thread.holdsLock(this); + + if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) <= 0) { + this.dhtVer = dhtVer; + + return true; + } + + return false; } /** {@inheritDoc} */ @@ -332,7 +341,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { UUID subjId, String taskName) throws IgniteCheckedException { return cctx.near().loadAsync(tx, F.asList(key), - reload, /*force primary*/false, subjId, taskName, @@ -350,7 +358,6 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { * @param val New value. * @param ver Version to use. * @param dhtVer DHT version received from remote node. - * @param expVer Optional version to match. * @param ttl Time to live. * @param expireTime Expiration time. * @param evt Event flag. @@ -366,14 +373,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { CacheObject val, GridCacheVersion ver, GridCacheVersion dhtVer, - @Nullable GridCacheVersion expVer, long ttl, long expireTime, boolean evt, AffinityTopologyVersion topVer, UUID subjId) throws IgniteCheckedException, GridCacheEntryRemovedException { - boolean valid = valid(tx != null ? tx.topologyVersion() : cctx.affinity().affinityTopologyVersion()); + assert dhtVer != null; GridCacheVersion enqueueVer = null; @@ -389,28 +395,25 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { CacheObject old = this.val; boolean hasVal = hasValueUnlocked(); - if (isNew() || !valid || expVer == null || expVer.equals(this.dhtVer)) { + if (this.dhtVer == null || this.dhtVer.compareTo(dhtVer) < 0) { primaryNode(primaryNodeId, topVer); - // Change entry only if dht version has changed. - if (!dhtVer.equals(dhtVersion())) { - update(val, expireTime, ttl, ver); + update(val, expireTime, ttl, ver); - if (cctx.deferredDelete() && !isInternal()) { - boolean deleted = val == null; + if (cctx.deferredDelete() && !isInternal()) { + boolean deleted = val == null; - if (deleted != deletedUnlocked()) { - deletedUnlocked(deleted); + if (deleted != deletedUnlocked()) { + deletedUnlocked(deleted); - if (deleted) - enqueueVer = ver; - } + if (deleted) + enqueueVer = ver; } + } - recordDhtVersion(dhtVer); + this.dhtVer = dhtVer; - ret = true; - } + ret = true; } if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) @@ -647,6 +650,32 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** + * @throws GridCacheEntryRemovedException If entry was removed. + */ + public synchronized void reserveEviction() throws GridCacheEntryRemovedException { + checkObsolete(); + + evictReservations++; + } + + /** + * + */ + public synchronized void releaseEviction() { + assert evictReservations > 0 : this; + assert !obsolete() : this; + + evictReservations--; + } + + /** {@inheritDoc} */ + @Override protected boolean evictionDisabled() { + assert Thread.holdsLock(this); + + return evictReservations > 0; + } + + /** * @param nodeId Primary node ID. * @param topVer Topology version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index eca2f71..ae1d43c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -24,7 +24,6 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -38,19 +37,17 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -59,6 +56,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -66,83 +64,31 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; -import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; /** * */ -public final class GridNearGetFuture extends GridCompoundIdentityFuture> - implements GridCacheFuture> { +public final class GridNearGetFuture extends CacheDistributedGetFutureAdapter { /** */ private static final long serialVersionUID = 0L; - /** Default max remap count value. */ - public static final int DFLT_MAX_REMAP_CNT = 3; - /** Logger reference. */ private static final AtomicReference logRef = new AtomicReference<>(); /** Logger. */ private static IgniteLogger log; - /** Maximum number of attempts to remap key to the same primary node. */ - private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); - - /** Context. */ - private final GridCacheContext cctx; - - /** Keys. */ - private Collection keys; - - /** Reload flag. */ - private boolean reload; - - /** Read through flag. */ - private boolean readThrough; - - /** Force primary flag. */ - private boolean forcePrimary; - - /** Future ID. */ - private IgniteUuid futId; - - /** Version. */ - private GridCacheVersion ver; - /** Transaction. */ private IgniteTxLocalEx tx; - /** Trackable flag. */ - private boolean trackable; - - /** Remap count. */ - private AtomicInteger remapCnt = new AtomicInteger(); - - /** Subject ID. */ - private UUID subjId; - - /** Task name. */ - private String taskName; - - /** Whether to deserialize portable objects. */ - private boolean deserializePortable; - - /** Skip values flag. */ - private boolean skipVals; - - /** Expiry policy. */ - private IgniteCacheExpiryPolicy expiryPlc; - - /** Flag indicating that get should be done on a locked topology version. */ - private final boolean canRemap; + /** */ + private GridCacheVersion ver; /** * @param cctx Context. * @param keys Keys. * @param readThrough Read through flag. - * @param reload Reload flag. * @param forcePrimary If {@code true} get will be performed on primary node even if * called on backup node. * @param tx Transaction. @@ -151,12 +97,14 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture cctx, Collection keys, boolean readThrough, - boolean reload, boolean forcePrimary, @Nullable IgniteTxLocalEx tx, @Nullable UUID subjId, @@ -164,24 +112,26 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuturemapsReducer(keys.size())); + super(cctx, + keys, + readThrough, + forcePrimary, + subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + needVer, + keepCacheObjects); assert !F.isEmpty(keys); - this.cctx = cctx; - this.keys = keys; - this.readThrough = readThrough; - this.reload = reload; - this.forcePrimary = forcePrimary; this.tx = tx; - this.subjId = subjId; - this.taskName = taskName; - this.deserializePortable = deserializePortable; - this.expiryPlc = expiryPlc; - this.skipVals = skipVals; - this.canRemap = canRemap; futId = IgniteUuid.randomUuid(); @@ -318,16 +268,17 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture> mappings = U.newHashMap(affNodes.size()); - Map savedVers = null; + Map savedEntries = null; // Assign keys to primary nodes. for (KeyCacheObject key : keys) - savedVers = map(key, mappings, topVer, mapped, savedVers); + savedEntries = map(key, mappings, topVer, mapped, savedEntries); if (isDone()) return; - final Map saved = savedVers; + final Map saved = savedEntries != null ? savedEntries : + Collections.emptyMap(); final int keysSize = keys.size(); @@ -346,7 +297,6 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture map( + private Map map( KeyCacheObject key, Map> mappings, AffinityTopologyVersion topVer, Map> mapped, - Map savedVers + Map saved ) { final GridNearCacheAdapter near = cache(); // Allow to get cached value from the local node. boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); - GridCacheEntryEx entry = allowLocRead ? near.peekEx(key) : null; - while (true) { + GridNearCacheEntry entry = allowLocRead ? (GridNearCacheEntry)near.peekEx(key) : null; + try { CacheObject v = null; + GridCacheVersion ver = null; boolean isNear = entry != null; // First we peek into near cache. - if (isNear) - v = entry.innerGet(tx, - /*swap*/false, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /*metrics*/true, - /*events*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + if (isNear) { + if (needVer) { + T2 res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/true, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(tx, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*metrics*/true, + /*events*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } + } ClusterNode affNode = null; @@ -486,18 +456,37 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture res = dhtEntry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!isNear && !skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = dhtEntry.innerGet(tx, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /*update-metrics*/false, + /*events*/!isNear && !skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } // Entry was not in memory or in swap, so we remove it from cache. if (v == null && isNew && dhtEntry.markObsoleteIfEmpty(ver)) @@ -515,7 +504,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture(skipVals ? true : v, ver); + + add(new GridFinishedFuture<>(Collections.singletonMap((K)key, val0))); + } + else { + if (keepCacheObjects) { + K key0 = (K)key; + V val0 = (V)(skipVals ? true : v); + + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + } + else { + K key0 = key.value(cctx.cacheObjectContext(), true); + V val0 = v.value(cctx.cacheObjectContext(), true); - val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); - key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); + val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); + key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); - add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); + } + } } else { if (affNode == null) { @@ -551,19 +555,10 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { @@ -572,10 +567,23 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture keys, Collection infos, - Map savedVers, + Map savedEntries, AffinityTopologyVersion topVer ) { boolean empty = F.isEmpty(keys); @@ -681,9 +683,10 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture keys, + Map saved, + AffinityTopologyVersion topVer) { + for (KeyCacheObject key : keys) { + GridNearCacheEntry entry = saved.get(key); + + if (entry != null) { + entry.releaseEviction(); + + if (tx == null) + cctx.evicts().touch(entry, topVer); + } + } + } + /** {@inheritDoc} */ @Override public String toString() { Collection futs = F.viewReadOnly(futures(), new C1, String>() { @@ -763,7 +792,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture keys; /** Saved entry versions. */ - private Map savedVers; + private Map savedEntries; /** Topology version on which this future was mapped. */ private AffinityTopologyVersion topVer; @@ -774,18 +803,18 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture keys, - Map savedVers, + Map savedEntries, AffinityTopologyVersion topVer ) { this.node = node; this.keys = keys; - this.savedVers = savedVers; + this.savedEntries = savedEntries; this.topVer = topVer; } @@ -821,6 +850,17 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + releaseEvictions(keys.keySet(), savedEntries, topVer); + + return true; + } + else + return false; + } + /** * @param e Topology exception. */ @@ -915,7 +955,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture keys, boolean readThrough, - boolean reload, @NotNull AffinityTopologyVersion topVer, UUID subjId, int taskNameHash, @@ -145,7 +143,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep this.keys = keys.keySet(); this.flags = keys.values(); this.readThrough = readThrough; - this.reload = reload; this.topVer = topVer; this.subjId = subjId; this.taskNameHash = taskNameHash; http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java new file mode 100644 index 0000000..47c1d21 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -0,0 +1,930 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.lang.IgniteReducer; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; +import static org.apache.ignite.transactions.TransactionState.PREPARED; +import static org.apache.ignite.transactions.TransactionState.PREPARING; + +/** + * + */ +public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter + implements GridCacheMvccFuture { + /** */ + public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0"); + + /** */ + @GridToStringExclude + private KeyLockFuture keyLockFut = new KeyLockFuture(); + + /** */ + @GridToStringExclude + private ClientRemapFuture remapFut; + + /** + * @param cctx Context. + * @param tx Transaction. + */ + public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { + super(cctx, tx); + + assert tx.optimistic() && tx.serializable() : tx; + + // Should wait for all mini futures completion before finishing tx. + ignoreChildFailures(IgniteCheckedException.class); + } + + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + if (log.isDebugEnabled()) + log.debug("Transaction future received owner changed callback: " + entry); + + if ((entry.context().isNear() || entry.context().isLocal()) && owner != null) { + IgniteTxEntry txEntry = tx.entry(entry.txKey()); + + if (txEntry != null) { + if (entry.context().isLocal()) { + GridCacheVersion serReadVer = txEntry.serializableReadVersion(); + + if (serReadVer != null) { + GridCacheContext ctx = entry.context(); + + while (true) { + try { + if (!entry.checkSerializableReadVersion(serReadVer)) { + Object key = entry.key().value(ctx.cacheObjectContext(), false); + + IgniteTxOptimisticCheckedException err0 = + new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + + "read/write conflict [key=" + key + ", cache=" + ctx.name() + ']'); + + err.compareAndSet(null, err0); + } + + break; + } + catch (GridCacheEntryRemovedException e) { + entry = ctx.cache().entryEx(entry.key()); + + txEntry.cached(entry); + } + } + + } + } + + keyLockFut.onKeyLocked(entry.txKey()); + + return true; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection nodes() { + return F.viewReadOnly(futures(), new IgniteClosure, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture f) { + if (isMini(f)) + return ((MiniFuture)f).node(); + + return cctx.discovery().localNode(); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + boolean found = false; + + for (IgniteInternalFuture fut : futures()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture) fut; + + if (f.node().id().equals(nodeId)) { + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + + nodeId); + + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + f.onNodeLeft(e); + + found = true; + } + } + } + + return found; + } + + /** + * @param m Failed mapping. + * @param e Error. + */ + private void onError(@Nullable GridDistributedTxMapping m, Throwable e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { + if (tx.onePhaseCommit()) { + tx.markForBackupCheck(); + + onComplete(); + + return; + } + } + + if (e instanceof IgniteTxOptimisticCheckedException) { + if (m != null) + tx.removeMapping(m.node().id()); + } + + err.compareAndSet(null, e); + + keyLockFut.onDone(e); + } + + /** {@inheritDoc} */ + @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { + if (!isDone()) { + for (IgniteInternalFuture fut : pending()) { + if (isMini(fut)) { + MiniFuture f = (MiniFuture)fut; + + if (f.futureId().equals(res.miniId())) { + assert f.node().id().equals(nodeId); + + f.onResult(res); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(IgniteInternalTx t, Throwable err) { + if (isDone()) + return false; + + if (err != null) { + this.err.compareAndSet(null, err); + + keyLockFut.onDone(err); + } + + return onComplete(); + } + + /** + * @param f Future. + * @return {@code True} if mini-future. + */ + private boolean isMini(IgniteInternalFuture f) { + return f.getClass().equals(MiniFuture.class); + } + + /** + * Completeness callback. + * + * @return {@code True} if future was finished by this call. + */ + private boolean onComplete() { + Throwable err0 = err.get(); + + if (err0 == null || tx.needCheckBackup()) + tx.state(PREPARED); + + if (super.onDone(tx, err0)) { + if (err0 != null) + tx.setRollbackOnly(); + + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); + + return true; + } + + return false; + } + + /** + * Initializes future. + * + * @param remap Remap flag. + */ + @Override protected void prepare0(boolean remap, boolean topLocked) { + try { + boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); + + if (!txStateCheck) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); + } + else + onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); + + return; + } + + prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); + + markInitialized(); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + + /** + * @param reads Read entries. + * @param writes Write entries. + * @param remap Remap flag. + * @param topLocked Topology locked flag. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void prepare( + Iterable reads, + Iterable writes, + boolean remap, + boolean topLocked + ) throws IgniteCheckedException { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + + txMapping = new GridDhtTxMapping(); + + if (!F.isEmpty(reads) || !F.isEmpty(writes)) { + for (int cacheId : tx.activeCacheIds()) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name())); + + return; + } + } + } + + Map, GridDistributedTxMapping> mappings = new HashMap<>(); + + for (IgniteTxEntry write : writes) + map(write, topVer, mappings, remap, topLocked); + + for (IgniteTxEntry read : reads) + map(read, topVer, mappings, remap, topLocked); + + keyLockFut.onAllKeysAdded(); + + if (!remap) + add(keyLockFut); + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + tx.addEntryMapping(mappings.values()); + + cctx.mvcc().recheckPendingLocks(); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + for (GridDistributedTxMapping m : mappings.values()) { + assert !m.empty(); + + MiniFuture fut = new MiniFuture(m); + + add(fut); + } + + Collection> futs = (Collection)futures(); + + Iterator> it = futs.iterator(); + + while (it.hasNext()) { + IgniteInternalFuture fut0 = it.next(); + + if (skipFuture(remap, fut0)) + continue; + + MiniFuture fut = (MiniFuture)fut0; + + IgniteCheckedException err = prepare(fut); + + if (err != null) { + while (it.hasNext()) { + fut0 = it.next(); + + if (skipFuture(remap, fut0)) + continue; + + fut = (MiniFuture)fut0; + + tx.removeMapping(fut.mapping().node().id()); + + fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err)); + } + + break; + } + } + + markInitialized(); + } + + /** + * @param remap Remap flag. + * @param fut Future. + * @return {@code True} if skip future during remap. + */ + private boolean skipFuture(boolean remap, IgniteInternalFuture fut) { + return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get()); + } + + /** + * @param fut Mini future. + * @return Prepare error if any. + */ + @Nullable private IgniteCheckedException prepare(final MiniFuture fut) { + GridDistributedTxMapping m = fut.mapping(); + + final ClusterNode n = m.node(); + + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + m.reads(), + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + m.lastBackups(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + m.clientFirst(), + tx.activeCachesDeploymentEnabled()); + + for (IgniteTxEntry txEntry : m.writes()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } + + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(F.concat(false, m.writes(), m.reads())); + + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + + return e; + } + } + + req.miniId(fut.futureId()); + + // If this is the primary node for the keys. + if (n.isLocal()) { + IgniteInternalFuture prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + + prepFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture prepFut) { + try { + fut.onResult(prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } + } + }); + } + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + + fut.onNodeLeft(e); + + return e; + } + catch (IgniteCheckedException e) { + fut.onResult(e); + + return e; + } + } + + return null; + } + + /** + * @param entry Transaction entry. + * @param topVer Topology version. + * @param curMapping Current mapping. + * @param remap Remap flag. + * @param topLocked Topology locked flag. + */ + private void map( + IgniteTxEntry entry, + AffinityTopologyVersion topVer, + Map, GridDistributedTxMapping> curMapping, + boolean remap, + boolean topLocked + ) { + GridCacheContext cacheCtx = entry.context(); + + List nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + + txMapping.addMapping(nodes); + + ClusterNode primary = F.first(nodes); + + assert primary != null; + + if (log.isDebugEnabled()) { + log.debug("Mapped key to primary node [key=" + entry.key() + + ", part=" + cacheCtx.affinity().partition(entry.key()) + + ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); + } + + if (primary.version().compareTo(SER_TX_SINCE) < 0) { + onDone(new IgniteCheckedException("Optimistic serializable transactions can be used only with node " + + "version starting from " + SER_TX_SINCE)); + + return; + } + + // Must re-initialize cached entry while holding topology lock. + if (cacheCtx.isNear()) + entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); + else if (!cacheCtx.isLocal()) + entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true)); + else + entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); + + if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) { + if (entry.explicitVersion() == null) + keyLockFut.addLockKey(entry.txKey()); + } + + IgniteBiTuple key = F.t(primary, cacheCtx.isNear()); + + GridDistributedTxMapping cur = curMapping.get(key); + + if (cur == null) { + cur = new GridDistributedTxMapping(primary); + + curMapping.put(key, cur); + + if (primary.isLocal()) { + if (entry.context().isNear()) + tx.nearLocallyMapped(true); + else if (entry.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + // Initialize near flag right away. + cur.near(cacheCtx.isNear()); + + cur.clientFirst(!topLocked && cctx.kernalContext().clientNode()); + + cur.last(true); + } + + cur.add(entry); + + if (entry.explicitVersion() != null) { + tx.markExplicit(primary.id()); + + cur.markExplicitLock(); + } + + entry.nodeId(primary.id()); + + if (cacheCtx.isNear()) { + while (true) { + try { + GridNearCacheEntry cached = (GridNearCacheEntry)entry.cached(); + + cached.dhtNodeId(tx.xidVersion(), primary.id()); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + entry.cached(cacheCtx.near().entryEx(entry.key())); + } + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + Collection futs = F.viewReadOnly(futures(), + new C1, String>() { + @Override public String apply(IgniteInternalFuture f) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + }, + new P1>() { + @Override public boolean apply(IgniteInternalFuture f) { + return isMini(f); + } + }); + + return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this, + "innerFuts", futs, + "keyLockFut", keyLockFut, + "tx", tx, + "super", super.toString()); + } + + /** + * + */ + private class ClientRemapFuture extends GridCompoundFuture { + /** */ + private boolean remap = true; + + /** + * + */ + public ClientRemapFuture() { + super(); + + reducer(new IgniteReducer() { + @Override public boolean collect(GridNearTxPrepareResponse res) { + assert res != null; + + if (res.clientRemapVersion() == null) + remap = false; + + return true; + } + + @Override public Boolean reduce() { + return remap; + } + }); + } + } + + /** + * + */ + private class MiniFuture extends GridFutureAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteUuid futId = IgniteUuid.randomUuid(); + + /** Keys. */ + @GridToStringInclude + private GridDistributedTxMapping m; + + /** Flag to signal some result being processed. */ + private AtomicBoolean rcvRes = new AtomicBoolean(false); + + /** + * @param m Mapping. + */ + MiniFuture(GridDistributedTxMapping m) { + this.m = m; + } + + /** + * @return Future ID. + */ + IgniteUuid futureId() { + return futId; + } + + /** + * @return Node ID. + */ + public ClusterNode node() { + return m.node(); + } + + /** + * @return Keys. + */ + public GridDistributedTxMapping mapping() { + return m; + } + + /** + * @param e Error. + */ + void onResult(Throwable e) { + if (rcvRes.compareAndSet(false, true)) { + onError(m, e); + + if (log.isDebugEnabled()) + log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); + + // Fail. + onDone(e); + } + else + U.warn(log, "Received error after another result has been processed [fut=" + + GridNearOptimisticSerializableTxPrepareFuture.this + ", mini=" + this + ']', e); + } + + /** + * @param e Node failure. + */ + void onNodeLeft(ClusterTopologyCheckedException e) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + this); + + onError(null, e); + + onDone(e); + } + } + + /** + * @param res Result callback. + */ + @SuppressWarnings("unchecked") + void onResult(final GridNearTxPrepareResponse res) { + if (isDone()) + return; + + if (rcvRes.compareAndSet(false, true)) { + if (res.error() != null) { + // Fail the whole compound future. + onError(m, res.error()); + + onDone(res.error()); + } + else { + if (res.clientRemapVersion() != null) { + assert cctx.kernalContext().clientNode(); + assert m.clientFirst(); + + tx.removeMapping(m.node().id()); + + ClientRemapFuture remapFut0 = null; + + synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) { + if (remapFut == null) { + remapFut = new ClientRemapFuture(); + + remapFut0 = remapFut; + } + } + + if (remapFut0 != null) { + Collection> futs = (Collection)futures(); + + for (IgniteInternalFuture fut : futs) { + if (isMini(fut) && fut != this) + remapFut0.add((MiniFuture)fut); + } + + remapFut0.markInitialized(); + + remapFut0.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture remapFut0) { + try { + IgniteInternalFuture affFut = + cctx.exchange().affinityReadyFuture(res.clientRemapVersion()); + + if (affFut == null) + affFut = new GridFinishedFuture(); + + if (remapFut.get()) { + if (log.isDebugEnabled()) { + log.debug("Will remap client tx [" + + "fut=" + GridNearOptimisticSerializableTxPrepareFuture.this + + ", topVer=" + res.topologyVersion() + ']'); + } + + synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) { + assert remapFut0 == remapFut; + + remapFut = null; + } + + affFut.listen(new CI1>() { + @Override public void apply(IgniteInternalFuture affFut) { + try { + affFut.get(); + + remap(res); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + else { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException( + "Cluster topology changed while client transaction is preparing."); + + err.retryReadyFuture(affFut); + + onDone(err); + } + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) { + log.debug("Prepare failed, will not remap tx: " + + GridNearOptimisticSerializableTxPrepareFuture.this); + } + + onDone(e); + } + } + }); + } + else + onDone(res); + } + else { + onPrepareResponse(m, res); + + // Finish this mini future (need result only on client node). + onDone(cctx.kernalContext().clientNode() ? res : null); + } + } + } + } + + /** + * @param res Response. + */ + private void remap(final GridNearTxPrepareResponse res) { + prepareOnTopology(true, new Runnable() { + @Override public void run() { + onDone(res); + } + }); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); + } + } + + /** + * Keys lock future. + */ + private class KeyLockFuture extends GridFutureAdapter { + /** */ + @GridToStringInclude + private Collection lockKeys = new GridConcurrentHashSet<>(); + + /** */ + private volatile boolean allKeysAdded; + + /** + * @param key Key to track for locking. + */ + private void addLockKey(IgniteTxKey key) { + assert !allKeysAdded; + + lockKeys.add(key); + } + + /** + * @param key Locked keys. + */ + private void onKeyLocked(IgniteTxKey key) { + lockKeys.remove(key); + + checkLocks(); + } + + /** + * Moves future to the ready state. + */ + private void onAllKeysAdded() { + allKeysAdded = true; + + checkLocks(); + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { + boolean locked = lockKeys.isEmpty(); + + if (locked && allKeysAdded) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + + onDone((GridNearTxPrepareResponse)null); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(KeyLockFuture.class, this, super.toString()); + } + } +}