ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1758
Date Mon, 26 Oct 2015 06:46:18 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1758 1444db19e -> 40c6d35eb


ignite-1758


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/40c6d35e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/40c6d35e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/40c6d35e

Branch: refs/heads/ignite-1758
Commit: 40c6d35eba479ade083de62de0ef8aaa81cb33fe
Parents: 1444db1
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Oct 22 14:03:16 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Oct 26 09:39:26 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  11 +-
 .../communication/tcp/TcpCommunicationSpi.java  |   8 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 122 ++++++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  30 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 ...teClientReconnectCacheMultiThreadedTest.java | 232 --------------
 ...gniteClientReconnectMassiveShutdownTest.java | 306 +++++++++++++++++++
 .../testsuites/IgniteClientNodesTestSuite.java  |   3 +
 9 files changed, 434 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 5d3b08b..26a4f3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -353,6 +353,9 @@ public final class IgniteSystemProperties {
     /** Maximum size for affinity assignment history. */
     public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE";
 
+    /** Maximum size for discovery messages history. */
+    public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE";
+
     /** Number of cache operation retries in case of topology exceptions. */
     public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 88837de..5647239 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -193,14 +193,19 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * Node left callback.
+     *
+     * @return {@code False} if descriptor is reserved.
      */
-    public void onNodeLeft() {
+    public boolean onNodeLeft() {
         GridNioFuture<?>[] futs = null;
 
         synchronized (this) {
             nodeLeft = true;
 
-            if (!reserved && !msgFuts.isEmpty()) {
+            if (reserved)
+                return false;
+
+            if (!msgFuts.isEmpty()) {
                 futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]);
 
                 msgFuts.clear();
@@ -209,6 +214,8 @@ public class GridNioRecoveryDescriptor {
 
         if (futs != null)
             completeOnNodeLeft(futs);
+
+        return true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 5ea2c02..c29943c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3100,10 +3100,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 assert !left.isEmpty();
 
                 for (ClientKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id);
+                    GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id);
 
-                    if (recoverySnd != null)
-                        recoverySnd.onNodeLeft();
+                    if (recoverySnd != null) {
+                        if (recoverySnd.onNodeLeft())
+                            recoveryDescs.remove(id);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index a5d0866..7e5af5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -461,7 +461,8 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException {
+    @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout)
+        throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
@@ -501,7 +502,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 InetSocketAddress addr = it.next();
 
-                T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
+                T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr);
 
                 if (sockAndRes == null) {
                     it.remove();
@@ -511,11 +512,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes;
 
-                Socket sock = sockAndRes.get1();
+                Socket sock = sockAndRes.get1().socket();
 
                 switch (sockAndRes.get2()) {
                     case RES_OK:
-                        return new T2<>(sock, sockAndRes.get3());
+                        return new T2<>(sockAndRes.get1(), sockAndRes.get3());
 
                     case RES_CONTINUE_JOIN:
                     case RES_WAIT:
@@ -548,7 +549,7 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @param addr Address.
      * @return Socket, connect response and client acknowledge support flag.
      */
-    @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
+    @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) {
         assert addr != null;
 
         if (log.isDebugEnabled())
@@ -621,7 +622,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
+                return new T3<>(new SocketStream(sock),
+                    spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)),
                     res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
@@ -765,7 +767,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
     /** {@inheritDoc} */
     @Override public void brakeConnection() {
-        U.closeQuiet(msgWorker.currSock);
+        SocketStream sockStream = msgWorker.currSock;
+
+        if (sockStream != null)
+            U.closeQuiet(sockStream.socket());
     }
 
     /** {@inheritDoc} */
@@ -826,7 +831,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final Object mux = new Object();
 
         /** */
-        private Socket sock;
+        private SocketStream sockStream;
 
         /** */
         private UUID rmtNodeId;
@@ -838,12 +843,12 @@ class ClientImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * @param sock Socket.
+         * @param sockStream Socket.
          * @param rmtNodeId Rmt node id.
          */
-        public void setSocket(Socket sock, UUID rmtNodeId) {
+        public void setSocket(SocketStream sockStream, UUID rmtNodeId) {
             synchronized (mux) {
-                this.sock = sock;
+                this.sockStream = sockStream;
 
                 this.rmtNodeId = rmtNodeId;
 
@@ -854,22 +859,24 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             while (!isInterrupted()) {
-                Socket sock;
+                SocketStream sockStream;
                 UUID rmtNodeId;
 
                 synchronized (mux) {
-                    if (this.sock == null) {
+                    if (this.sockStream == null) {
                         mux.wait();
 
                         continue;
                     }
 
-                    sock = this.sock;
+                    sockStream = this.sockStream;
                     rmtNodeId = this.rmtNodeId;
                 }
 
+                Socket sock = sockStream.socket();
+
                 try {
-                    InputStream in = new BufferedInputStream(sock.getInputStream());
+                    InputStream in = sockStream.stream();
 
                     sock.setKeepAlive(true);
                     sock.setTcpNoDelay(true);
@@ -923,7 +930,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     }
                 }
                 catch (IOException e) {
-                    msgWorker.addMessage(new SocketClosedMessage(sock));
+                    msgWorker.addMessage(new SocketClosedMessage(sockStream));
 
                     if (log.isDebugEnabled())
                         U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e);
@@ -932,8 +939,8 @@ class ClientImpl extends TcpDiscoveryImpl {
                     U.closeQuiet(sock);
 
                     synchronized (mux) {
-                        if (this.sock == sock) {
-                            this.sock = null;
+                        if (this.sockStream == sockStream) {
+                            this.sockStream = null;
                             this.rmtNodeId = null;
                         }
                     }
@@ -1125,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl {
      */
     private class Reconnector extends IgniteSpiThread {
         /** */
-        private volatile Socket sock;
+        private volatile SocketStream sockStream;
 
         /** */
         private boolean clientAck;
@@ -1148,7 +1155,10 @@ class ClientImpl extends TcpDiscoveryImpl {
         public void cancel() {
             interrupt();
 
-            U.closeQuiet(sock);
+            SocketStream sockStream = this.sockStream;
+
+            if (sockStream != null)
+                U.closeQuiet(sockStream.socket());
         }
 
         /** {@inheritDoc} */
@@ -1166,24 +1176,26 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             try {
                 while (true) {
-                    T2<Socket, Boolean> joinRes = joinTopology(true, timeout);
+                    T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout);
 
                     if (joinRes == null) {
                         if (join) {
                             joinError(new IgniteSpiException("Join process timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 'joinTimeout' configuration property) " +
-                                "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']'));
+                                "[joinTimeout=" + spi.joinTimeout + ']'));
                         }
                         else
                             U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" +
-                                " configuration  property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']');
+                                " configuration  property) [networkTimeout=" + spi.netTimeout + ']');
 
                         return;
                     }
 
-                    sock = joinRes.get1();
+                    sockStream = joinRes.get1();
                     clientAck = joinRes.get2();
 
+                    Socket sock = sockStream.socket();
+
                     if (isInterrupted())
                         throw new InterruptedException();
 
@@ -1194,7 +1206,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         sock.setSoTimeout((int)spi.netTimeout);
 
-                        InputStream in = new BufferedInputStream(sock.getInputStream());
+                        InputStream in = sockStream.stream();
 
                         sock.setKeepAlive(true);
                         sock.setTcpNoDelay(true);
@@ -1270,7 +1282,10 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
             finally {
                 if (!success) {
-                    U.closeQuiet(sock);
+                    SocketStream sockStream = this.sockStream;
+
+                    if (sockStream != null)
+                        U.closeQuiet(sockStream.socket());
 
                     if (join)
                         joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " +
@@ -1290,7 +1305,7 @@ class ClientImpl extends TcpDiscoveryImpl {
         private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>();
 
         /** */
-        private Socket currSock;
+        private SocketStream currSock;
 
         /** Indicates that pending messages are currently processed. */
         private boolean pending;
@@ -1469,7 +1484,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }
             }
             finally {
-                U.closeQuiet(currSock);
+                SocketStream currSock = this.currSock;
+
+                if (currSock != null)
+                    U.closeQuiet(currSock.socket());
 
                 if (joinLatch.getCount() > 0)
                     joinError(new IgniteSpiException("Some error in join process.")); // This should not occur.
@@ -1492,7 +1510,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             joinCnt++;
 
-            T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
+            T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
 
             if (joinRes == null) {
                 if (join)
@@ -1508,7 +1526,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             currSock = joinRes.get1();
 
-            sockWriter.setSocket(joinRes.get1(), joinRes.get2());
+            sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2());
 
             if (spi.joinTimeout > 0) {
                 final int joinCnt0 = joinCnt;
@@ -1877,9 +1895,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 if (reconnector != null) {
                     assert msg.success() : msg;
 
-                    currSock = reconnector.sock;
+                    currSock = reconnector.sockStream;
 
-                    sockWriter.setSocket(currSock, reconnector.clientAck);
+                    sockWriter.setSocket(currSock.socket(), reconnector.clientAck);
                     sockReader.setSocket(currSock, locNode.clientRouterNodeId());
 
                     reconnector = null;
@@ -2050,13 +2068,51 @@ class ClientImpl extends TcpDiscoveryImpl {
      */
     private static class SocketClosedMessage {
         /** */
+        private final SocketStream sock;
+
+        /**
+         * @param sock Socket.
+         */
+        private SocketClosedMessage(SocketStream sock) {
+            this.sock = sock;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class SocketStream {
+        /** */
         private final Socket sock;
 
+        /** */
+        private final InputStream in;
+
         /**
          * @param sock Socket.
+         * @throws IOException If failed to create stream.
          */
-        private SocketClosedMessage(Socket sock) {
+        public SocketStream(Socket sock) throws IOException {
+            assert sock != null;
+
             this.sock = sock;
+
+            this.in = new BufferedInputStream(sock.getInputStream());
+        }
+
+        /**
+         * @return Socket.
+         */
+        Socket socket() {
+            return sock;
+
+        }
+
+        /**
+         * @return Socket input stream.
+         */
+        InputStream stream() {
+            return in;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index d8ee953..6bc0402 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -126,6 +126,8 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE;
+import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -152,6 +154,9 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe
 @SuppressWarnings("All")
 class ServerImpl extends TcpDiscoveryImpl {
     /** */
+    private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10);
+
+    /** */
     private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>());
 
@@ -1445,6 +1450,12 @@ class ServerImpl extends TcpDiscoveryImpl {
             tmp = U.arrayList(readers);
         }
 
+        for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) {
+            U.interrupt(msgWorker);
+
+            U.join(msgWorker, log);
+        }
+
         U.interrupt(tmp);
         U.joinThreads(tmp, log);
 
@@ -1742,22 +1753,17 @@ class ServerImpl extends TcpDiscoveryImpl {
      * Discovery messages history used for client reconnect.
      */
     private class EnsuredMessageHistory {
-        /** */
-        private static final int MAX = 1024;
-
         /** Pending messages. */
-        private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
+        private final GridBoundedLinkedHashSet<TcpDiscoveryAbstractMessage>
+            msgs = new GridBoundedLinkedHashSet<>(ENSURED_MSG_HIST_SIZE);
 
         /**
          * @param msg Adds message.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            assert spi.ensured(msg) : msg;
-
-            msgs.addLast(msg);
+            assert spi.ensured(msg) && msg.verified() : msg;
 
-            while (msgs.size() > MAX)
-                msgs.pollFirst();
+            msgs.add(msg);
         }
 
         /**
@@ -1784,7 +1790,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             res = new ArrayList<>(msgs.size());
                     }
 
-                    if (res != null && msg.verified())
+                    if (res != null)
                         res.add(prepare(msg, node.id()));
                 }
 
@@ -1810,7 +1816,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (msg.id().equals(lastMsgId))
                             skip = false;
                     }
-                    else if (msg.verified())
+                    else
                         cp.add(prepare(msg, node.id()));
                 }
 
@@ -2130,7 +2136,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             else
                 assert false : "Unknown message type: " + msg.getClass().getSimpleName();
 
-            if (spi.ensured(msg))
+            if (msg.verified() && spi.ensured(msg))
                 msgHist.add(msg);
 
             if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 6254605..7383cd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1956,7 +1956,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * <p>
      * This method is intended for test purposes only.
      */
-    void simulateNodeFailure() {
+    public void simulateNodeFailure() {
         impl.simulateNodeFailure();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java
deleted file mode 100644
index 4321ded..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheMultiThreadedTest.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import java.util.HashMap;
-import java.util.Random;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.CacheException;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteClientDisconnectedException;
-import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cluster.ClusterTopologyException;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
-
-/**
- * Client reconnect test in multi threaded mode while cache operations are in progress.
- */
-public class IgniteClientReconnectCacheMultiThreadedTest extends GridCommonAbstractTest {
-    /** */
-    private static final int GRID_CNT = 14;
-
-    /** */
-    private static final int CLIENT_GRID_CNT = 14;
-
-    /** */
-    private static volatile boolean clientMode;
-
-    /** */
-    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /**
-     * @throws Exception If fails.
-     */
-    public IgniteClientReconnectCacheMultiThreadedTest() throws Exception {
-        super(false);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional"})
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        if (clientMode)
-            cfg.setClientMode(true);
-
-        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        stopAllGrids();
-
-        super.afterTest();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return 5 * 60 * 1000;
-    }
-
-    /**
-     * @throws Exception If any error occurs.
-     */
-    public void testMassiveServersShutdown() throws Exception {
-        clientMode = false;
-
-        final int serversToKill = GRID_CNT / 2;
-
-        startGridsMultiThreaded(GRID_CNT);
-
-        clientMode = true;
-
-        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
-
-        final AtomicBoolean done = new AtomicBoolean();
-
-        // Starting a cache dynamically.
-        Ignite client = grid(GRID_CNT);
-
-        assertTrue(client.configuration().isClientMode());
-
-        CacheConfiguration cfg = new CacheConfiguration();
-
-        cfg.setCacheMode(CacheMode.PARTITIONED);
-        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
-        cfg.setBackups(2);
-        cfg.setOffHeapMaxMemory(0);
-        cfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
-
-        IgniteCache cache = client.getOrCreateCache(cfg);
-
-        HashMap<String, Integer> put = new HashMap<>();
-
-        // Preloading the cache with some data.
-        for (int i = 0; i < 10_000; i++)
-            put.put(String.valueOf(i), i);
-
-        cache.putAll(put);
-
-        // Preparing client nodes and starting cache operations from them.
-        final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>();
-
-        for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
-            clientIdx.add(i);
-
-        IgniteInternalFuture<?> clientsFut = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    int idx = clientIdx.take();
-
-                    Ignite ignite = grid(idx);
-
-                    assertTrue(ignite.configuration().isClientMode());
-
-                    IgniteCache<String, Integer> cache = ignite.cache(null);
-
-                    IgniteTransactions txs = ignite.transactions();
-
-                    Random rand = new Random();
-
-                    while (!done.get()) {
-                        Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC,
-                            TransactionIsolation.READ_COMMITTED);
-
-                        try {
-                            cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
-
-                            tx.commit();
-                        }
-                        catch (ClusterTopologyException ex) {
-                            ex.retryReadyFuture().get();
-                        }
-                        catch (CacheException e) {
-                            if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
-                                IgniteClientDisconnectedException cause = X.cause(e,
-                                    IgniteClientDisconnectedException.class);
-
-                                cause.reconnectFuture().get(); // Wait for reconnect.
-                            }
-                            else if (X.hasCause(e, ClusterTopologyException.class)) {
-                                ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
-
-                                cause.retryReadyFuture().get();
-                            }
-                            else
-                                throw e;
-                        }
-                        finally {
-                            tx.close();
-                        }
-                    }
-
-                    return null;
-                }
-            },
-            CLIENT_GRID_CNT
-        );
-
-        // Killing a half of server nodes.
-        final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>();
-
-        for (int i = 0; i < serversToKill; i++)
-            victims.add(i);
-
-        final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>();
-
-        for (int i = serversToKill; i < GRID_CNT; i++)
-            assassins.add(i);
-
-        IgniteInternalFuture<?> serversShutdownFut = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Thread.sleep(5_000);
-
-                    Ignite assassin = grid(assassins.take());
-
-                    assertFalse(assassin.configuration().isClientMode());
-
-                    Ignite victim = grid(victims.take());
-
-                    assertFalse(victim.configuration().isClientMode());
-
-                    assassin.configuration().getDiscoverySpi().failNode(victim.cluster().localNode().id(), null);
-
-                    return null;
-                }
-            },
-            assassins.size()
-        );
-
-        serversShutdownFut.get();
-
-        Thread.sleep(15_000);
-
-        done.set(true);
-
-        clientsFut.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java
new file mode 100644
index 0000000..5ae5a48
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectMassiveShutdownTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.HashMap;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Client reconnect test in multi-threaded mode while cache operations are in progress.
+ */
+public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 14;
+
+    /** */
+    private static final int CLIENT_GRID_CNT = 14;
+
+    /** */
+    private static volatile boolean clientMode;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /**
+     * @throws Exception If fails.
+     */
+    public IgniteClientReconnectMassiveShutdownTest() throws Exception {
+        super(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(clientMode);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMassiveServersShutdown1() throws Exception {
+        massiveServersShutdown(StopType.FAIL_EVENT);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void testMassiveServersShutdown2() throws Exception {
+        massiveServersShutdown(StopType.SIMULATE_FAIL);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void testMassiveServersShutdown3() throws Exception {
+        massiveServersShutdown(StopType.CLOSE);
+    }
+
+    /**
+     * @param stopType How to stop the node.
+     * @throws Exception If any error occurs.
+     */
+    private void massiveServersShutdown(final StopType stopType) throws Exception {
+        clientMode = false;
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        clientMode = true;
+
+        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        // Starting a cache dynamically.
+        Ignite client = grid(GRID_CNT);
+
+        assertTrue(client.configuration().isClientMode());
+
+        CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setBackups(2);
+        cfg.setOffHeapMaxMemory(0);
+        cfg.setMemoryMode(OFFHEAP_TIERED);
+
+        IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg);
+
+        HashMap<String, Integer> put = new HashMap<>();
+
+        // Load some data.
+        for (int i = 0; i < 10_000; i++)
+            put.put(String.valueOf(i), i);
+
+        cache.putAll(put);
+
+        // Preparing client nodes and starting cache operations from them.
+        final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>();
+
+        for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+            clientIdx.add(i);
+
+        IgniteInternalFuture<?> clientsFut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = clientIdx.take();
+
+                    Ignite ignite = grid(idx);
+
+                    Thread.currentThread().setName("client-thread-" + ignite.name());
+
+                    assertTrue(ignite.configuration().isClientMode());
+
+                    IgniteCache<String, Integer> cache = ignite.cache(null);
+
+                    IgniteTransactions txs = ignite.transactions();
+
+                    Random rand = new Random();
+
+                    while (!done.get()) {
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
+
+                            tx.commit();
+                        }
+                        catch (ClusterTopologyException ex) {
+                            ex.retryReadyFuture().get();
+                        }
+                        catch (IgniteException | CacheException e) {
+                            if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
+                                IgniteClientDisconnectedException cause = X.cause(e,
+                                    IgniteClientDisconnectedException.class);
+
+                                assert cause != null;
+
+                                cause.reconnectFuture().get();
+                            }
+                            else if (X.hasCause(e, ClusterTopologyException.class)) {
+                                ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                                assert cause != null;
+
+                                cause.retryReadyFuture().get();
+                            }
+                            else
+                                throw e;
+                        }
+                    }
+
+                    return null;
+                }
+            },
+            CLIENT_GRID_CNT);
+
+        try {
+            // Killing half of the server nodes.
+            final int srvsToKill = GRID_CNT / 2;
+
+            final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>();
+
+            for (int i = 0; i < srvsToKill; i++)
+                victims.add(i);
+
+            final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>();
+
+            for (int i = srvsToKill; i < GRID_CNT; i++)
+                assassins.add(i);
+
+            IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        Thread.sleep(5_000);
+
+                        Ignite assassin = grid(assassins.take());
+
+                        assertFalse(assassin.configuration().isClientMode());
+
+                        Ignite victim = grid(victims.take());
+
+                        assertFalse(victim.configuration().isClientMode());
+
+                        log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']');
+
+                        switch (stopType) {
+                            case CLOSE:
+                                victim.close();
+
+                                break;
+
+                            case FAIL_EVENT:
+                                UUID nodeId = victim.cluster().localNode().id();
+
+                                assassin.configuration().getDiscoverySpi().failNode(nodeId, null);
+
+                                break;
+
+                            case SIMULATE_FAIL:
+                                ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+                                break;
+
+                            default:
+                                fail();
+                        }
+
+                        return null;
+                    }
+                },
+                assassins.size()
+            );
+
+            srvsShutdownFut.get();
+
+            Thread.sleep(15_000);
+
+            done.set(true);
+
+            clientsFut.get();
+
+            awaitPartitionMapExchange();
+
+            for (int k = 0; k < 10_000; k++) {
+                String key = String.valueOf(k);
+
+                Object val = cache.get(key);
+
+                for (int i = srvsToKill; i < GRID_CNT; i++)
+                    assertEquals(val, ignite(i).cache(null).get(key));
+            }
+        }
+        finally {
+            done.set(true);
+        }
+    }
+
+    /**
+     *
+     */
+    enum StopType {
+        /** */
+        CLOSE,
+
+        /** */
+        SIMULATE_FAIL,
+
+        /** */
+        FAIL_EVENT
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/40c6d35e/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java
index c9405fa..689097e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientNodesTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.IgniteClientReconnectMassiveShutdownTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientNodeConcurrentStart;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheManyClientsTest;
@@ -39,6 +40,8 @@ public class IgniteClientNodesTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheClientNodeConcurrentStart.class);
         suite.addTestSuite(IgniteCacheClientReconnectTest.class);
 
+        suite.addTestSuite(IgniteClientReconnectMassiveShutdownTest.class);
+
         return suite;
     }
 }
\ No newline at end of file


Mime
View raw message