ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-3479
Date Thu, 28 Sep 2017 14:38:24 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3479 eb141c6ea -> d92cfa435


ignite-3479


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d92cfa43
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d92cfa43
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d92cfa43

Branch: refs/heads/ignite-3479
Commit: d92cfa435da5f72f316f252eb0be03eac1c5d71d
Parents: eb141c6
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Sep 28 15:31:06 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Sep 28 17:38:14 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 65 ++++++++++++---
 .../distributed/dht/GridDhtTxPrepareFuture.java | 10 ++-
 .../GridNearPessimisticTxPrepareFuture.java     |  1 +
 .../near/GridNearTxPrepareFutureAdapter.java    |  2 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  | 85 ++++++++++++++------
 .../processors/cache/mvcc/MvccQueryTracker.java | 40 +++++++--
 .../mvcc/NewCoordinatorQueryAckRequest.java     |  2 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 63 +++++++++++----
 8 files changed, 205 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f850ad3..097d90f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1719,9 +1719,15 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
         IgniteTxManager tm = cctx.tm();
 
         if (tm != null) {
-            U.warn(diagnosticLog, "Pending transactions:");
+            boolean first = true;
 
             for (IgniteInternalTx tx : tm.activeTransactions()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending transactions:");
+
+                    first = false;
+                }
+
                 if (exchTopVer != null) {
                     U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot()
+
                         ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
@@ -1735,31 +1741,66 @@ public class GridCachePartitionExchangeManager<K, V> extends
GridCacheSharedMana
         GridCacheMvccManager mvcc = cctx.mvcc();
 
         if (mvcc != null) {
-            U.warn(diagnosticLog, "Pending explicit locks:");
+            boolean first = true;
+
+            for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending explicit locks:");
+
+                    first = false;
+                }
 
-            for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks())
                 U.warn(diagnosticLog, ">>> " + lockSpan);
+            }
+
+            first = true;
 
-            U.warn(diagnosticLog, "Pending cache futures:");
+            for (GridCacheFuture<?> fut : mvcc.activeFutures()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending cache futures:");
+
+                    first = false;
+                }
 
-            for (GridCacheFuture<?> fut : mvcc.activeFutures())
                 dumpDiagnosticInfo(fut, diagCtx);
+            }
+
+            first = true;
 
-            U.warn(diagnosticLog, "Pending atomic cache futures:");
+            for (GridCacheFuture<?> fut : mvcc.atomicFutures()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending atomic cache futures:");
+
+                    first = false;
+                }
 
-            for (GridCacheFuture<?> fut : mvcc.atomicFutures())
                 dumpDiagnosticInfo(fut, diagCtx);
+            }
 
-            U.warn(diagnosticLog, "Pending data streamer futures:");
+            first = true;
+
+            for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) {
+                if (first) {
+                    U.warn(diagnosticLog, "Pending data streamer futures:");
+
+                    first = false;
+                }
 
-            for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
                 dumpDiagnosticInfo(fut, diagCtx);
+            }
 
             if (tm != null) {
-                U.warn(diagnosticLog, "Pending transaction deadlock detection futures:");
+                first = true;
+
+                for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures()) {
+                    if (first) {
+                        U.warn(diagnosticLog, "Pending transaction deadlock detection futures:");
+
+                        first = false;
+                    }
 
-                for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
                     dumpDiagnosticInfo(fut, diagCtx);
+                }
             }
         }
 
@@ -1781,6 +1822,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     affDumpCnt++;
             }
         }
+
+        cctx.kernalContext().coordinators().dumpDebugInfo(diagnosticLog, diagCtx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 4eca4e8..723bd4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1239,6 +1239,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
             IgniteInternalFuture<MvccCoordinatorVersion> waitCrdCntrFut = null;
 
             if (req.requestMvccCounter()) {
+                assert last;
+
                 assert tx.txState().mvccEnabled(cctx);
 
                 MvccCoordinator crd = cctx.coordinators().currentCoordinator();
@@ -1286,17 +1288,17 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                                 fut.get();
 
                                 sendPrepareRequests();
+
+                                markInitialized();
                             }
                             catch (Throwable e) {
-                                U.error(log, "Failed to get coordinator counter: " + e, e);
+                                U.error(log, "Failed to get mvcc version for tx [txId=" +
tx.nearXidVersion() +
+                                    ", err=" + e + ']', e);
 
                                 GridNearTxPrepareResponse res = createPrepareResponse(e);
 
                                 onDone(res, res.error());
                             }
-                            finally {
-                                markInitialized();
-                            }
                         }
                     });
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 2001011..4a2aeb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -507,6 +507,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
 
         return S.toString(GridNearPessimisticTxPrepareFuture.class, this,
             "innerFuts", futs,
+            "txId", tx.nearXidVersion(),
             "super", super.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index ddc5826..987a751 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -160,7 +160,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends
      * @param txMapping Transaction mapping.
      */
     final void checkOnePhase(GridDhtTxMapping txMapping) {
-        if (tx.storeWriteThrough())
+        if (tx.storeWriteThrough() || tx.txState().mvccEnabled(cctx)) // TODO IGNITE-3479
(onePhase + mvcc)
             return;
 
         Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 759a369..cfd6c4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -35,6 +35,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -264,7 +265,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
                 MSG_POLICY);
         }
         catch (IgniteCheckedException e) {
-            fut.onError(e);
+            if (verFuts.remove(fut.id) != null)
+                fut.onError(e);
         }
 
         return fut;
@@ -328,7 +330,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         }
         catch (IgniteCheckedException e) {
             if (verFuts.remove(fut.id) != null)
-                fut.onDone(e);
+                fut.onError(e);
         }
 
         return fut;
@@ -354,13 +356,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
                 new CoordinatorWaitTxsRequest(fut.id, txs),
                 MSG_POLICY);
         }
-        catch (ClusterTopologyCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(); // No need to ack, finish without error.
-        }
         catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(e);
+            if (ackFuts.remove(fut.id) != null) {
+                if (e instanceof ClusterTopologyCheckedException)
+                    fut.onDone(); // No need to wait, new coordinator will be assigned, finish
without error.
+                else
+                    fut.onDone(e);
+            }
         }
 
         return fut;
@@ -385,13 +387,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
                 new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
                 MSG_POLICY);
         }
-        catch (ClusterTopologyCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(); // No need to ack, finish without error.
-        }
         catch (IgniteCheckedException e) {
-            if (ackFuts.remove(fut.id) != null)
-                fut.onDone(e);
+            if (ackFuts.remove(fut.id) != null) {
+                if (e instanceof ClusterTopologyCheckedException)
+                    fut.onDone(); // No need to ack, finish without error.
+                else
+                    fut.onDone(e);
+            }
         }
 
         return fut;
@@ -964,6 +966,36 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
     }
 
     /**
+     * @param log Logger.
+     * @param diagCtx Diagnostic request.
+     */
+    public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext
diagCtx) {
+        boolean first = true;
+
+        for (MvccVersionFuture verFur : verFuts.values()) {
+            if (first) {
+                U.warn(log, "Pending mvcc version futures: ");
+
+                first = false;
+            }
+
+            U.warn(log, ">>> " + verFur.toString());
+        }
+
+        first = true;
+
+        for (WaitAckFuture waitAckFut : ackFuts.values()) {
+            if (first) {
+                U.warn(log, "Pending mvcc wait ack futures: ");
+
+                first = false;
+            }
+
+            U.warn(log, ">>> " + waitAckFut.toString());
+        }
+    }
+
+    /**
      *
      */
     public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion>
{
@@ -1004,22 +1036,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
             onDone(res);
         }
 
+        /**
+         * @param err Error.
+         */
         void onError(IgniteCheckedException err) {
-            if (verFuts.remove(id) != null) {
-                if (lsnr != null)
-                    lsnr.onMvccError(err);
+            if (lsnr != null)
+                lsnr.onMvccError(err);
 
-                onDone(err);
-            }
+            onDone(err);
         }
 
         /**
          * @param nodeId Failed node ID.
          */
-        void onNodeLeft(UUID nodeId) {
-            if (crdId.equals(nodeId)) {
-                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed
to request coordinator version, " +
-                    "coordinator failed: " + nodeId);
+        void onNodeLeft(UUID nodeId ) {
+            if (crdId.equals(nodeId) && verFuts.remove(id) != null) {
+                ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed
to request mvcc " +
+                    "version, coordinator failed: " + nodeId);
 
                 onError(err);
             }
@@ -1073,13 +1106,15 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter
{
          * @param nodeId Failed node ID.
          */
         void onNodeLeft(UUID nodeId) {
-            if (crdId.equals(nodeId) && verFuts.remove(id) != null)
+            if (crdId.equals(nodeId) && ackFuts.remove(id) != null)
                 onDone();
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return "WaitAckFuture [crdId=" + crdId + ", id=" + id + ']';
+            return "WaitAckFuture [crdId=" + crdId +
+                ", id=" + id +
+                ", ackTx=" + ackTx + ']';
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index 095f630..a460820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 
@@ -37,18 +39,20 @@ public class MvccQueryTracker {
     private MvccCoordinatorVersion mvccVer;
 
     /** */
+    @GridToStringExclude
     private final GridCacheContext cctx;
 
     /** */
     private final boolean canRemap;
 
     /** */
+    @GridToStringExclude
     private final MvccQueryAware lsnr;
 
     /**
-     * @param cctx
-     * @param canRemap
-     * @param lsnr
+     * @param cctx Cache context.
+     * @param canRemap {@code True} if can wait for topology changes.
+     * @param lsnr Listener.
      */
     public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr)
{
         assert cctx.mvccEnabled() : cctx.name();
@@ -58,6 +62,16 @@ public class MvccQueryTracker {
         this.lsnr = lsnr;
     }
 
+    /**
+     * @return Requested mvcc version.
+     */
+    public MvccCoordinatorVersion mvccVersion() {
+        assert mvccVer != null : this;
+
+        return mvccVer;
+    }
+
+    /** {@inheritDoc} */
     @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd)
{
         synchronized (this) {
             if (mvccVer != null) {
@@ -72,10 +86,9 @@ public class MvccQueryTracker {
         }
     }
 
-    public MvccCoordinatorVersion mvccVersion() {
-        return mvccVer;
-    }
-
+    /**
+     *
+     */
     public void onQueryDone() {
         MvccCoordinator mvccCrd0 = null;
         MvccCoordinatorVersion mvccVer0 = null;
@@ -95,6 +108,9 @@ public class MvccQueryTracker {
             cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0);
     }
 
+    /**
+     * @param topVer Topology version.
+     */
     public void requestVersion(final AffinityTopologyVersion topVer) {
         MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer);
 
@@ -130,6 +146,8 @@ public class MvccQueryTracker {
                 try {
                     MvccCoordinatorVersion rcvdVer = fut.get();
 
+                    assert rcvdVer != null;
+
                     boolean needRemap = false;
 
                     synchronized (MvccQueryTracker.this) {
@@ -168,6 +186,9 @@ public class MvccQueryTracker {
         });
     }
 
+    /**
+     * @param topVer Current topology version.
+     */
     private void waitNextTopology(AffinityTopologyVersion topVer) {
         assert canRemap;
 
@@ -189,4 +210,9 @@ public class MvccQueryTracker {
             });
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccQueryTracker.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
index 40b8e01..5631fed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java
@@ -54,7 +54,7 @@ public class NewCoordinatorQueryAckRequest implements MvccCoordinatorMessage
{
 
     /** {@inheritDoc} */
     @Override public boolean waitForCoordinatorInit() {
-        return true;
+        return false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index bec2725..074c4f8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -1677,21 +1677,21 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
-        readInProgressCoordinatorFails(false);
+        readInProgressCoordinatorFailsSimple(false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception {
-        readInProgressCoordinatorFails(true);
+        readInProgressCoordinatorFailsSimple(true);
     }
 
     /**
      * @param fromClient {@code True} if read from client node, otherwise from server node.
      * @throws Exception If failed.
      */
-    private void readInProgressCoordinatorFails(boolean fromClient) throws Exception {
+    private void readInProgressCoordinatorFailsSimple(boolean fromClient) throws Exception
{
         testSpi = true;
 
         startGrids(4);
@@ -1790,13 +1790,33 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
      * @throws Exception If failed.
      */
     public void testReadInProgressCoordinatorFails() throws Exception {
-        startGrids(3);
+        readInProgressCoordinatorFails(false);
+    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFails_ReadDelay() throws Exception {
+        readInProgressCoordinatorFails(true);
+    }
+
+    /**
+     * @param readDelay {@code True} if delays get requests.
+     * @throws Exception If failed.
+     */
+    private void readInProgressCoordinatorFails(boolean readDelay) throws Exception {
+        final int COORD_NODES = 5;
+        final int SRV_NODES = 4;
 
-        startGridsMultiThreaded(3, 4);
+        if (readDelay)
+            testSpi = true;
+
+        startGrids(COORD_NODES);
+
+        startGridsMultiThreaded(COORD_NODES, SRV_NODES);
 
         client = true;
 
-        Ignite client = startGrid(7);
+        Ignite client = startGrid(COORD_NODES + SRV_NODES);
 
         final List<String> cacheNames = new ArrayList<>();
 
@@ -1807,12 +1827,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
         for (int i = 0; i < KEYS; i++)
             vals.put(i, 0);
 
+        String[] exclude = new String[COORD_NODES];
+
+        for (int i = 0; i < COORD_NODES; i++)
+            exclude[i] = testNodeName(i);
+
         for (CacheConfiguration ccfg : cacheConfigurations()) {
             ccfg.setName("cache-" + cacheNames.size());
 
-            // First 3 server nodes are 'dedicated' coordinators.
-            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(
-                testNodeName(0), testNodeName(1), testNodeName(2)));
+            // First server nodes are 'dedicated' coordinators.
+            ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude));
 
             cacheNames.add(ccfg.getName());
 
@@ -1825,6 +1849,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
             }
         }
 
+        if (readDelay) {
+            for (int i = COORD_NODES; i < COORD_NODES + SRV_NODES + 1; i++) {
+                TestRecordingCommunicationSpi.spi(ignite(i)).closure(new IgniteBiInClosure<ClusterNode,
Message>() {
+                    @Override public void apply(ClusterNode node, Message msg) {
+                        if (msg instanceof GridNearGetRequest)
+                            doSleep(ThreadLocalRandom.current().nextLong(50) + 1);
+                    }
+                });
+            }
+        }
+
         final AtomicBoolean done = new AtomicBoolean();
 
         try {
@@ -1833,7 +1868,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
             IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>()
{
                 @Override public Void call() throws Exception {
                     try {
-                        Ignite node = ignite(3 + (readNodeIdx.getAndIncrement() % 5));
+                        Ignite node = ignite(COORD_NODES + (readNodeIdx.getAndIncrement()
% (SRV_NODES + 1)));
 
                         int cnt = 0;
 
@@ -1868,11 +1903,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
                         throw e;
                     }
                 }
-            }, 10, "get-thread");
+            }, (SRV_NODES + 1) + 1, "get-thread");
 
             IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new Callable() {
                 @Override public Void call() throws Exception {
-                    Ignite node = ignite(3);
+                    Ignite node = ignite(COORD_NODES);
 
                     List<IgniteCache> caches = new ArrayList<>();
 
@@ -1909,7 +1944,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
 
             IgniteInternalFuture putFut2 = GridTestUtils.runAsync(new Callable() {
                 @Override public Void call() throws Exception {
-                    Ignite node = ignite(3);
+                    Ignite node = ignite(COORD_NODES);
 
                     IgniteCache cache = node.cache(cacheNames.get(0));
 
@@ -1934,7 +1969,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest
{
                 }
             }, "put-thread");
 
-            for (int i = 0; i < 3 && !getFut.isDone(); i++) {
+            for (int i = 0; i < COORD_NODES && !getFut.isDone(); i++) {
                 U.sleep(3000);
 
                 stopGrid(i);


Mime
View raw message