ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/39] incubator-ignite git commit: IGNITE-621 - Added automatic retries.
Date Fri, 03 Jul 2015 12:57:02 GMT
IGNITE-621 - Added automatic retries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3787a9d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3787a9d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3787a9d3

Branch: refs/heads/ignite-747
Commit: 3787a9d3353c0c146141a79e3e87e1bbc5128031
Parents: 415264e
Author: Alexey Goncharuk <agoncharuk@gridgain.com>
Authored: Fri Jun 19 17:15:02 2015 -0700
Committer: Alexey Goncharuk <agoncharuk@gridgain.com>
Committed: Fri Jun 19 17:15:02 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |   5 +
 .../processors/cache/CacheOperationContext.java |  44 +++++-
 .../processors/cache/GridCacheAdapter.java      |  91 +++++++------
 .../processors/cache/GridCacheProxyImpl.java    |  10 +-
 .../processors/cache/IgniteCacheProxy.java      |  36 ++++-
 .../dht/atomic/GridDhtAtomicCache.java          |  18 ++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  87 ++++++++++--
 .../IgniteCachePutRetryAbstractSelfTest.java    | 134 +++++++++++++++++++
 .../dht/IgniteCachePutRetryAtomicSelfTest.java  |  34 +++++
 ...gniteCachePutRetryTransactionalSelfTest.java |  35 +++++
 10 files changed, 422 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 2b97e55..c8d6d7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -106,6 +106,11 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
     public IgniteCache<K, V> withSkipStore();
 
     /**
+     * @return Cache with no-retries behavior enabled.
+     */
+    public IgniteCache<K, V> withNoRetries();
+
+    /**
      * Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes.
      *
      * @param p Optional predicate (may be {@code null}). If provided, will be used to

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 34d2bf4..343a2f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -36,6 +36,10 @@ public class CacheOperationContext implements Serializable {
     @GridToStringInclude
     private final boolean skipStore;
 
+    /** No retries flag. */
+    @GridToStringInclude
+    private final boolean noRetries;
+
     /** Client ID which operates over this projection. */
     private final UUID subjId;
 
@@ -56,6 +60,8 @@ public class CacheOperationContext implements Serializable {
         keepPortable = false;
 
         expiryPlc = null;
+
+        noRetries = false;
     }
 
     /**
@@ -68,7 +74,8 @@ public class CacheOperationContext implements Serializable {
         boolean skipStore,
         @Nullable UUID subjId,
         boolean keepPortable,
-        @Nullable ExpiryPolicy expiryPlc) {
+        @Nullable ExpiryPolicy expiryPlc,
+        boolean noRetries) {
         this.skipStore = skipStore;
 
         this.subjId = subjId;
@@ -76,6 +83,8 @@ public class CacheOperationContext implements Serializable {
         this.keepPortable = keepPortable;
 
         this.expiryPlc = expiryPlc;
+
+        this.noRetries = noRetries;
     }
 
     /**
@@ -95,7 +104,8 @@ public class CacheOperationContext implements Serializable {
             skipStore,
             subjId,
             true,
-            expiryPlc);
+            expiryPlc,
+            noRetries);
     }
 
     /**
@@ -118,7 +128,8 @@ public class CacheOperationContext implements Serializable {
             skipStore,
             subjId,
             keepPortable,
-            expiryPlc);
+            expiryPlc,
+            noRetries);
     }
 
     /**
@@ -139,7 +150,8 @@ public class CacheOperationContext implements Serializable {
             skipStore,
             subjId,
             keepPortable,
-            expiryPlc);
+            expiryPlc,
+            noRetries);
     }
 
     /**
@@ -160,7 +172,29 @@ public class CacheOperationContext implements Serializable {
             skipStore,
             subjId,
             true,
-            plc);
+            plc,
+            noRetries);
+    }
+
+    /**
+     * @param noRetries No retries flag.
+     * @return Operation context.
+     */
+    public CacheOperationContext setNoRetries(boolean noRetries) {
+        return new CacheOperationContext(
+            skipStore,
+            subjId,
+            keepPortable,
+            expiryPlc,
+            noRetries
+        );
+    }
+
+    /**
+     * @return No retries flag.
+     */
+    public boolean noRetries() {
+        return noRetries;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7335d72..f993527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** clearLocally() split threshold. */
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
+    /** Maximum number of retries when topology changes. */
+    public static final int MAX_RETRIES = 100;
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
         String>>() {
@@ -363,7 +366,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
-        CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null);
+        CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);
     }
@@ -375,14 +378,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
-        CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null);
+        CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);
     }
 
     /** {@inheritDoc} */
     @Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepPortable() {
-        CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null);
+        CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false);
 
         return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
     }
@@ -399,7 +402,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         assert !CU.isAtomicsCache(ctx.name());
         assert !CU.isMarshallerCache(ctx.name());
 
-        CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc);
+        CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false);
 
         return new GridCacheProxyImpl<>(ctx, this, opCtx);
     }
@@ -2301,7 +2304,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Putx operation future.
      */
     public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
-                                                   @Nullable final CacheEntryPredicate... filter) {
+        @Nullable final CacheEntryPredicate... filter) {
         A.notNull(key, "key", val, "val");
 
         if (keyCheck)
@@ -3930,51 +3933,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (tx == null || tx.implicit()) {
             TransactionConfiguration tCfg = ctx.gridConfig().getTransactionConfiguration();
 
-            tx = ctx.tm().newTx(
-                true,
-                op.single(),
-                ctx.systemTx() ? ctx : null,
-                OPTIMISTIC,
-                READ_COMMITTED,
-                tCfg.getDefaultTxTimeout(),
-                !ctx.skipStore(),
-                0
-            );
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
 
-            assert tx != null;
+            int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
 
-            try {
-                T t = op.op(tx);
+            for (int i = 0; i < retries; i++) {
+                tx = ctx.tm().newTx(
+                    true,
+                    op.single(),
+                    ctx.systemTx() ? ctx : null,
+                    OPTIMISTIC,
+                    READ_COMMITTED,
+                    tCfg.getDefaultTxTimeout(),
+                    !ctx.skipStore(),
+                    0
+                );
 
-                assert tx.done() : "Transaction is not done: " + tx;
+                assert tx != null;
 
-                return t;
-            }
-            catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
-                IgniteTxRollbackCheckedException e) {
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
                 try {
-                    tx.rollback();
+                    T t = op.op(tx);
 
-                    e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
-                        tx.xid(), e);
+                    assert tx.done() : "Transaction is not done: " + tx;
+
+                    return t;
+                }
+                catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+                    IgniteTxRollbackCheckedException e) {
+                    throw e;
                 }
-                catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
-                    U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+                catch (IgniteCheckedException e) {
+                    try {
+                        tx.rollback();
+
+                        e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
+                            tx.xid(), e);
+                    }
+                    catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
+                        U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+
+                        U.addLastCause(e, e1, log);
+                    }
+
+                    if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1)
+                        continue;
 
-                    U.addLastCause(e, e1, log);
+                    throw e;
                 }
+                finally {
+                    ctx.tm().resetContext();
 
-                throw e;
+                    if (ctx.isNear())
+                        ctx.near().dht().context().tm().resetContext();
+                }
             }
-            finally {
-                ctx.tm().resetContext();
 
-                if (ctx.isNear())
-                    ctx.near().dht().context().tm().resetContext();
-            }
+            // Should not happen.
+            throw new IgniteCheckedException("Failed to perform cache operation (maximum number of retries exceeded).");
         }
         else
             return op.op(tx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 63ba242..cec8c53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -118,7 +118,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
         CacheOperationContext prev = gate.enter(opCtx);
 
         try {
-            return opCtx != null ? opCtx.skipStore() : false;
+            return opCtx != null && opCtx.skipStore();
         }
         finally {
             gate.leave(prev);
@@ -198,7 +198,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
     /** {@inheritDoc} */
     @Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
         return new GridCacheProxyImpl<>(ctx, delegate,
-            opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null));
+            opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false));
     }
 
     /** {@inheritDoc} */
@@ -210,7 +210,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
                 return this;
 
             return new GridCacheProxyImpl<>(ctx, delegate,
-                opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null));
+                opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false));
         }
         finally {
             gate.leave(prev);
@@ -224,7 +224,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
         
         return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, 
             (GridCacheAdapter<K1, V1>)delegate,
-            opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null));
+            opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null, false));
     }
 
     /** {@inheritDoc} */
@@ -1515,7 +1515,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
 
         try {
             return new GridCacheProxyImpl<>(ctx, delegate,
-                opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc));
+                opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false));
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 48fd259..0ad2a9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -246,7 +246,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
 
         try {
             CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
-                new CacheOperationContext(false, null, false, plc);
+                new CacheOperationContext(false, null, false, plc, false);
 
             return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
         }
@@ -261,6 +261,30 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withNoRetries() {
+        CacheOperationContext prev = onEnter(opCtx);
+
+        try {
+            boolean noRetries = opCtx != null && opCtx.noRetries();
+
+            if (noRetries)
+                return this;
+
+            CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) :
+                new CacheOperationContext(false, null, false, null, true);
+
+            return new IgniteCacheProxy<>(ctx,
+                delegate,
+                opCtx0,
+                isAsync(),
+                lock);
+        }
+        finally {
+            onLeave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
         try {
             CacheOperationContext prev = onEnter(opCtx);
@@ -1498,10 +1522,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         try {
             CacheOperationContext opCtx0 =
                 new CacheOperationContext(
-                    opCtx != null ? opCtx.skipStore() : false,
+                    opCtx != null && opCtx.skipStore(),
                     opCtx != null ? opCtx.subjectId() : null,
                     true,
-                    opCtx != null ? opCtx.expiry() : null);
+                    opCtx != null ? opCtx.expiry() : null,
+                    opCtx != null && opCtx.noRetries());
 
             return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
                 (GridCacheAdapter<K1, V1>)delegate,
@@ -1529,8 +1554,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
             CacheOperationContext opCtx0 =
                 new CacheOperationContext(true,
                     opCtx != null ? opCtx.subjectId() : null,
-                    opCtx != null ? opCtx.isKeepPortable() : false,
-                    opCtx != null ? opCtx.expiry() : null);
+                    opCtx != null && opCtx.isKeepPortable(),
+                    opCtx != null ? opCtx.expiry() : null,
+                    opCtx != null && opCtx.noRetries());
 
             return new IgniteCacheProxy<>(ctx,
                 delegate,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8630421..2863ae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -767,11 +767,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             filter,
             subjId,
             taskNameHash,
-            opCtx != null && opCtx.skipStore());
+            opCtx != null && opCtx.skipStore(),
+            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+            waitTopFut);
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
-                updateFut.map(waitTopFut);
+                updateFut.map();
 
                 return updateFut;
             }
@@ -830,14 +832,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             filter,
             subjId,
             taskNameHash,
-            opCtx != null && opCtx.skipStore());
+            opCtx != null && opCtx.skipStore(),
+            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+            true);
 
         if (statsEnabled)
             updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
 
         return asyncOp(new CO<IgniteInternalFuture<Object>>() {
             @Override public IgniteInternalFuture<Object> apply() {
-                updateFut.map(true);
+                updateFut.map();
 
                 return updateFut;
             }
@@ -2273,9 +2277,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             req.filter(),
             req.subjectId(),
             req.taskNameHash(),
-            req.skipStore());
+            req.skipStore(),
+            MAX_RETRIES,
+            true);
 
-        updateFut.map(true);
+        updateFut.map();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 07f5ecf..53150cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** Mappings. */
     @GridToStringInclude
-    private final ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
+    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
 
     /** Error. */
     private volatile CachePartialUpdateCheckedException err;
@@ -123,7 +123,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     private GridNearAtomicUpdateRequest singleReq;
 
     /** Raw return value flag. */
-    private boolean rawRetval;
+    private final boolean rawRetval;
 
     /** Fast map flag. */
     private final boolean fastMap;
@@ -149,6 +149,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
     /** Skip store flag. */
     private final boolean skipStore;
 
+    /** Wait for topology future flag. */
+    private final boolean waitTopFut;
+
+    /** Remap count. */
+    private AtomicInteger remapCnt;
+
     /**
      * @param cctx Cache context.
      * @param cache Cache instance.
@@ -183,7 +189,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         final CacheEntryPredicate[] filter,
         UUID subjId,
         int taskNameHash,
-        boolean skipStore
+        boolean skipStore,
+        int remapCnt,
+        boolean waitTopFut
     ) {
         this.rawRetval = rawRetval;
 
@@ -207,6 +215,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
+        this.waitTopFut = waitTopFut;
 
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
@@ -218,6 +227,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
 
         nearEnabled = CU.isNearEnabled(cctx);
+
+        this.remapCnt = new AtomicInteger(remapCnt);
     }
 
     /** {@inheritDoc} */
@@ -295,10 +306,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /**
      * Performs future mapping.
-     *
-     * @param waitTopFut Whether to wait for topology future.
      */
-    public void map(boolean waitTopFut) {
+    public void map() {
         AffinityTopologyVersion topVer = null;
 
         IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
@@ -310,14 +319,62 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
         if (topVer == null)
-            mapOnTopology(null, false, null, waitTopFut);
+            mapOnTopology(null, false, null);
         else {
             topLocked = true;
 
+            // Cannot remap.
+            remapCnt.set(1);
+
             map0(topVer, null, false, null);
         }
     }
 
+    /**
+     * @param failed Keys to remap.
+     */
+    private void remap(Collection<?> failed) {
+        if (futVer != null)
+            cctx.mvcc().removeAtomicFuture(version());
+
+        Collection<Object> remapKeys = new ArrayList<>(failed.size());
+        Collection<Object> remapVals = new ArrayList<>(failed.size());
+
+        Iterator<?> keyIt = keys.iterator();
+        Iterator<?> valsIt = vals.iterator();
+
+        for (Object key : failed) {
+            while (keyIt.hasNext()) {
+                Object nextKey = keyIt.next();
+                Object nextVal = valsIt.next();
+
+                if (F.eq(key, nextKey)) {
+                    remapKeys.add(nextKey);
+                    remapVals.add(nextVal);
+
+                    break;
+                }
+            }
+        }
+
+        keys = remapKeys;
+        vals = remapVals;
+
+        mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
+        single = null;
+        futVer = null;
+        err = null;
+        opRes = null;
+        topVer = AffinityTopologyVersion.ZERO;
+        singleNodeId = null;
+        singleReq = null;
+        fastMapRemap = false;
+        updVer = null;
+        topLocked = false;
+
+        map();
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("ConstantConditions")
     @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
@@ -331,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (op == TRANSFORM && retval == null)
             retval = Collections.emptyMap();
 
+        if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) && remapCnt.decrementAndGet() > 0) {
+            remap(X.cause(err, CachePartialUpdateCheckedException.class).failedKeys());
+
+            return false;
+        }
+
         if (super.onDone(retval, err)) {
             if (futVer != null)
                 cctx.mvcc().removeAtomicFuture(version());
@@ -353,7 +416,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
 
-            mapOnTopology(remapKeys, true, nodeId, true);
+            mapOnTopology(remapKeys, true, nodeId);
 
             return;
         }
@@ -431,10 +494,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param keys Keys to map.
      * @param remap Boolean flag indicating if this is partial future remap.
      * @param oldNodeId Old node ID if remap.
-     * @param waitTopFut Whether to wait for topology future.
      */
-    private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId,
-        final boolean waitTopFut) {
+    private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
@@ -465,7 +526,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                                 @Override public void run() {
-                                    mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+                                    mapOnTopology(keys, remap, oldNodeId);
                                 }
                             });
                         }
@@ -509,7 +570,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         if (remap)
-            mapOnTopology(null, true, null, true);
+            mapOnTopology(null, true, null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
new file mode 100644
index 0000000..89d1040
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /**
+     * @return Keys count for the test.
+     */
+    protected abstract int keysCount();
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setBackups(1);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPut() throws Exception {
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!finished.get()) {
+                    stopGrid(3);
+
+                    U.sleep(300);
+
+                    startGrid(3);
+                }
+
+                return null;
+            }
+        });
+
+        int keysCnt = keysCount();
+
+        for (int i = 0; i < keysCnt; i++)
+            ignite(0).cache(null).put(i, i);
+
+        finished.set(true);
+        fut.get();
+
+        for (int i = 0; i < keysCnt; i++)
+            assertEquals(i, ignite(0).cache(null).get(i));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailWithNoRetries() throws Exception {
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!finished.get()) {
+                    stopGrid(3);
+
+                    U.sleep(300);
+
+                    startGrid(3);
+                }
+
+                return null;
+            }
+        });
+
+        int keysCnt = keysCount();
+
+        boolean exceptionThrown = false;
+
+        for (int i = 0; i < keysCnt; i++) {
+            try {
+                ignite(0).cache(null).withNoRetries().put(i, i);
+            }
+            catch (Exception e) {
+                assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class));
+
+                exceptionThrown = true;
+
+                break;
+            }
+        }
+
+        assertTrue(exceptionThrown);
+
+        finished.set(true);
+        fut.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 3 * 60 * 1000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
new file mode 100644
index 0000000..e76663a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+/**
+ *
+ */
+public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int keysCount() {
+        return 60_000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
new file mode 100644
index 0000000..e65459a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+/**
+ *
+ */
+public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int keysCount() {
+        return 20_000;
+    }
+}


Mime
View raw message