Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3DB3A18620 for ; Fri, 3 Jul 2015 15:33:16 +0000 (UTC) Received: (qmail 62535 invoked by uid 500); 3 Jul 2015 15:33:16 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 62501 invoked by uid 500); 3 Jul 2015 15:33:16 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 62492 invoked by uid 99); 3 Jul 2015 15:33:16 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Jul 2015 15:33:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 99CBCD1F10 for ; Fri, 3 Jul 2015 15:33:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.23 X-Spam-Level: * X-Spam-Status: No, score=1.23 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RP_MATCHES_RCVD=-0.571, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id NIjF0xiTshHE for ; Fri, 3 Jul 2015 15:33:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 9C3C64C16F for ; Fri, 3 Jul 2015 15:32:52 +0000 (UTC) Received: (qmail 60905 invoked by uid 99); 3 Jul 2015 15:32:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Jul 2015 15:32:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DB98FE3679; Fri, 3 Jul 2015 15:32:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 03 Jul 2015 15:32:57 -0000 Message-Id: In-Reply-To: <9345ed959fad4b3b89436bc332f06d8e@git.apache.org> References: <9345ed959fad4b3b89436bc332f06d8e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/28] incubator-ignite git commit: IGNITE-621 - Added automatic retries. IGNITE-621 - Added automatic retries. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3787a9d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3787a9d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3787a9d3 Branch: refs/heads/ignite-648 Commit: 3787a9d3353c0c146141a79e3e87e1bbc5128031 Parents: 415264e Author: Alexey Goncharuk Authored: Fri Jun 19 17:15:02 2015 -0700 Committer: Alexey Goncharuk Committed: Fri Jun 19 17:15:02 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 5 + .../processors/cache/CacheOperationContext.java | 44 +++++- .../processors/cache/GridCacheAdapter.java | 91 +++++++------ .../processors/cache/GridCacheProxyImpl.java | 10 +- .../processors/cache/IgniteCacheProxy.java | 36 ++++- .../dht/atomic/GridDhtAtomicCache.java | 18 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 87 ++++++++++-- .../IgniteCachePutRetryAbstractSelfTest.java | 134 +++++++++++++++++++ .../dht/IgniteCachePutRetryAtomicSelfTest.java | 34 +++++ ...gniteCachePutRetryTransactionalSelfTest.java | 35 +++++ 10 files changed, 422 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 2b97e55..c8d6d7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -106,6 +106,11 @@ public interface IgniteCache extends javax.cache.Cache, IgniteAsyncS public IgniteCache withSkipStore(); /** + * @return Cache with no-retries behavior enabled. + */ + public IgniteCache withNoRetries(); + + /** * Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes. * * @param p Optional predicate (may be {@code null}). If provided, will be used to http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index 34d2bf4..343a2f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -36,6 +36,10 @@ public class CacheOperationContext implements Serializable { @GridToStringInclude private final boolean skipStore; + /** No retries flag. */ + @GridToStringInclude + private final boolean noRetries; + /** Client ID which operates over this projection. */ private final UUID subjId; @@ -56,6 +60,8 @@ public class CacheOperationContext implements Serializable { keepPortable = false; expiryPlc = null; + + noRetries = false; } /** @@ -68,7 +74,8 @@ public class CacheOperationContext implements Serializable { boolean skipStore, @Nullable UUID subjId, boolean keepPortable, - @Nullable ExpiryPolicy expiryPlc) { + @Nullable ExpiryPolicy expiryPlc, + boolean noRetries) { this.skipStore = skipStore; this.subjId = subjId; @@ -76,6 +83,8 @@ public class CacheOperationContext implements Serializable { this.keepPortable = keepPortable; this.expiryPlc = expiryPlc; + + this.noRetries = noRetries; } /** @@ -95,7 +104,8 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, true, - expiryPlc); + expiryPlc, + noRetries); } /** @@ -118,7 +128,8 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, keepPortable, - expiryPlc); + expiryPlc, + noRetries); } /** @@ -139,7 +150,8 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, keepPortable, - expiryPlc); + expiryPlc, + noRetries); } /** @@ -160,7 +172,29 @@ public class CacheOperationContext implements Serializable { skipStore, subjId, true, - plc); + plc, + noRetries); + } + + /** + * @param noRetries No retries flag. + * @return Operation context. + */ + public CacheOperationContext setNoRetries(boolean noRetries) { + return new CacheOperationContext( + skipStore, + subjId, + keepPortable, + expiryPlc, + noRetries + ); + } + + /** + * @return No retries flag. + */ + public boolean noRetries() { + return noRetries; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7335d72..f993527 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -78,6 +78,9 @@ public abstract class GridCacheAdapter implements IgniteInternalCache> stash = new ThreadLocal>() { @@ -363,7 +366,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache forSubjectId(UUID subjId) { - CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null); + CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false); return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@ -375,14 +378,14 @@ public abstract class GridCacheAdapter implements IgniteInternalCache setSkipStore(boolean skipStore) { - CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null); + CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public GridCacheProxyImpl keepPortable() { - CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null); + CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false); return new GridCacheProxyImpl<>((GridCacheContext)ctx, (GridCacheAdapter)this, opCtx); } @@ -399,7 +402,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache(ctx, this, opCtx); } @@ -2301,7 +2304,7 @@ public abstract class GridCacheAdapter implements IgniteInternalCache putAsync0(final K key, final V val, - @Nullable final CacheEntryPredicate... filter) { + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -3930,51 +3933,63 @@ public abstract class GridCacheAdapter implements IgniteInternalCache implements IgniteInternalCache, Exte CacheOperationContext prev = gate.enter(opCtx); try { - return opCtx != null ? opCtx.skipStore() : false; + return opCtx != null && opCtx.skipStore(); } finally { gate.leave(prev); @@ -198,7 +198,7 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte /** {@inheritDoc} */ @Override public GridCacheProxyImpl forSubjectId(UUID subjId) { return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null)); + opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false)); } /** {@inheritDoc} */ @@ -210,7 +210,7 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte return this; return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null)); + opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false)); } finally { gate.leave(prev); @@ -224,7 +224,7 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte return new GridCacheProxyImpl<>((GridCacheContext)ctx, (GridCacheAdapter)delegate, - opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null)); + opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null, false)); } /** {@inheritDoc} */ @@ -1515,7 +1515,7 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte try { return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc)); + opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false)); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 48fd259..0ad2a9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -246,7 +246,7 @@ public class IgniteCacheProxy extends AsyncSupportAdapter(ctx, delegate, prj0, isAsync(), lock); } @@ -261,6 +261,30 @@ public class IgniteCacheProxy extends AsyncSupportAdapter withNoRetries() { + CacheOperationContext prev = onEnter(opCtx); + + try { + boolean noRetries = opCtx != null && opCtx.noRetries(); + + if (noRetries) + return this; + + CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) : + new CacheOperationContext(false, null, false, null, true); + + return new IgniteCacheProxy<>(ctx, + delegate, + opCtx0, + isAsync(), + lock); + } + finally { + onLeave(prev); + } + } + + /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate p, @Nullable Object... args) { try { CacheOperationContext prev = onEnter(opCtx); @@ -1498,10 +1522,11 @@ public class IgniteCacheProxy extends AsyncSupportAdapter((GridCacheContext)ctx, (GridCacheAdapter)delegate, @@ -1529,8 +1554,9 @@ public class IgniteCacheProxy extends AsyncSupportAdapter(ctx, delegate, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8630421..2863ae8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -767,11 +767,13 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { filter, subjId, taskNameHash, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + waitTopFut); return asyncOp(new CO>() { @Override public IgniteInternalFuture apply() { - updateFut.map(waitTopFut); + updateFut.map(); return updateFut; } @@ -830,14 +832,16 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { filter, subjId, taskNameHash, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, + true); if (statsEnabled) updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); return asyncOp(new CO>() { @Override public IgniteInternalFuture apply() { - updateFut.map(true); + updateFut.map(); return updateFut; } @@ -2273,9 +2277,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { req.filter(), req.subjectId(), req.taskNameHash(), - req.skipStore()); + req.skipStore(), + MAX_RETRIES, + true); - updateFut.map(true); + updateFut.map(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 07f5ecf..53150cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** Mappings. */ @GridToStringInclude - private final ConcurrentMap mappings; + private ConcurrentMap mappings; /** Error. */ private volatile CachePartialUpdateCheckedException err; @@ -123,7 +123,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter private GridNearAtomicUpdateRequest singleReq; /** Raw return value flag. */ - private boolean rawRetval; + private final boolean rawRetval; /** Fast map flag. */ private final boolean fastMap; @@ -149,6 +149,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** Skip store flag. */ private final boolean skipStore; + /** Wait for topology future flag. */ + private final boolean waitTopFut; + + /** Remap count. */ + private AtomicInteger remapCnt; + /** * @param cctx Cache context. * @param cache Cache instance. @@ -183,7 +189,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter final CacheEntryPredicate[] filter, UUID subjId, int taskNameHash, - boolean skipStore + boolean skipStore, + int remapCnt, + boolean waitTopFut ) { this.rawRetval = rawRetval; @@ -207,6 +215,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter this.subjId = subjId; this.taskNameHash = taskNameHash; this.skipStore = skipStore; + this.waitTopFut = waitTopFut; if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); @@ -218,6 +227,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter !(cctx.writeThrough() && cctx.config().getInterceptor() != null); nearEnabled = CU.isNearEnabled(cctx); + + this.remapCnt = new AtomicInteger(remapCnt); } /** {@inheritDoc} */ @@ -295,10 +306,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter /** * Performs future mapping. - * - * @param waitTopFut Whether to wait for topology future. */ - public void map(boolean waitTopFut) { + public void map() { AffinityTopologyVersion topVer = null; IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); @@ -310,14 +319,62 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); if (topVer == null) - mapOnTopology(null, false, null, waitTopFut); + mapOnTopology(null, false, null); else { topLocked = true; + // Cannot remap. + remapCnt.set(1); + map0(topVer, null, false, null); } } + /** + * @param failed Keys to remap. + */ + private void remap(Collection failed) { + if (futVer != null) + cctx.mvcc().removeAtomicFuture(version()); + + Collection remapKeys = new ArrayList<>(failed.size()); + Collection remapVals = new ArrayList<>(failed.size()); + + Iterator keyIt = keys.iterator(); + Iterator valsIt = vals.iterator(); + + for (Object key : failed) { + while (keyIt.hasNext()) { + Object nextKey = keyIt.next(); + Object nextVal = valsIt.next(); + + if (F.eq(key, nextKey)) { + remapKeys.add(nextKey); + remapVals.add(nextVal); + + break; + } + } + } + + keys = remapKeys; + vals = remapVals; + + mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f); + single = null; + futVer = null; + err = null; + opRes = null; + topVer = AffinityTopologyVersion.ZERO; + singleNodeId = null; + singleReq = null; + fastMapRemap = false; + updVer = null; + topLocked = false; + + map(); + } + /** {@inheritDoc} */ @SuppressWarnings("ConstantConditions") @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { @@ -331,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter if (op == TRANSFORM && retval == null) retval = Collections.emptyMap(); + if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) && remapCnt.decrementAndGet() > 0) { + remap(X.cause(err, CachePartialUpdateCheckedException.class).failedKeys()); + + return false; + } + if (super.onDone(retval, err)) { if (futVer != null) cctx.mvcc().removeAtomicFuture(version()); @@ -353,7 +416,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter Collection remapKeys = fastMap ? null : res.remapKeys(); - mapOnTopology(remapKeys, true, nodeId, true); + mapOnTopology(remapKeys, true, nodeId); return; } @@ -431,10 +494,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter * @param keys Keys to map. * @param remap Boolean flag indicating if this is partial future remap. * @param oldNodeId Old node ID if remap. - * @param waitTopFut Whether to wait for topology future. */ - private void mapOnTopology(final Collection keys, final boolean remap, final UUID oldNodeId, - final boolean waitTopFut) { + private void mapOnTopology(final Collection keys, final boolean remap, final UUID oldNodeId) { cache.topology().readLock(); AffinityTopologyVersion topVer = null; @@ -465,7 +526,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter @Override public void apply(IgniteInternalFuture t) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - mapOnTopology(keys, remap, oldNodeId, waitTopFut); + mapOnTopology(keys, remap, oldNodeId); } }); } @@ -509,7 +570,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter } if (remap) - mapOnTopology(null, true, null, true); + mapOnTopology(null, true, null); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java new file mode 100644 index 0000000..89d1040 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** + * @return Keys count for the test. + */ + protected abstract int keysCount(); + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setBackups(1); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + int keysCnt = keysCount(); + + for (int i = 0; i < keysCnt; i++) + ignite(0).cache(null).put(i, i); + + finished.set(true); + fut.get(); + + for (int i = 0; i < keysCnt; i++) + assertEquals(i, ignite(0).cache(null).get(i)); + } + + /** + * @throws Exception If failed. + */ + public void testFailWithNoRetries() throws Exception { + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(3); + + U.sleep(300); + + startGrid(3); + } + + return null; + } + }); + + int keysCnt = keysCount(); + + boolean exceptionThrown = false; + + for (int i = 0; i < keysCnt; i++) { + try { + ignite(0).cache(null).withNoRetries().put(i, i); + } + catch (Exception e) { + assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class)); + + exceptionThrown = true; + + break; + } + } + + assertTrue(exceptionThrown); + + finished.set(true); + fut.get(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60 * 1000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java new file mode 100644 index 0000000..e76663a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected int keysCount() { + return 60_000; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java new file mode 100644 index 0000000..e65459a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import org.apache.ignite.cache.*; + +/** + * + */ +public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected int keysCount() { + return 20_000; + } +}