ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dma...@apache.org
Subject [2/3] ignite git commit: ignite-801 and ignite-1911: resurrecting data structure and atomics failover tests + stopping the node if ring message worker fails
Date Fri, 20 Nov 2015 16:30:07 GMT
ignite-801 and ignite-1911: resurrecting data structure and atomics failover tests + stopping the node if ring message worker fails


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c711484c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c711484c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c711484c

Branch: refs/heads/ignite-1.5
Commit: c711484c30315c06ce0b31a8775bfc41b7ee1483
Parents: 8e7e330
Author: Denis Magda <dmagda@gridgain.com>
Authored: Fri Nov 20 19:11:07 2015 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Fri Nov 20 19:11:07 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheUtils.java        |  39 +-
 .../CacheDataStructuresManager.java             |   2 +-
 .../GridFutureRemapTimeoutObject.java           |  72 --
 .../dht/GridPartitionedGetFuture.java           |  28 +-
 .../distributed/near/GridNearGetFuture.java     |  28 +-
 .../IgniteTxImplicitSingleStateImpl.java        |  29 +-
 .../IgniteTxRemoteSingleStateImpl.java          |  19 +-
 .../datastructures/DataStructuresProcessor.java |  47 +-
 .../GridAtomicCacheQueueImpl.java               | 126 +--
 .../GridCacheAtomicReferenceImpl.java           |  10 +-
 .../GridCacheCountDownLatchImpl.java            |  15 +-
 .../datastructures/GridCacheQueueAdapter.java   |  32 +-
 .../GridTransactionalCacheQueueImpl.java        | 193 ++--
 .../ignite/spi/discovery/DiscoverySpi.java      |   2 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  36 +
 ...eAbstractDataStructuresFailoverSelfTest.java | 924 +++++++++----------
 ...rtitionedDataStructuresFailoverSelfTest.java |   7 +-
 ...edOffheapDataStructuresFailoverSelfTest.java |  12 +-
 ...eplicatedDataStructuresFailoverSelfTest.java |   5 -
 ...gniteAtomicLongChangingTopologySelfTest.java |   2 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   4 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java |   4 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |  90 +-
 .../testframework/junits/GridAbstractTest.java  |   6 +-
 24 files changed, 796 insertions(+), 936 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index f7d115f..89779d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -61,6 +61,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -93,6 +94,7 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -1780,28 +1782,41 @@ public class GridCacheUtils {
     public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
         return new Callable<S>() {
             @Override public S call() throws Exception {
-                int retries = GridCacheAdapter.MAX_RETRIES;
-
                 IgniteCheckedException err = null;
 
-                for (int i = 0; i < retries; i++) {
+                for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
                     try {
                         return c.call();
                     }
+                    catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+                        throw e;
+                    }
+                    catch (TransactionRollbackException e) {
+                        if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+                            throw e;
+
+                        U.sleep(1);
+                    }
                     catch (IgniteCheckedException e) {
-                        if (X.hasCause(e, ClusterTopologyCheckedException.class) ||
-                            X.hasCause(e, IgniteTxRollbackCheckedException.class) ||
-                            X.hasCause(e, CachePartialUpdateCheckedException.class)) {
-                            if (i < retries - 1) {
-                                err = e;
+                        if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+                            throw e;
 
-                                U.sleep(1);
+                        if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                            ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
 
-                                continue;
-                            }
+                            if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
+                                ClusterTopologyServerNotFoundException)
+                                throw e;
 
-                            throw e;
+                            // IGNITE-1948: remove this check when the issue is fixed
+                            if (topErr.retryReadyFuture() != null)
+                                topErr.retryReadyFuture().get();
+                            else
+                                U.sleep(1);
                         }
+                        else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+                            CachePartialUpdateCheckedException.class))
+                            U.sleep(1);
                         else
                             throw e;
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 1ff4575..930921b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -770,4 +770,4 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
             return "RemoveSetCallable [setId=" + setId + ']';
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
deleted file mode 100644
index 72fdd4b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridFutureRemapTimeoutObject.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-
-/**
- * Future remap timeout object.
- */
-public class GridFutureRemapTimeoutObject extends GridTimeoutObjectAdapter {
-    /** */
-    private final GridFutureAdapter<?> fut;
-
-    /** Finished flag. */
-    private final AtomicBoolean finished = new AtomicBoolean();
-
-    /** Topology version to wait. */
-    private final AffinityTopologyVersion topVer;
-
-    /** Exception cause. */
-    private final IgniteCheckedException e;
-
-    /**
-     * @param fut Future.
-     * @param timeout Timeout.
-     * @param topVer Topology version timeout was created on.
-     * @param e Exception cause.
-     */
-    public GridFutureRemapTimeoutObject(
-        GridFutureAdapter<?> fut,
-        long timeout,
-        AffinityTopologyVersion topVer,
-        IgniteCheckedException e) {
-        super(timeout);
-
-        this.fut = fut;
-        this.topVer = topVer;
-        this.e = e;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onTimeout() {
-        if (finish()) // Fail the whole get future, else remap happened concurrently.
-            fut.onDone(new IgniteCheckedException("Failed to wait for topology version to change: " + topVer, e));
-    }
-
-    /**
-     * @return Guard against concurrent completion.
-     */
-    public boolean finish() {
-        return finished.compareAndSet(false, true);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index c3d9836..e3fae22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -644,34 +643,23 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
                 final AffinityTopologyVersion updTopVer =
                     new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
 
-                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                    cctx.kernalContext().config().getNetworkTimeout(),
-                    updTopVer,
-                    e);
-
                 cctx.affinity().affinityReadyFuture(updTopVer).listen(
                     new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            if (timeout.finish()) {
-                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
-                                try {
-                                    fut.get();
+                            try {
+                                fut.get();
 
-                                    // Remap.
-                                    map(keys.keySet(), F.t(node, keys), updTopVer);
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                                    onDone(Collections.<K, V>emptyMap());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    GridPartitionedGetFuture.this.onDone(e);
-                                }
+                                onDone(Collections.<K, V>emptyMap());
+                            }
+                            catch (IgniteCheckedException e) {
+                                GridPartitionedGetFuture.this.onDone(e);
                             }
                         }
                     }
                 );
-
-                cctx.kernalContext().timeout().addTimeoutObject(timeout);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index dfaa44e..f1bff61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -40,7 +40,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.GridFutureRemapTimeoutObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.CacheDistributedGetFutureAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -851,34 +850,23 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                 final AffinityTopologyVersion updTopVer =
                     new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
 
-                final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
-                    cctx.kernalContext().config().getNetworkTimeout(),
-                    updTopVer,
-                    e);
-
                 cctx.affinity().affinityReadyFuture(updTopVer).listen(
                     new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
-                            if (timeout.finish()) {
-                                cctx.kernalContext().timeout().removeTimeoutObject(timeout);
-
-                                try {
-                                    fut.get();
+                            try {
+                                fut.get();
 
-                                    // Remap.
-                                    map(keys.keySet(), F.t(node, keys), updTopVer);
+                                // Remap.
+                                map(keys.keySet(), F.t(node, keys), updTopVer);
 
-                                    onDone(Collections.<K, V>emptyMap());
-                                }
-                                catch (IgniteCheckedException e) {
-                                    GridNearGetFuture.this.onDone(e);
-                                }
+                                onDone(Collections.<K, V>emptyMap());
+                            }
+                            catch (IgniteCheckedException e) {
+                                GridNearGetFuture.this.onDone(e);
                             }
                         }
                     }
                 );
-
-                cctx.kernalContext().timeout().addTimeoutObject(timeout);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index c75a8f38..3e0231e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -17,8 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
@@ -28,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -160,8 +163,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
 
         CacheStoreManager store = cacheCtx.store();
 
-        if (store.configured())
-            return Collections.singleton(store);
+        if (store.configured()) {
+            HashSet<CacheStoreManager> set = new HashSet<>(3, 0.75f);
+
+            set.add(store);
+
+            return set;
+        }
 
         return null;
     }
@@ -192,12 +200,20 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Set<IgniteTxKey> writeSet() {
-        return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+        if (entry != null) {
+            HashSet<IgniteTxKey> set = new HashSet<>(3, 0.75f);
+
+            set.add(entry.txKey());
+
+            return set;
+        }
+        else
+            return Collections.<IgniteTxKey>emptySet();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry> writeEntries() {
-        return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+        return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
     }
 
     /** {@inheritDoc} */
@@ -207,8 +223,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
-        return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
-            Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
+        return entry != null ? F.asMap(entry.txKey(), entry) : Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
     }
 
     /** {@inheritDoc} */
@@ -223,7 +238,7 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry> allEntries() {
-        return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+        return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
index 22f04a8..90af517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -17,10 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -62,12 +65,20 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Set<IgniteTxKey> writeSet() {
-        return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+        if (entry != null) {
+            HashSet<IgniteTxKey> set = new HashSet<>(3, 0.75f);
+
+            set.add(entry.txKey());
+
+            return set;
+        }
+        else
+            return Collections.<IgniteTxKey>emptySet();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry> writeEntries() {
-        return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+        return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
     }
 
     /** {@inheritDoc} */
@@ -77,7 +88,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
-        return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
+        return entry != null ? F.asMap(entry.txKey(), entry) :
             Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
     }
 
@@ -93,7 +104,7 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
 
     /** {@inheritDoc} */
     @Override public Collection<IgniteTxEntry> allEntries() {
-        return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+        return entry != null ? Arrays.asList(entry) : Collections.<IgniteTxEntry>emptyList();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index b532d7f..23d64cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -56,14 +56,13 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheType;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
@@ -532,21 +531,23 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
         if (dataStructure != null)
             return dataStructure;
 
-        if (!create)
-            return c.applyx();
-
         while (true) {
-            try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
+            try {
+                if (!create)
+                    return c.applyx();
 
-                if (err != null)
-                    throw err;
+                try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                    err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get();
 
-                dataStructure = c.applyx();
+                    if (err != null)
+                        throw err;
 
-                tx.commit();
+                    dataStructure = c.applyx();
+
+                    tx.commit();
 
-                return dataStructure;
+                    return dataStructure;
+                }
             }
             catch (IgniteTxRollbackCheckedException ignore) {
                 // Safe to retry right away.
@@ -1605,27 +1606,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
      */
     public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
         try {
-            int cnt = 0;
-
-            while (true) {
-                try {
-                    return call.call();
-                }
-                catch (ClusterGroupEmptyCheckedException e) {
-                    throw new IgniteCheckedException(e);
-                }
-                catch (IgniteTxRollbackCheckedException |
-                    CachePartialUpdateCheckedException |
-                    ClusterTopologyCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to execute data structure operation, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            return GridCacheUtils.retryTopologySafe(call).call();
         }
         catch (IgniteCheckedException e) {
             throw e;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
index 28f8631..b433887 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -56,26 +55,9 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 
             checkRemoved(idx);
 
-            int cnt = 0;
-
             GridCacheQueueItemKey key = itemKey(idx);
 
-            while (true) {
-                try {
-                    cache.getAndPut(key, item);
-
-                    break;
-                }
-                catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to put queue item, will retry [err=" + e + ", idx=" + idx + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            cache.getAndPut(key, item);
 
             return true;
         }
@@ -98,38 +80,18 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 
                 GridCacheQueueItemKey key = itemKey(idx);
 
-                int cnt = 0;
-
-                long stop = 0;
+                T data = (T)cache.getAndRemove(key);
 
-                while (true) {
-                    try {
-                        T data = (T)cache.getAndRemove(key);
+                if (data != null)
+                    return data;
 
-                        if (data != null)
-                            return data;
+                long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
 
-                        if (stop == 0)
-                            stop = U.currentTimeMillis() + RETRY_TIMEOUT;
+                while (U.currentTimeMillis() < stop) {
+                    data = (T)cache.getAndRemove(key);
 
-                        while (U.currentTimeMillis() < stop ) {
-                            data = (T)cache.getAndRemove(key);
-
-                            if (data != null)
-                                return data;
-                        }
-
-                        break;
-                    }
-                    catch (CachePartialUpdateCheckedException e) {
-                        if (cnt++ == MAX_UPDATE_RETRIES)
-                            throw e;
-                        else {
-                            U.warn(log, "Failed to remove queue item, will retry [err=" + e + ']');
-
-                            U.sleep(RETRY_DELAY);
-                        }
-                    }
+                    if (data != null)
+                        return data;
                 }
 
                 U.warn(log, "Failed to get item, will retry poll [queue=" + queueName + ", idx=" + idx + ']');
@@ -161,24 +123,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
                 idx++;
             }
 
-            int cnt = 0;
-
-            while (true) {
-                try {
-                    cache.putAll(putMap);
-
-                    break;
-                }
-                catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add items, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            cache.putAll(putMap);
 
             return true;
         }
@@ -197,34 +142,14 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
 
             GridCacheQueueItemKey key = itemKey(idx);
 
-            int cnt = 0;
+            if (cache.remove(key))
+                return;
 
-            long stop = 0;
-
-            while (true) {
-                try {
-                    if (cache.remove(key))
-                        return;
+            long stop = U.currentTimeMillis() + RETRY_TIMEOUT;
 
-                    if (stop == 0)
-                        stop = U.currentTimeMillis() + RETRY_TIMEOUT;
-
-                    while (U.currentTimeMillis() < stop ) {
-                        if (cache.remove(key))
-                            return;
-                    }
-
-                    break;
-                }
-                catch (CachePartialUpdateCheckedException e) {
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add items, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
+            while (U.currentTimeMillis() < stop) {
+                if (cache.remove(key))
+                    return;
             }
 
             U.warn(log, "Failed to remove item, [queue=" + queueName + ", idx=" + idx + ']');
@@ -239,21 +164,6 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> {
     @SuppressWarnings("unchecked")
     @Nullable private Long transformHeader(EntryProcessor<GridCacheQueueHeaderKey, GridCacheQueueHeader, Long> c)
         throws IgniteCheckedException {
-        int cnt = 0;
-
-        while (true) {
-            try {
-                return (Long)cache.invoke(queueKey, c).get();
-            }
-            catch (CachePartialUpdateCheckedException e) {
-                if (cnt++ == MAX_UPDATE_RETRIES)
-                    throw e;
-                else {
-                    U.warn(log, "Failed to update queue header, will retry [err=" + e + ']');
-
-                    U.sleep(RETRY_DELAY);
-                }
-            }
-        }
+        return (Long)cache.invoke(queueKey, c).get();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index c0c38b2..37cdaea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 
 /**
  * Cache atomic reference implementation.
@@ -230,7 +231,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      * @return Callable for execution in async and sync mode.
      */
     private Callable<Boolean> internalSet(final T val) {
-        return new Callable<Boolean>() {
+        return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -252,7 +253,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
                     throw e;
                 }
             }
-        };
+        });
     }
 
     /**
@@ -265,7 +266,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
      */
     private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
         final IgniteClosure<T, T> newValClos) {
-        return new Callable<Boolean>() {
+
+        return retryTopologySafe(new Callable<Boolean>() {
             @Override public Boolean call() throws Exception {
                 try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
                     GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
@@ -295,7 +297,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
                     throw e;
                 }
             }
-        };
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 2667938..c984ab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -342,20 +342,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     private class GetCountCallable implements Callable<Integer> {
         /** {@inheritDoc} */
         @Override public Integer call() throws Exception {
-            Integer val;
+            GridCacheCountDownLatchValue latchVal = latchView.get(key);
 
-            try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
-                GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
-                if (latchVal == null)
-                    return 0;
-
-                val = latchVal.get();
-
-                tx.rollback();
-            }
-
-            return val;
+            return latchVal == null ? 0 : latchVal.get();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
index 0e4aebc..df1bd88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java
@@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE;
 
     /** */
-    protected static final int MAX_UPDATE_RETRIES = 100;
-
-    /** */
     protected static final long RETRY_DELAY = 1;
 
     /** */
@@ -169,14 +166,22 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @SuppressWarnings("unchecked")
     @Nullable @Override public T peek() throws IgniteException {
         try {
-            GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
+            while (true) {
+                GridCacheQueueHeader hdr = (GridCacheQueueHeader)cache.get(queueKey);
 
-            checkRemoved(hdr);
+                checkRemoved(hdr);
 
-            if (hdr.empty())
-                return null;
+                if (hdr.empty())
+                    return null;
 
-            return (T)cache.get(itemKey(hdr.head()));
+                T val = (T)cache.get(itemKey(hdr.head()));
+
+                if (val == null)
+                    // Head item might have been polled after the header was read. Retry.
+                    continue;
+
+                return val;
+            }
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -416,8 +421,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         long startIdx,
         long endIdx,
         int batchSize)
-        throws IgniteCheckedException
-    {
+        throws IgniteCheckedException {
         Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10);
 
         for (long idx = startIdx; idx < endIdx; idx++) {
@@ -435,8 +439,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     }
 
     /**
-     * Checks result of closure modifying queue header, throws {@link IllegalStateException}
-     * if queue was removed.
+     * Checks result of closure modifying queue header, throws {@link IllegalStateException} if queue was removed.
      *
      * @param idx Result of closure execution.
      */
@@ -529,7 +532,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
      */
     protected abstract void removeItem(long rmvIdx) throws IgniteCheckedException;
 
-
     /**
      * @param idx Item index.
      * @return Item key.
@@ -1036,7 +1038,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
         if (o == null || getClass() != o.getClass())
             return false;
 
-        GridCacheQueueAdapter that = (GridCacheQueueAdapter) o;
+        GridCacheQueueAdapter that = (GridCacheQueueAdapter)o;
 
         return id.equals(that.id);
 
@@ -1051,4 +1053,4 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp
     @Override public String toString() {
         return S.toString(GridCacheQueueAdapter.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index c7750a6..4880324 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -20,19 +20,17 @@ package org.apache.ignite.internal.processors.datastructures;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.transactions.TransactionRollbackException;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -55,12 +53,10 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
         A.notNull(item, "item");
 
         try {
-            boolean retVal;
+            return retryTopologySafe(new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    boolean retVal;
 
-            int cnt = 0;
-
-            while (true) {
-                try {
                     try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
                         Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get();
 
@@ -76,75 +72,59 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
 
                         tx.commit();
 
-                        break;
+                        return retVal;
                     }
                 }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
-
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
-
-            return retVal;
+            }).call();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (RuntimeException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Nullable @Override public T poll() throws IgniteException {
         try {
-            int cnt = 0;
-
-            T retVal;
-
-            while (true) {
-                try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
+            return retryTopologySafe(new Callable<T>() {
+                @Override public T call() throws Exception {
+                    T retVal;
 
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        retVal = (T)cache.getAndRemove(itemKey(idx));
+                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get();
 
-                        assert retVal != null : idx;
-                    }
-                    else
-                        retVal = null;
+                        if (idx != null) {
+                            checkRemoved(idx);
 
-                    tx.commit();
+                            retVal = (T)cache.getAndRemove(itemKey(idx));
 
-                    break;
-                }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
+                            assert retVal != null : idx;
+                        }
+                        else
+                            retVal = null;
 
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to poll item, will retry [err=" + e + ']');
+                        tx.commit();
 
-                        U.sleep(RETRY_DELAY);
+                        return retVal;
                     }
                 }
-            }
-
-            return retVal;
+            }).call();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (RuntimeException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -153,95 +133,78 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
         A.notNull(items, "items");
 
         try {
-            boolean retVal;
-
-            int cnt = 0;
+            return retryTopologySafe(new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    boolean retVal;
 
-            while (true) {
-                try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
-
-                    if (idx != null) {
-                        checkRemoved(idx);
-
-                        Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
+                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get();
 
-                        for (T item : items) {
-                            putMap.put(itemKey(idx), item);
+                        if (idx != null) {
+                            checkRemoved(idx);
 
-                            idx++;
-                        }
+                            Map<GridCacheQueueItemKey, T> putMap = new HashMap<>();
 
-                        cache.putAll(putMap);
+                            for (T item : items) {
+                                putMap.put(itemKey(idx), item);
 
-                        retVal = true;
-                    }
-                    else
-                        retVal = false;
+                                idx++;
+                            }
 
-                    tx.commit();
+                            cache.putAll(putMap);
 
-                    break;
-                }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
+                            retVal = true;
+                        }
+                        else
+                            retVal = false;
 
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e + ']');
+                        tx.commit();
 
-                        U.sleep(RETRY_DELAY);
+                        return retVal;
                     }
                 }
-            }
-
-            return retVal;
+            }).call();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
         }
+        catch (RuntimeException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e.getMessage(), e);
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected void removeItem(final long rmvIdx) throws IgniteCheckedException {
         try {
-            int cnt = 0;
+            retryTopologySafe(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
+                        Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
 
-            while (true) {
-                try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
-                    Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get();
+                        if (idx != null) {
+                            checkRemoved(idx);
 
-                    if (idx != null) {
-                        checkRemoved(idx);
+                            boolean rmv = cache.remove(itemKey(idx));
 
-                        boolean rmv = cache.remove(itemKey(idx));
+                            assert rmv : idx;
+                        }
 
-                        assert rmv : idx;
+                        tx.commit();
                     }
 
-                    tx.commit();
-
-                    break;
+                    return null;
                 }
-                catch (ClusterTopologyCheckedException | TransactionRollbackException | IgniteTxRollbackCheckedException e) {
-                    if (e instanceof ClusterGroupEmptyCheckedException)
-                        throw e;
-
-                    if (cnt++ == MAX_UPDATE_RETRIES)
-                        throw e;
-                    else {
-                        U.warn(log, "Failed to add item, will retry [err=" + e + ']');
-
-                        U.sleep(RETRY_DELAY);
-                    }
-                }
-            }
+            }).call();
         }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
+        catch (RuntimeException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 612c1f1..1ea5014 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -164,4 +164,4 @@ public interface DiscoverySpi extends IgniteSpi {
      * @throws IllegalStateException If discovery SPI has not started.
      */
     public boolean isClientMode() throws IllegalStateException;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c711484c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ae23d0e..ae3c8cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -2152,6 +2153,41 @@ class ServerImpl extends TcpDiscoveryImpl {
             initConnectionCheckFrequency();
         }
 
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            try {
+                super.body();
+            }
+            catch (Throwable e) {
+                if (!spi.isNodeStopping0()) {
+                    final Ignite ignite = spi.ignite();
+
+                    if (ignite != null) {
+                        U.error(log, "TcpDiscoverySpi's message worker thread failed abnormally. " +
+                            "Stopping the grid in order to prevent cluster wide instability.", e);
+
+                        new Thread(new Runnable() {
+                            @Override public void run() {
+                                try {
+                                    spi.ignite().close();
+
+                                    U.log(log, "Stopped the grid successfully in response to TcpDiscoverySpi's " +
+                                        "message worker thread abnormal termination.");
+                                }
+                                catch (Throwable e) {
+                                    U.error(log, "Failed to stop the grid in response to TcpDiscoverySpi's " +
+                                        "message worker thread abnormal termination.", e);
+                                }
+                            }
+                        }).start();
+                    }
+                }
+
+                // Must be processed by IgniteSpiThread as well.
+                throw e;
+            }
+        }
+
         /**
          * Initializes connection check frequency. Used only when failure detection timeout is enabled.
          */


Mime
View raw message