Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4E6B818B9A for ; Mon, 6 Jul 2015 08:49:55 +0000 (UTC) Received: (qmail 44025 invoked by uid 500); 6 Jul 2015 08:49:55 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 43993 invoked by uid 500); 6 Jul 2015 08:49:55 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 43984 invoked by uid 99); 6 Jul 2015 08:49:55 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jul 2015 08:49:55 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id BAB181A67F0 for ; Mon, 6 Jul 2015 08:49:54 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id o8jonk3nWvh8 for ; Mon, 6 Jul 2015 08:49:47 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id EB050275F7 for ; Mon, 6 Jul 2015 08:49:40 +0000 (UTC) Received: (qmail 42985 invoked by uid 99); 6 Jul 2015 08:49:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jul 2015 08:49:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 97C5AE022E; Mon, 6 Jul 2015 08:49:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 06 Jul 2015 08:49:55 -0000 Message-Id: <85daa8f96fad4366b825210122cc6ac6@git.apache.org> In-Reply-To: <60262fe8c46c4de081929dbe6fc643c7@git.apache.org> References: <60262fe8c46c4de081929dbe6fc643c7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/50] incubator-ignite git commit: IGNITE-621 - Fixing remap logic. IGNITE-621 - Fixing remap logic. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c2c90b52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c2c90b52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c2c90b52 Branch: refs/heads/ignite-752 Commit: c2c90b52972bec53919d97ec07d2aeab4d0d55e8 Parents: a9d0662 Author: Alexey Goncharuk Authored: Thu Jun 25 17:06:31 2015 -0700 Committer: Alexey Goncharuk Committed: Thu Jun 25 17:06:31 2015 -0700 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/GridCacheAtomicFuture.java | 12 ++- .../processors/cache/GridCacheMvccManager.java | 8 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 88 ++++++++++++++++++-- .../communication/tcp/TcpCommunicationSpi.java | 2 +- ...eAtomicInvalidPartitionHandlingSelfTest.java | 7 +- 8 files changed, 110 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 542fa30..40fc873 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -343,6 +343,9 @@ public final class IgniteSystemProperties { /** Maximum size for affinity assignment history. */ public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; + /** Number of cache operation retries in case of topology exceptions. */ + public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT"; + /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index f993527..e138520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -79,7 +79,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> stash = new ThreadLocal extends GridCacheFuture { /** - * @return {@code True} if partition exchange should wait for this future to complete. + * @return Future topology version. */ - public boolean waitForPartitionExchange(); + public AffinityTopologyVersion topologyVersion(); /** - * @return Future topology version. + * Gets future that will be completed when it is safe when update is finished on the given version of topology. + * + * @param topVer Topology version to finish. + * @return Future or {@code null} if no need to wait. */ - public AffinityTopologyVersion topologyVersion(); + public IgniteInternalFuture completeFuture(AffinityTopologyVersion topVer); /** * @return Future keys. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c528e08..f24cf01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -338,7 +338,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture fut) { IgniteInternalFuture old = atomicFuts.put(futVer, fut); - assert old == null; + assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; } /** @@ -1002,8 +1002,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { res.ignoreChildFailures(ClusterTopologyCheckedException.class, CachePartialUpdateCheckedException.class); for (GridCacheAtomicFuture fut : atomicFuts.values()) { - if (fut.waitForPartitionExchange() && fut.topologyVersion().compareTo(topVer) < 0) - res.add((IgniteInternalFuture)fut); + IgniteInternalFuture complete = fut.completeFuture(topVer); + + if (complete != null) + res.add((IgniteInternalFuture)complete); } res.markInitialized(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index ff8454e..37b57e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; @@ -170,13 +171,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter } /** {@inheritDoc} */ - @Override public boolean waitForPartitionExchange() { - return waitForExchange; + @Override public AffinityTopologyVersion topologyVersion() { + return updateReq.topologyVersion(); } /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return updateReq.topologyVersion(); + @Override public IgniteInternalFuture completeFuture(AffinityTopologyVersion topVer) { + if (waitForExchange && topologyVersion().compareTo(topVer) < 0) + return this; + + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 536eb40..ea9b335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -105,7 +105,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter private final ExpiryPolicy expiryPlc; /** Future map topology version. */ - private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** Completion future for a particular topology version. */ + private GridFutureAdapter topCompleteFut; /** Optional filter. */ private final CacheEntryPredicate[] filter; @@ -246,8 +249,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); } - /** {@inheritDoc} */ - @Override public boolean waitForPartitionExchange() { + /** + * @return {@code True} if this future should block partition map exchange. + */ + private boolean waitForPartitionExchange() { // Wait fast-map near atomic update futures in CLOCK mode. return fastMap; } @@ -323,13 +328,36 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter else { topLocked = true; - // Cannot remap. - remapCnt.set(1); + synchronized (this) { + this.topVer = topVer; + + // Cannot remap. + remapCnt.set(1); + } map0(topVer, null, false, null); } } + /** {@inheritDoc} */ + @Override public IgniteInternalFuture completeFuture(AffinityTopologyVersion topVer) { + if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) { + synchronized (this) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; + + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + return topCompleteFut; + } + } + } + + return null; + } + /** * @param failed Keys to remap. */ @@ -339,14 +367,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter Collection remapKeys = new ArrayList<>(failed.size()); Collection remapVals = vals != null ? new ArrayList<>(failed.size()) : null; + Collection remapConflictPutVals = conflictPutVals != null ? new ArrayList(failed.size()) : null; + Collection remapConflictRmvVals = conflictRmvVals != null ? new ArrayList(failed.size()) : null; Iterator keyIt = keys.iterator(); Iterator valsIt = vals != null ? vals.iterator() : null; + Iterator conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null; + Iterator conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null; for (Object key : failed) { while (keyIt.hasNext()) { Object nextKey = keyIt.next(); Object nextVal = valsIt != null ? valsIt.next() : null; + GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null; + GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null; if (F.eq(key, nextKey)) { remapKeys.add(nextKey); @@ -354,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (remapVals != null) remapVals.add(nextVal); + if (remapConflictPutVals != null) + remapConflictPutVals.add(nextConflictPutVal); + + if (remapConflictRmvVals != null) + remapConflictRmvVals.add(nextConflictRmvVal); + break; } } @@ -361,13 +401,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter keys = remapKeys; vals = remapVals; + conflictPutVals = remapConflictPutVals; + conflictRmvVals = remapConflictRmvVals; - mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); single = null; futVer = null; err = null; opRes = null; - topVer = AffinityTopologyVersion.ZERO; + + GridFutureAdapter fut0; + + synchronized (this) { + mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); + + topVer = AffinityTopologyVersion.ZERO; + + fut0 = topCompleteFut; + + topCompleteFut = null; + } + + if (fut0 != null) + fut0.onDone(); + singleNodeId = null; singleReq = null; fastMapRemap = false; @@ -405,6 +461,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (futVer != null) cctx.mvcc().removeAtomicFuture(version()); + GridFutureAdapter fut0; + + synchronized (this) { + fut0 = topCompleteFut; + } + + if (fut0 != null) + fut0.onDone(); + return true; } @@ -544,6 +609,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter return; } + + synchronized (this) { + this.topVer = topVer; + } } finally { cache.topology().readUnlock(); @@ -559,7 +628,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter boolean remap = false; synchronized (this) { - if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { + if (topVer != AffinityTopologyVersion.ZERO && + ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) { CachePartialUpdateCheckedException err0 = err; if (err0 != null) @@ -1040,7 +1110,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (err0 == null) err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); - List keys = new ArrayList<>(failedKeys.size()); + Collection keys = new ArrayList<>(failedKeys.size()); for (KeyCacheObject key : failedKeys) keys.add(key.value(cctx.cacheObjectContext(), false)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index addf243d..4ca2995 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -210,7 +210,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final int DFLT_ACK_SND_THRESHOLD = 16; /** Default socket write timeout. */ - public static final long DFLT_SOCK_WRITE_TIMEOUT = 200; + public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000; /** No-op runnable. */ private static final IgniteRunnable NOOP = new IgniteRunnable() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c2c90b52/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index 054a110..b255558 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -41,6 +41,7 @@ import org.jsr166.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; import static org.apache.ignite.cache.CacheMode.*; @@ -236,6 +237,8 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA System.err.println("FINISHED PUTS"); + GridCacheMapEntry.debug = true; + // Start put threads. IgniteInternalFuture fut = multithreadedAsync(new Callable() { @Override public Object call() throws Exception { @@ -340,12 +343,12 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA } catch (AssertionError e) { if (r == 9) { - System.err.println("Failed to verify cache contents: " + e.getMessage()); + info("Failed to verify cache contents: " + e.getMessage()); throw e; } - System.err.println("Failed to verify cache contents, will retry: " + e.getMessage()); + info("Failed to verify cache contents, will retry: " + e.getMessage()); // Give some time to finish async updates. U.sleep(1000);