Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5995617E29 for ; Wed, 10 Jun 2015 16:28:08 +0000 (UTC) Received: (qmail 27900 invoked by uid 500); 10 Jun 2015 16:28:08 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 27868 invoked by uid 500); 10 Jun 2015 16:28:08 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 27859 invoked by uid 99); 10 Jun 2015 16:28:08 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 16:28:08 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 9E1CD183C47 for ; Wed, 10 Jun 2015 16:28:07 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.77 X-Spam-Level: * X-Spam-Status: No, score=1.77 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id wEfj_oSLToZ8 for ; Wed, 10 Jun 2015 16:27:43 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id A3FC027650 for ; Wed, 10 Jun 2015 16:27:25 +0000 (UTC) Received: (qmail 23563 invoked by uid 99); 10 Jun 2015 16:27:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 16:27:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7CB3DE045F; Wed, 10 Jun 2015 16:27:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 10 Jun 2015 16:27:47 -0000 Message-Id: <6c3c5e4293aa45c4bdb2f215023e4ab7@git.apache.org> In-Reply-To: <8eb7a652192c4f07b1b9bed4629f3c65@git.apache.org> References: <8eb7a652192c4f07b1b9bed4629f3c65@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [24/31] incubator-ignite git commit: ignite-471-2: huge merge from sprint-6 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java index 78966d0..1d57ef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java @@ -80,7 +80,7 @@ public final class GridDhtForceKeysFuture extends GridCompoundFuture preloader; + private GridDhtPreloader preloader; /** Trackable flag. */ private boolean trackable; @@ -95,7 +95,7 @@ public final class GridDhtForceKeysFuture extends GridCompoundFuture cctx, AffinityTopologyVersion topVer, Collection keys, - GridDhtPreloader preloader + GridDhtPreloader preloader ) { assert topVer.topologyVersion() != 0 : topVer; assert !F.isEmpty(keys) : keys; @@ -208,21 +208,21 @@ public final class GridDhtForceKeysFuture extends GridCompoundFuture keys, Collection exc) { - Map> mappings = new HashMap<>(); - - ClusterNode loc = cctx.localNode(); - - int curTopVer = topCntr.get(); + Map> mappings = null; for (KeyCacheObject key : keys) - map(key, mappings, exc); + mappings = map(key, mappings, exc); if (isDone()) return false; boolean ret = false; - if (!mappings.isEmpty()) { + if (mappings != null) { + ClusterNode loc = cctx.localNode(); + + int curTopVer = topCntr.get(); + preloader.addFuture(this); trackable = true; @@ -275,22 +275,27 @@ public final class GridDhtForceKeysFuture extends GridCompoundFuture> mappings, Collection exc) { + private Map> map(KeyCacheObject key, + @Nullable Map> mappings, + Collection exc) + { ClusterNode loc = cctx.localNode(); - int part = cctx.affinity().partition(key); - GridCacheEntryEx e = cctx.dht().peekEx(key); try { if (e != null && !e.isNewLocked()) { - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { + int part = cctx.affinity().partition(key); + log.debug("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() + ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']'); + } // Key has been rebalanced or retrieved already. - return; + return mappings; } } catch (GridCacheEntryRemovedException ignore) { @@ -299,6 +304,8 @@ public final class GridDhtForceKeysFuture extends GridCompoundFuture { +public class GridDhtPartitionDemandPool { /** Dummy message to wake up a blocking queue if a node leaves. */ private final SupplyMessage DUMMY_TOP = new SupplyMessage(); /** */ - private final GridCacheContext cctx; + private final GridCacheContext cctx; /** */ private final IgniteLogger log; @@ -99,7 +99,7 @@ public class GridDhtPartitionDemandPool { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - public GridDhtPartitionDemandPool(GridCacheContext cctx, ReadWriteLock busyLock) { + public GridDhtPartitionDemandPool(GridCacheContext cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -108,9 +108,11 @@ public class GridDhtPartitionDemandPool { log = cctx.logger(getClass()); - poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode(); - if (poolSize > 0) { + poolSize = enabled ? cctx.config().getRebalanceThreadPoolSize() : 0; + + if (enabled) { barrier = new CyclicBarrier(poolSize); dmdWorkers = new ArrayList<>(poolSize); @@ -327,7 +329,7 @@ public class GridDhtPartitionDemandPool { * @param assigns Assignments. * @param force {@code True} if dummy reassign. */ - void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { + void addAssignments(final GridDhtPreloaderAssignments assigns, boolean force) { if (log.isDebugEnabled()) log.debug("Adding partition assignments: " + assigns); @@ -399,7 +401,7 @@ public class GridDhtPartitionDemandPool { private int id; /** Partition-to-node assignments. */ - private final LinkedBlockingDeque> assignQ = new LinkedBlockingDeque<>(); + private final LinkedBlockingDeque assignQ = new LinkedBlockingDeque<>(); /** Message queue. */ private final LinkedBlockingDeque msgQ = @@ -425,7 +427,7 @@ public class GridDhtPartitionDemandPool { /** * @param assigns Assignments. */ - void addAssignments(GridDhtPreloaderAssignments assigns) { + void addAssignments(GridDhtPreloaderAssignments assigns) { assert assigns != null; assignQ.offer(assigns); @@ -885,7 +887,7 @@ public class GridDhtPartitionDemandPool { } // Sync up all demand threads at this step. - GridDhtPreloaderAssignments assigns = null; + GridDhtPreloaderAssignments assigns = null; while (assigns == null) assigns = poll(assignQ, cctx.gridConfig().getNetworkTimeout(), this); @@ -995,12 +997,12 @@ public class GridDhtPartitionDemandPool { * @param exchFut Exchange future. * @return Assignments of partitions to nodes. */ - GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { + GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. GridDhtPartitionTopology top = cctx.dht().topology(); if (!cctx.rebalanceEnabled()) - return new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); int partCnt = cctx.affinity().partitions(); @@ -1009,7 +1011,7 @@ public class GridDhtPartitionDemandPool { "Topology version mismatch [exchId=" + exchFut.exchangeId() + ", topVer=" + top.topologyVersion() + ']'; - GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments<>(exchFut, top.topologyVersion()); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index facf7e3..faa6cf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -237,7 +237,7 @@ public class GridDhtPartitionMap implements Comparable, Ext * @return Full string representation. */ public String toFullString() { - return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", super.toString()); + return S.toString(GridDhtPartitionMap.class, this, "size", size(), "map", map.toString()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 5d9677d..13cfef3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -43,9 +43,9 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh /** * Thread pool for supplying partitions to demanding nodes. */ -class GridDhtPartitionSupplyPool { +class GridDhtPartitionSupplyPool { /** */ - private final GridCacheContext cctx; + private final GridCacheContext cctx; /** */ private final IgniteLogger log; @@ -72,7 +72,7 @@ class GridDhtPartitionSupplyPool { * @param cctx Cache context. * @param busyLock Shutdown lock. */ - GridDhtPartitionSupplyPool(GridCacheContext cctx, ReadWriteLock busyLock) { + GridDhtPartitionSupplyPool(GridCacheContext cctx, ReadWriteLock busyLock) { assert cctx != null; assert busyLock != null; @@ -83,16 +83,18 @@ class GridDhtPartitionSupplyPool { top = cctx.dht().topology(); - int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; + if (!cctx.kernalContext().clientNode()) { + int poolSize = cctx.rebalanceEnabled() ? cctx.config().getRebalanceThreadPoolSize() : 0; - for (int i = 0; i < poolSize; i++) - workers.add(new SupplyWorker()); + for (int i = 0; i < poolSize; i++) + workers.add(new SupplyWorker()); - cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2() { - @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { - processDemandMessage(id, m); - } - }); + cctx.io().addHandler(cctx.cacheId(), GridDhtPartitionDemandMessage.class, new CI2() { + @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { + processDemandMessage(id, m); + } + }); + } depEnabled = cctx.gridDeploy().enabled(); } @@ -248,11 +250,6 @@ class GridDhtPartitionSupplyPool { boolean ack = false; try { - // Partition map exchange is finished which means that all near transactions with given - // topology version are committed. We can wait for local locks here as it will not take - // much time. - cctx.mvcc().finishLocks(d.topologyVersion()).get(); - for (int part : d.partitions()) { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4b8db00..9f18c98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -44,6 +44,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** @@ -117,8 +119,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter initFut; /** Topology snapshot. */ - private AtomicReference topSnapshot = - new AtomicReference<>(); + private AtomicReference topSnapshot = new AtomicReference<>(); /** Last committed cache version before next topology version use. */ private AtomicReference lastVer = new AtomicReference<>(); @@ -146,8 +147,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter reqs; + /** Cache validation results. */ private volatile Map cacheValidRes; + /** Skip preload flag. */ + private boolean skipPreload; + /** * Dummy future created to trigger reassignments if partition * topology changed while preloading. @@ -200,6 +205,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter(); if (log.isDebugEnabled()) - log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + - ", fut=" + this + ']'); + log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); + } + + /** + * @param reqs Cache change requests. + */ + public void cacheChangeRequests(Collection reqs) { + this.reqs = reqs; } /** {@inheritDoc} */ @@ -250,6 +257,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter cacheCtx = cctx.cacheContext(cacheId); + + return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); } /** @@ -312,7 +329,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter cachesWithoutNodes = null; - for (String name : cctx.cache().cacheNames()) { - if (exchId.isLeft()) { + if (exchId.isLeft()) { + for (String name : cctx.cache().cacheNames()) { if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { if (cachesWithoutNodes == null) cachesWithoutNodes = new ArrayList<>(); @@ -505,7 +622,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter(CU.aliveRemoteCacheNodes(cctx, + rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, exchId.topologyVersion())); rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); @@ -591,6 +709,28 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion()); + + while (true) { + try { + locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + U.warn(log, "Failed to wait for locks release future. " + + "Dumping pending objects that might be the cause: " + cctx.localNodeId()); + + U.warn(log, "Locked entries:"); + + Map> locks = + cctx.mvcc().unfinishedLocks(exchId.topologyVersion()); + + for (Map.Entry> e : locks.entrySet()) + U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); + } + } + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -650,36 +790,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter m = new HashMap<>(); + Map m = null; for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) + if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) { + if (m == null) + m = new HashMap<>(); + m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes())); + } } - cacheValidRes = m; + cacheValidRes = m != null ? m : Collections.emptyMap(); cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err); @@ -864,8 +1005,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter extends GridCachePreloaderAdapter { +public class GridDhtPreloader extends GridCachePreloaderAdapter { /** Default preload resend timeout. */ public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500; @@ -57,13 +58,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private final GridAtomicLong topVer = new GridAtomicLong(); /** Force key futures. */ - private final ConcurrentMap> forceKeyFuts = newMap(); + private final ConcurrentMap> forceKeyFuts = newMap(); /** Partition suppliers. */ - private GridDhtPartitionSupplyPool supplyPool; + private GridDhtPartitionSupplyPool supplyPool; /** Partition demanders. */ - private GridDhtPartitionDemandPool demandPool; + private GridDhtPartitionDemandPool demandPool; /** Start future. */ private final GridFutureAdapter startFut; @@ -92,7 +93,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { assert !loc.id().equals(n.id()); - for (GridDhtForceKeysFuture f : forceKeyFuts.values()) + for (GridDhtForceKeysFuture f : forceKeyFuts.values()) f.onDiscoveryEvent(e); assert e.type() != EVT_NODE_JOINED || n.order() > loc.order() : "Node joined with smaller-than-local " + @@ -117,7 +118,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** * @param cctx Cache context. */ - public GridDhtPreloader(GridCacheContext cctx) { + public GridDhtPreloader(GridCacheContext cctx) { super(cctx); top = cctx.dht().topology(); @@ -158,8 +159,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } }); - supplyPool = new GridDhtPartitionSupplyPool<>(cctx, busyLock); - demandPool = new GridDhtPartitionDemandPool<>(cctx, busyLock); + supplyPool = new GridDhtPartitionSupplyPool(cctx, busyLock); + demandPool = new GridDhtPartitionDemandPool(cctx, busyLock); cctx.events().addListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED); } @@ -227,12 +228,14 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { final long start = U.currentTimeMillis(); - if (cctx.config().getRebalanceDelay() >= 0) { - U.log(log, "Starting rebalancing in " + cctx.config().getRebalanceMode() + " mode: " + cctx.name()); + final CacheConfiguration cfg = cctx.config(); + + if (cfg.getRebalanceDelay() >= 0) { + U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); demandPool.syncFuture().listen(new CI1() { @Override public void apply(Object t) { - U.log(log, "Completed rebalancing in " + cctx.config().getRebalanceMode() + " mode " + + U.log(log, "Completed rebalancing in " + cfg.getRebalanceMode() + " mode " + "[cache=" + cctx.name() + ", time=" + (U.currentTimeMillis() - start) + " ms]"); } }); @@ -253,12 +256,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return demandPool.assign(exchFut); } /** {@inheritDoc} */ - @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { + @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) { demandPool.addAssignments(assignments, forcePreload); } @@ -271,7 +274,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { /** {@inheritDoc} */ @Override public IgniteInternalFuture syncFuture() { - return demandPool.syncFuture(); + return cctx.kernalContext().clientNode() ? startFut : demandPool.syncFuture(); } /** @@ -406,7 +409,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { return; try { - GridDhtForceKeysFuture f = forceKeyFuts.get(msg.futureId()); + GridDhtForceKeysFuture f = forceKeyFuts.get(msg.futureId()); if (f != null) f.onResult(node.id(), msg); @@ -491,7 +494,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { */ @SuppressWarnings( {"unchecked", "RedundantCast"}) @Override public GridDhtFuture request(Collection keys, AffinityTopologyVersion topVer) { - final GridDhtForceKeysFuture fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); + final GridDhtForceKeysFuture fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys, this); IgniteInternalFuture topReadyFut = cctx.affinity().affinityReadyFuturex(topVer); @@ -543,7 +546,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * * @param fut Future to add. */ - void addFuture(GridDhtForceKeysFuture fut) { + void addFuture(GridDhtForceKeysFuture fut) { forceKeyFuts.put(fut.futureId(), fut); } @@ -552,7 +555,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { * * @param fut Future to remove. */ - void remoteFuture(GridDhtForceKeysFuture fut) { + void remoteFuture(GridDhtForceKeysFuture fut) { forceKeyFuts.remove(fut.futureId(), fut); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 369fc68..2f6ef6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -27,8 +27,7 @@ import java.util.concurrent.*; /** * Partition to node assignments. */ -public class GridDhtPreloaderAssignments extends - ConcurrentHashMap { +public class GridDhtPreloaderAssignments extends ConcurrentHashMap { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index ba3357d..041f83a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -433,6 +433,11 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { } /** {@inheritDoc} */ + @Nullable @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException { + return dht.tryPutIfAbsent(key, val); + } + + /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) throws IgniteCheckedException { return dht.getAndReplace(key, val); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 8258b14..351d6cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -95,7 +95,7 @@ public abstract class GridNearCacheAdapter extends GridDistributedCacheAda } /** {@inheritDoc} */ - @Override public GridCachePreloader preloader() { + @Override public GridCachePreloader preloader() { return dht().preloader(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index fc178e3..74438bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -274,7 +274,7 @@ public final class GridNearGetFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture +public final class GridNearLockFuture extends GridCompoundIdentityFuture implements GridCacheMvccFuture { /** */ private static final long serialVersionUID = 0L; @@ -58,7 +58,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture cctx; + private GridCacheContext cctx; /** Lock owner thread. */ @GridToStringInclude @@ -135,7 +135,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture cctx, + GridCacheContext cctx, Collection keys, @Nullable GridNearTxLocal tx, boolean read, @@ -184,15 +184,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture f) { - if (isMini(f)) - return ((MiniFuture)f).node(); + return F.viewReadOnly(futures(), new IgniteClosure, ClusterNode>() { + @Nullable @Override public ClusterNode apply(IgniteInternalFuture f) { + if (isMini(f)) + return ((MiniFuture)f).node(); - return cctx.discovery().localNode(); - } - }); + return cctx.discovery().localNode(); + } + }); } /** {@inheritDoc} */ @@ -350,13 +349,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture extends GridCompoundIdentityFuture mappings = - new ConcurrentLinkedDeque8<>(); + boolean clientNode = cctx.kernalContext().clientNode(); + + assert !remap || (clientNode && (tx == null || !tx.hasRemoteLocks())); + + ConcurrentLinkedDeque8 mappings = new ConcurrentLinkedDeque8<>(); // Assign keys to primary nodes. GridNearLockMapping map = null; @@ -795,6 +809,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture oldValTup = valMap.get(entry.key()); + try { + if (res.dhtVersion(i) == null) { + onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " + + "(will fail the lock): " + res)); - CacheObject oldVal = entry.rawGet(); - boolean hasOldVal = false; - CacheObject newVal = res.value(i); + return; + } - boolean readRecordable = false; + IgniteBiTuple oldValTup = valMap.get(entry.key()); - if (retval) { - readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + CacheObject oldVal = entry.rawGet(); + boolean hasOldVal = false; + CacheObject newVal = res.value(i); - if (readRecordable) - hasOldVal = entry.hasValue(); - } + boolean readRecordable = false; - GridCacheVersion dhtVer = res.dhtVersion(i); - GridCacheVersion mappedVer = res.mappedVersion(i); + if (retval) { + readRecordable = cctx.events().isRecordable(EVT_CACHE_OBJECT_READ); + + if (readRecordable) + hasOldVal = entry.hasValue(); + } - if (newVal == null) { - if (oldValTup != null) { - if (oldValTup.get1().equals(dhtVer)) - newVal = oldValTup.get2(); + GridCacheVersion dhtVer = res.dhtVersion(i); + GridCacheVersion mappedVer = res.mappedVersion(i); - oldVal = oldValTup.get2(); + if (newVal == null) { + if (oldValTup != null) { + if (oldValTup.get1().equals(dhtVer)) + newVal = oldValTup.get2(); + + oldVal = oldValTup.get2(); + } } - } - // Lock is held at this point, so we can set the - // returned value if any. - entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); + // Lock is held at this point, so we can set the + // returned value if any. + entry.resetFromPrimary(newVal, lockVer, dhtVer, node.id(), topVer); - if (inTx() && implicitTx() && tx.onePhaseCommit()) { - boolean pass = res.filterResult(i); + if (inTx()) { + tx.hasRemoteLocks(true); - tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); - } + if (implicitTx() && tx.onePhaseCommit()) { + boolean pass = res.filterResult(i); - entry.readyNearLock(lockVer, mappedVer, res.committedVersions(), res.rolledbackVersions(), - res.pending()); - - if (retval) { - if (readRecordable) - cctx.events().addEvent( - entry.partition(), - entry.key(), - tx, - null, - EVT_CACHE_OBJECT_READ, - newVal, - newVal != null, - oldVal, - hasOldVal, - CU.subjectId(tx, cctx.shared()), - null, - inTx() ? tx.resolveTaskName() : null); - - if (cctx.cache().configuration().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(false); - } + tx.entry(cctx.txKey(k)).filters(pass ? CU.empty0() : CU.alwaysFalse0Arr()); + } + } - if (log.isDebugEnabled()) - log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); + entry.readyNearLock(lockVer, + mappedVer, + res.committedVersions(), + res.rolledbackVersions(), + res.pending()); + + if (retval) { + if (readRecordable) + cctx.events().addEvent( + entry.partition(), + entry.key(), + tx, + null, + EVT_CACHE_OBJECT_READ, + newVal, + newVal != null, + oldVal, + hasOldVal, + CU.subjectId(tx, cctx.shared()), + null, + inTx() ? tx.resolveTaskName() : null); + + if (cctx.cache().configuration().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(false); + } - break; // Inner while loop. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Failed to add candidates because entry was removed (will renew)."); + if (log.isDebugEnabled()) + log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']'); - // Replace old entry with new one. - entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + break; // Inner while loop. + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Failed to add candidates because entry was removed (will renew)."); + + // Replace old entry with new one. + entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key())); + } } + + i++; } - i++; - } + try { + proceedMapping(mappings); + } + catch (IgniteCheckedException e) { + onDone(e); + } - try { - proceedMapping(mappings); - } - catch (IgniteCheckedException e) { - onDone(e); + onDone(true); } - - onDone(true); } } + /** + * + */ + private void remap() { + undoLocks(false, false); + + mapOnTopology(true); + + onDone(true); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString());