ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [2/4] ignite git commit: Merged 4.ea2 into 5267
Date Wed, 07 Jun 2017 18:09:10 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
new file mode 100644
index 0000000..72f8469
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.io.Externalizable;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+
+/**
+ * Grid partition state map. States are encoded using bits.
+ * <p>
+ * Null values are prohibited.
+ */
+public class GridPartitionStateMap extends AbstractMap<Integer, GridDhtPartitionState> implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Required bits to hold all state. Additional zero state is required as well. */
+    private static final int BITS = Integer.SIZE -
+        Integer.numberOfLeadingZeros(GridDhtPartitionState.values().length + 1);
+
+    /** */
+    private final BitSet states;
+
+    /** */
+    private int size;
+
+    /** {@inheritDoc} */
+    @Override public Set<Entry<Integer, GridDhtPartitionState>> entrySet() {
+        return new AbstractSet<Entry<Integer, GridDhtPartitionState>>() {
+            @Override public Iterator<Entry<Integer, GridDhtPartitionState>> iterator() {
+                final int size = states.length() == 0 ? 0 : (states.length() - 1)/ BITS + 1;
+
+                return new Iterator<Entry<Integer, GridDhtPartitionState>>() {
+                    private int next;
+                    private int cur;
+
+                    @Override public boolean hasNext() {
+                        while(state(next) == null && next < size)
+                            next++;
+
+                        return next < size;
+                    }
+
+                    @Override public Entry<Integer, GridDhtPartitionState> next() {
+                        cur = next;
+                        next++;
+
+                        return new Entry<Integer, GridDhtPartitionState>() {
+                            int p = cur;
+
+                            @Override public Integer getKey() {
+                                return p;
+                            }
+
+                            @Override public GridDhtPartitionState getValue() {
+                                return state(p);
+                            }
+
+                            @Override public GridDhtPartitionState setValue(GridDhtPartitionState val) {
+                                return setState(p, val);
+                            }
+                        };
+                    }
+
+                    @Override public void remove() {
+                        setState(cur, null);
+                    }
+                };
+            }
+
+            @Override public int size() {
+                return GridPartitionStateMap.this.size();
+            }
+        };
+    }
+
+    /**
+     * Default constructor.
+     */
+    public GridPartitionStateMap() {
+        states = new BitSet();
+    }
+
+    /**
+     * @param parts Partitions to hold.
+     */
+    public GridPartitionStateMap(int parts) {
+        states = new BitSet(parts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState put(Integer key, GridDhtPartitionState val) {
+        assert val != null;
+
+        return setState(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState get(Object key) {
+        return state((Integer)key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionState remove(Object key) {
+        return setState((Integer)key, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsKey(Object key) {
+        return state((Integer)key) != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return size;
+    }
+
+    /** */
+    private GridDhtPartitionState setState(int part, GridDhtPartitionState st) {
+        GridDhtPartitionState old = state(part);
+
+        if (old == st)
+            return old;
+
+        int off = part * BITS;
+
+        int ist = st == null ? 0 : st.ordinal() + 1; // Reserve all zero bits for empty value
+
+        for (int i = 0; i < BITS; i++) {
+            states.set(off + i, (ist & 1) == 1);
+
+            ist >>>= 1;
+        }
+
+        size += (st == null ? -1 : old == null ? 1 : 0);
+
+        return old;
+    }
+
+    /** */
+    private GridDhtPartitionState state(int part) {
+        int off = part * BITS;
+
+        int st = 0;
+
+        for (int i = 0; i < BITS; i++)
+            st |= ((states.get(off + i) ? 1 : 0) << i);
+
+        return st == 0 ? null : GridDhtPartitionState.values()[st - 1];
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index ba4717a..e30333b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8568,6 +8568,18 @@ public abstract class IgniteUtils {
     }
 
     /**
+     * A primitive override of {@link #hash(Object)} to avoid unnecessary boxing.
+     *
+     * @param key Value to hash.
+     * @return Hash value.
+     */
+    public static int hash(long key) {
+        int val = (int)(key ^ (key >>> 32));
+
+        return hash(val);
+    }
+
+    /**
      * @return PID of the current JVM or {@code -1} if it can't be determined.
      */
     public static int jvmPid() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
index 83765ce..ee6a04d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -197,6 +197,8 @@ public class OffheapReadWriteLock {
      * @param lock Lock address.
      */
     public boolean writeLock(long lock, int tag) {
+        assert tag != 0;
+
         for (int i = 0; i < SPIN_CNT; i++) {
             long state = GridUnsafe.getLongVolatile(null, lock);
 
@@ -252,6 +254,8 @@ public class OffheapReadWriteLock {
     public void writeUnlock(long lock, int tag) {
         long updated;
 
+        assert tag != 0;
+
         while (true) {
             long state = GridUnsafe.getLongVolatile(null, lock);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 66993ea..d1e251f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -54,7 +54,9 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -706,6 +709,7 @@ public class GridNioServer<T> {
     /**
      *
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void dumpStats() {
         U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
             ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
@@ -715,6 +719,51 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @param msg Message to add.
+     * @param p Session predicate.
+     * @return Future.
+     */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    public IgniteInternalFuture<String> dumpNodeStats(final String msg, IgnitePredicate<GridNioSession> p) {
+        GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new IgniteReducer<String, String>() {
+            private final StringBuilder sb = new StringBuilder(msg);
+
+            @Override public boolean collect(@Nullable String msg) {
+                if (!F.isEmpty(msg)) {
+                    synchronized (sb) {
+                        if (sb.length() > 0)
+                            sb.append(U.nl());
+
+                        sb.append(msg);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public String reduce() {
+                synchronized (sb) {
+                    return sb.toString();
+                }
+            }
+        });
+
+        for (int i = 0; i < clientWorkers.size(); i++) {
+            NioOperationFuture<String> opFut = new NioOperationFuture<>(null, NioOperation.DUMP_STATS);
+
+            opFut.msg = p;
+
+            clientWorkers.get(i).offer(opFut);
+
+            fut.add(opFut);
+        }
+
+        fut.markInitialized();
+
+        return fut;
+    }
+
+    /**
      * Establishes a session.
      *
      * @param ch Channel to register within the server and create session for.
@@ -1514,12 +1563,15 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker implements GridNioWorker {
         /** Queue of change requests on this selector. */
+        @GridToStringExclude
         private final ConcurrentLinkedQueue<SessionChangeRequest> changeReqs = new ConcurrentLinkedQueue<>();
 
         /** Selector to select read events. */
+        @GridToStringExclude
         private Selector selector;
 
         /** Selected keys. */
+        @GridToStringExclude
         private SelectedSelectionKeySet selectedKeys;
 
         /** Worker index. */
@@ -1538,6 +1590,7 @@ public class GridNioServer<T> {
         private volatile long bytesSent0;
 
         /** Sessions assigned to this worker. */
+        @GridToStringExclude
         private final GridConcurrentHashSet<GridSelectorNioSessionImpl> workerSessions =
             new GridConcurrentHashSet<>();
 
@@ -1812,12 +1865,28 @@ public class GridNioServer<T> {
                             case DUMP_STATS: {
                                 NioOperationFuture req = (NioOperationFuture)req0;
 
-                                try {
-                                    dumpStats();
+                                if (req.msg instanceof IgnitePredicate) {
+                                    StringBuilder sb = new StringBuilder();
+
+                                    try {
+                                        dumpStats(sb, (IgnitePredicate<GridNioSession>)req.msg, true);
+                                    }
+                                    finally {
+                                        req.onDone(sb.toString());
+                                    }
                                 }
-                                finally {
-                                    // Complete the request just in case (none should wait on this future).
-                                    req.onDone(true);
+                                else {
+                                    try {
+                                        StringBuilder sb = new StringBuilder();
+
+                                        dumpStats(sb, null, false);
+
+                                        U.warn(log, sb.toString());
+                                    }
+                                    finally {
+                                        // Complete the request just in case (none should wait on this future).
+                                        req.onDone(true);
+                                    }
                                 }
                             }
                         }
@@ -1925,80 +1994,131 @@ public class GridNioServer<T> {
         }
 
         /**
-         *
+         * @param sb Message builder.
+         * @param keys Keys.
          */
-        private void dumpStats() {
-            StringBuilder sb = new StringBuilder();
-
-            Set<SelectionKey> keys = selector.keys();
-
-            sb.append(U.nl())
-                .append(">> Selector info [idx=").append(idx)
+        private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) {
+            sb.append(">> Selector info [idx=").append(idx)
                 .append(", keysCnt=").append(keys.size())
                 .append(", bytesRcvd=").append(bytesRcvd)
                 .append(", bytesRcvd0=").append(bytesRcvd0)
                 .append(", bytesSent=").append(bytesSent)
                 .append(", bytesSent0=").append(bytesSent0)
                 .append("]").append(U.nl());
+        }
+
+        /**
+         * @param sb Message builder.
+         * @param p Optional session predicate.
+         * @param shortInfo Short info flag.
+         */
+        private void dumpStats(StringBuilder sb,
+            @Nullable IgnitePredicate<GridNioSession> p,
+            boolean shortInfo) {
+            Set<SelectionKey> keys = selector.keys();
+
+            boolean selInfo = p == null;
+
+            if (selInfo)
+                dumpSelectorInfo(sb, keys);
 
             for (SelectionKey key : keys) {
                 GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
-                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+                boolean sesInfo = p == null || p.apply(ses);
 
-                sb.append("    Connection info [")
-                    .append("in=").append(ses.accepted())
-                    .append(", rmtAddr=").append(ses.remoteAddress())
-                    .append(", locAddr=").append(ses.localAddress());
+                if (sesInfo) {
+                    if (!selInfo) {
+                        dumpSelectorInfo(sb, keys);
 
-                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+                        selInfo = true;
+                    }
 
-                if (outDesc != null) {
-                    sb.append(", msgsSent=").append(outDesc.sent())
-                        .append(", msgsAckedByRmt=").append(outDesc.acked())
-                        .append(", descIdHash=").append(System.identityHashCode(outDesc));
-                }
-                else
-                    sb.append(", outRecoveryDesc=null");
+                    sb.append("    Connection info [")
+                        .append("in=").append(ses.accepted())
+                        .append(", rmtAddr=").append(ses.remoteAddress())
+                        .append(", locAddr=").append(ses.localAddress());
 
-                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+                    GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
 
-                if (inDesc != null) {
-                    sb.append(", msgsRcvd=").append(inDesc.received())
-                        .append(", lastAcked=").append(inDesc.lastAcknowledged())
-                        .append(", descIdHash=").append(System.identityHashCode(inDesc));
-                }
-                else
-                    sb.append(", inRecoveryDesc=null");
+                    if (outDesc != null) {
+                        sb.append(", msgsSent=").append(outDesc.sent())
+                            .append(", msgsAckedByRmt=").append(outDesc.acked())
+                            .append(", descIdHash=").append(System.identityHashCode(outDesc));
 
-                sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                    .append(", bytesRcvd0=").append(ses.bytesReceived0())
-                    .append(", bytesSent=").append(ses.bytesSent())
-                    .append(", bytesSent0=").append(ses.bytesSent0())
-                    .append(", opQueueSize=").append(ses.writeQueueSize())
-                    .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                    .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+                        if (!outDesc.messagesRequests().isEmpty()) {
+                            int cnt = 0;
 
-                int cnt = 0;
+                            sb.append(", unackedMsgs=[");
 
-                for (SessionWriteRequest req : ses.writeQueue()) {
-                    if (cnt == 0)
-                        sb.append(",\n opQueue=[").append(req);
+                            for (SessionWriteRequest req : outDesc.messagesRequests()) {
+                                if (cnt != 0)
+                                    sb.append(", ");
+
+                                Object msg = req.message();
+
+                                if (shortInfo && msg instanceof GridIoMessage)
+                                    msg = ((GridIoMessage)msg).message().getClass().getSimpleName();
+
+                                sb.append(msg);
+
+                                if (++cnt == 5)
+                                    break;
+                            }
+
+                            sb.append(']');
+                        }
+                    }
                     else
-                        sb.append(',').append(req);
+                        sb.append(", outRecoveryDesc=null");
 
-                    if (++cnt == 5) {
-                        sb.append(']');
+                    GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
 
-                        break;
+                    if (inDesc != null) {
+                        sb.append(", msgsRcvd=").append(inDesc.received())
+                            .append(", lastAcked=").append(inDesc.lastAcknowledged())
+                            .append(", descIdHash=").append(System.identityHashCode(inDesc));
                     }
-                }
+                    else
+                        sb.append(", inRecoveryDesc=null");
 
-                sb.append("]").append(U.nl());
-            }
+                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                        .append(", bytesRcvd0=").append(ses.bytesReceived0())
+                        .append(", bytesSent=").append(ses.bytesSent())
+                        .append(", bytesSent0=").append(ses.bytesSent0())
+                        .append(", opQueueSize=").append(ses.writeQueueSize());
+
+                    if (!shortInfo) {
+                        MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                        MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
+                        sb.append(", msgWriter=").append(writer != null ? writer.toString() : "null")
+                            .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+                    }
 
-            U.warn(log, sb.toString());
+                    int cnt = 0;
+
+                    for (SessionWriteRequest req : ses.writeQueue()) {
+                        Object msg = req.message();
+
+                        if (shortInfo && msg instanceof GridIoMessage)
+                            msg = ((GridIoMessage)msg).message().getClass().getSimpleName();
+
+                        if (cnt == 0)
+                            sb.append(",\n opQueue=[").append(msg);
+                        else
+                            sb.append(',').append(msg);
+
+                        if (++cnt == 5) {
+                            sb.append(']');
+
+                            break;
+                        }
+                    }
+
+                    sb.append("]");
+                }
+            }
         }
 
         /**
@@ -2037,7 +2157,14 @@ public class GridNioServer<T> {
                     // This exception will be handled in bodyInternal() method.
                     throw e;
                 }
-                catch (Exception e) {
+                catch (Exception | Error e) { // TODO IGNITE-2659.
+                    try {
+                        U.sleep(1000);
+                    }
+                    catch (IgniteInterruptedCheckedException ignore) {
+                        // No-op.
+                    }
+
                     U.warn(log, "Failed to process selector key (will close): " + ses, e);
 
                     close(ses, new GridNioException(e));
@@ -2082,7 +2209,14 @@ public class GridNioServer<T> {
                     // This exception will be handled in bodyInternal() method.
                     throw e;
                 }
-                catch (Exception e) {
+                catch (Exception | Error e) { // TODO IGNITE-2659.
+                    try {
+                        U.sleep(1000);
+                    }
+                    catch (IgniteInterruptedCheckedException ignore) {
+                        // No-op.
+                    }
+
                     if (!closed)
                         U.warn(log, "Failed to process selector key (will close): " + ses, e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2e35c6e..b569aa2 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -46,7 +46,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
@@ -61,12 +60,14 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -238,7 +239,7 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META
  */
 @IgniteSpiMultipleInstancesSupport(true)
 @IgniteSpiConsistencyChecked(optional = false)
-public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message> {
+public class TcpCommunicationSpi extends IgniteSpiAdapter implements CommunicationSpi<Message>, IgniteDiagnosticAware {
     /** IPC error message. */
     public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
         "(switching to TCP, may be slower).";
@@ -1764,18 +1765,70 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * @param nodeId Target node ID.
+     * @return Future.
+     */
+    public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
+        StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl());
+
+        dumpInfo(sb, nodeId);
+
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+        if (nioSrvr != null) {
+            sb.append("NIO sessions statistics:");
+
+            IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>() {
+                @Override public boolean apply(GridNioSession ses) {
+                    ConnectionKey connId = ses.meta(CONN_IDX_META);
+
+                    return connId != null && nodeId.equals(connId.nodeId());
+                }
+            };
+
+            return nioSrvr.dumpNodeStats(sb.toString(), p);
+        }
+        else {
+            sb.append(U.nl()).append("GridNioServer is null.");
+
+            return new GridFinishedFuture<>(sb.toString());
+        }
+    }
+
+    /**
      * Dumps SPI per-connection stats to logs.
      */
-    public void dumpStats() {
+    @Override public void dumpDiagnosticInfo() {
         IgniteLogger log = this.log;
 
         if (log != null) {
-            StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors: ").append(U.nl());
+            StringBuilder sb = new StringBuilder();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
-                GridNioRecoveryDescriptor desc = entry.getValue();
+            dumpInfo(sb, null);
 
-                sb.append("    [key=").append(entry.getKey())
+            U.warn(log, sb.toString());
+        }
+
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+        if (nioSrvr != null)
+            nioSrvr.dumpStats();
+    }
+
+    /**
+     * @param sb Message builder.
+     * @param dstNodeId Target node ID.
+     */
+    private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
+        sb.append("Communication SPI recovery descriptors: ").append(U.nl());
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
+
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
+
+            sb.append("    [key=").append(entry.getKey())
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", msgsRcvd=").append(desc.received())
@@ -1783,12 +1836,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     .append(", reserveCnt=").append(desc.reserveCount())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
-            }
+        }
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet()) {
-                GridNioRecoveryDescriptor desc = entry.getValue();
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
 
-                sb.append("    [key=").append(entry.getKey())
+            sb.append("    [key=").append(entry.getKey())
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", reserveCnt=").append(desc.reserveCount())
@@ -1796,12 +1852,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     .append(", reserved=").append(desc.reserved())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
-            }
+        }
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
+            GridNioRecoveryDescriptor desc = entry.getValue();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet()) {
-                GridNioRecoveryDescriptor desc = entry.getValue();
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
 
-                sb.append("    [key=").append(entry.getKey())
+            sb.append("    [key=").append(entry.getKey())
                     .append(", msgsRcvd=").append(desc.received())
                     .append(", lastAcked=").append(desc.lastAcknowledged())
                     .append(", reserveCnt=").append(desc.reserveCount())
@@ -1810,38 +1869,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     .append(", handshakeIdx=").append(desc.handshakeIndex())
                     .append(", descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
-            }
+        }
 
-            sb.append("Communication SPI clients: ").append(U.nl());
+        sb.append("Communication SPI clients: ").append(U.nl());
 
-            for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
-                UUID nodeId = entry.getKey();
-                GridCommunicationClient[] clients0 = entry.getValue();
+        for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet()) {
+            UUID clientNodeId = entry.getKey();
 
-                for (GridCommunicationClient client : clients0) {
-                    if (client != null) {
-                        sb.append("    [node=").append(nodeId)
+            if (dstNodeId != null && !dstNodeId.equals(clientNodeId))
+                continue;
+
+            GridCommunicationClient[] clients0 = entry.getValue();
+
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    sb.append("    [node=").append(clientNodeId)
                             .append(", client=").append(client)
                             .append(']').append(U.nl());
-                    }
                 }
             }
-
-            U.warn(log, sb.toString());
         }
-
-        GridNioServer<Message> nioSrvr = this.nioSrvr;
-
-        if (nioSrvr != null)
-            nioSrvr.dumpStats();
     }
 
-    /** */
-    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
-
-    /** */
-    private final AtomicInteger connIdx = new AtomicInteger();
-
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
         initFailureDetectionTimeout();
@@ -2868,10 +2917,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
         // Try to connect first on bound addresses.
         if (isRmtAddrsExist) {
-            List<InetSocketAddress> addrs0 = new ArrayList<>(U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort));
-
             boolean sameHost = U.sameMacs(getSpiContext().localNode(), node);
 
+            List<InetSocketAddress> addrs0;
+
+            Collection<InetSocketAddress> socketAddrs = U.toSocketAddresses(rmtAddrs0, rmtHostNames0, boundPort);
+
+            if (sameHost)
+                addrs0 = new ArrayList<>(socketAddrs);
+            else {
+                addrs0 = new ArrayList<>(socketAddrs.size());
+
+                for (InetSocketAddress addr0 : socketAddrs) {
+                    if (!addr0.getAddress().isLoopbackAddress())
+                        addrs0.add(addr0);
+                }
+            }
+
             Collections.sort(addrs0, U.inetAddressesComparator(sameHost));
 
             addrs = new LinkedHashSet<>(addrs0);
@@ -3109,7 +3171,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                             "in order to prevent parties from waiting forever in case of network issues " +
                             "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
 
-                    errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+                    errs.addSuppressed(new IgniteCheckedException("Failed to connect to address " +
+                        "[addr=" + addr + ", err=" + e.getMessage() + ']', e));
 
                     // Reconnect for the second time, if connection is not established.
                     if (!failureDetThrReached && connectAttempts < 2 &&
@@ -3139,11 +3202,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             if (getSpiContext().node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) &&
                 X.hasCause(errs, ConnectException.class, SocketTimeoutException.class, HandshakeTimeoutException.class,
                     IgniteSpiOperationTimeoutException.class)) {
-                LT.warn(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
+
+                U.error(log, "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
                     "cluster [" +
-                    "rmtNode=" + node +
-                    ", err=" + errs +
-                    ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']');
+                    "rmtNode=" + node + "]", errs);
 
                 getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" +
                     "rmtNode=" + node +
@@ -4848,7 +4910,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
         /** {@inheritDoc} */
         @Override public void dumpStats() {
-            TcpCommunicationSpi.this.dumpStats();
+            TcpCommunicationSpi.this.dumpDiagnosticInfo();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 61ae27a..fdc3d34 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -2044,3 +2044,32 @@ org.apache.ignite.transactions.TransactionRollbackException
 org.apache.ignite.transactions.TransactionState
 org.apache.ignite.transactions.TransactionTimeoutException
 org.apache.ignite.util.AttributeNodeFilter
+org.apache.ignite.internal.util.GridPartitionStateMap
+org.gridgain.grid.cache.db.GridCacheOffheapManager$RebalanceIteratorAdapter
+org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$FileArchiver$1
+org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$Mode
+org.gridgain.grid.cache.db.wal.FileWriteAheadLogManager$RecordsIterator
+org.gridgain.grid.internal.processors.cache.database.CollectDependantSnapshotSetTask
+org.gridgain.grid.internal.processors.cache.database.CollectDependantSnapshotSetTask$CollectDependantSnapshotSetJob
+org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTask
+org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTask$CollectSnapshotInfoJob
+org.gridgain.grid.internal.processors.cache.database.CollectSnapshotInfoTaskResult
+org.gridgain.grid.internal.processors.cache.database.FullPageIdIterableComparator
+org.gridgain.grid.internal.processors.cache.database.GetOngoingOperationTask
+org.gridgain.grid.internal.processors.cache.database.GetOngoingOperationTask$GetOngoingOperationJob
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$13
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$15
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$18
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$CheckpointEntryType
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$Checkpointer$2
+org.gridgain.grid.internal.processors.cache.database.GridCacheDatabaseSharedManager$FullPageIdComparator
+org.gridgain.grid.internal.processors.cache.database.SnapshotOperationFuture$1
+org.gridgain.grid.internal.processors.cache.database.SnapshotTaskBase
+org.gridgain.grid.internal.processors.cache.database.messages.CheckSnapshotFinishedMessage
+org.gridgain.grid.internal.processors.cache.database.messages.CheckSnapshotMetadataMessage
+org.gridgain.grid.internal.processors.cache.database.messages.ClusterWideSnapshotOperationFinishedMessage
+org.gridgain.grid.internal.processors.cache.database.messages.SnapshotIssueMessage
+org.gridgain.grid.internal.processors.cache.database.messages.SnapshotOperationFinishedMessage
+org.gridgain.grid.internal.processors.cache.database.messages.SnapshotProgressMessage
+org.gridgain.grid.internal.visor.database.VisorCheckpointMetrics
+org.gridgain.grid.internal.visor.database.VisorMemoryPoolMetrics

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java
new file mode 100644
index 0000000..6b77704
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesMultipleConnectionsTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessagesMultipleConnectionsTest extends IgniteDiagnosticMessagesTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(5);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
new file mode 100644
index 0000000..698cc1f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers;
+
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setCacheMode(CacheMode.REPLICATED);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setName("c1");
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, "true");
+
+        startGrids(3);
+
+        client = true;
+
+        startGrid(3);
+
+        startGrid(4);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.setProperty(IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED, "false");
+
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDiagnosticMessages() throws Exception {
+        awaitPartitionMapExchange();
+
+        sendDiagnostic();
+
+        for (int i = 0; i < 5; i++) {
+            final IgniteCache cache = ignite(i).cache("c1");
+
+            GridTestUtils.runMultiThreaded(new Runnable() {
+                @Override public void run() {
+                    for (int j = 0; j < 10; j++)
+                        cache.put(ThreadLocalRandom.current().nextInt(), j);
+                }
+            }, 10, "cache-thread");
+        }
+
+        sendDiagnostic();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void sendDiagnostic() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            IgniteKernal node = (IgniteKernal)ignite(i);
+
+            for (int j = 0; j < 5; j++) {
+                if (i != j) {
+                    ClusterNode dstNode = ignite(j).cluster().localNode();
+
+                    final GridFutureAdapter<String> fut = new GridFutureAdapter<>();
+
+                    node.context().cluster().dumpBasicInfo(dstNode.id(), "Test diagnostic",
+                        new IgniteInClosure<IgniteInternalFuture<String>>() {
+                            @Override public void apply(IgniteInternalFuture<String> diagFut) {
+                                try {
+                                    fut.onDone(diagFut.get());
+                                }
+                                catch (Exception e) {
+                                    fut.onDone(e);
+                                }
+                            }
+                        }
+                    );
+
+                    String msg = fut.get();
+
+                    assertTrue("Unexpected message: " + msg,
+                        msg.contains("Test diagnostic") &&
+                        msg.contains("General node info [id=" + dstNode.id() + ", client=" + dstNode.isClient() + ", discoTopVer=AffinityTopologyVersion [topVer=5, minorTopVer=0]") &&
+                        msg.contains("Partitions exchange info [readyVer=AffinityTopologyVersion [topVer=5, minorTopVer="));
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
new file mode 100644
index 0000000..58d4ea2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheClientsConcurrentStartTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public class CacheClientsConcurrentStartTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRV_CNT = 4;
+
+    /** */
+    private static final int CLIENTS_CNT = 16;
+
+    /** */
+    private static final int CACHES = 30;
+
+    /** Stopped. */
+    private volatile boolean stopped;
+
+    /** Iteration. */
+    private static final int ITERATIONS = 2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+            @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+                if (msg instanceof TcpDiscoveryCustomEventMessage && msg.verified()) {
+                    try {
+                        System.out.println(Thread.currentThread().getName() + " delay custom message");
+
+                        U.sleep(ThreadLocalRandom.current().nextLong(500) + 100);
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+
+                super.writeToSocket(sock, out, msg, timeout);
+            }
+        };
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setMarshaller(null);
+
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        if (getTestIgniteInstanceIndex(gridName) >= SRV_CNT)
+            cfg.setClientMode(true);
+        else {
+            CacheConfiguration ccfgs[] = new CacheConfiguration[CACHES / 2];
+
+            for (int i = 0; i < ccfgs.length; i++)
+                ccfgs[i] = cacheConfiguration("cache-" + i);
+
+            cfg.setCacheConfiguration(ccfgs);
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartNodes() throws Exception {
+        for (int i = 0; i < ITERATIONS; i++) {
+            try {
+                log.info("Iteration: " + (i + 1) + '/' + ITERATIONS);
+
+                doTest();
+            }
+            finally {
+                stopAllGrids(true);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        final AtomicBoolean failed = new AtomicBoolean();
+
+        startGrids(SRV_CNT);
+
+        for (int i = 0; i < SRV_CNT; i++) {
+            ((TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi()).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtPartitionsFullMessage) {
+                        try {
+                            U.sleep(ThreadLocalRandom.current().nextLong(500) + 100);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+
+                    return false;
+                }
+            });
+        }
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < CLIENTS_CNT; i++) {
+            final int idx = i;
+
+            IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+                @Override public void run() {
+                    Random rnd = new Random();
+
+                    try {
+                        Ignite ignite = startGrid(SRV_CNT + idx);
+
+                        assertTrue(ignite.configuration().isClientMode());
+
+                        for (int i = 0; i < CACHES / 2; i++) {
+                            String cacheName = "cache-" + rnd.nextInt(CACHES);
+
+                            IgniteCache<Object, Object> cache = getCache(ignite, cacheName);
+
+                            cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
+
+                            IgniteAtomicSequence seq = ignite.atomicSequence("seq-" + rnd.nextInt(20), 0, true);
+
+                            seq.getAndIncrement();
+                        }
+
+                        while (!stopped) {
+                            IgniteCache<Object, Object> cache = getCache(ignite, "cache-" + rnd.nextInt(CACHES));
+
+                            int val = Math.abs(rnd.nextInt(100));
+
+                            if (val >= 0 && val < 40)
+                                cache.containsKey(ignite.cluster().localNode().id());
+                            else if (val >= 40 && val < 80)
+                                cache.get(ignite.cluster().localNode().id());
+                            else
+                                cache.put(ignite.cluster().localNode().id(), UUID.randomUUID());
+
+                            Thread.sleep(10);
+                        }
+                    }
+                    catch (Exception e) {
+                        log.error("Unexpected error: " + e, e);
+
+                        failed.set(true);
+                    }
+                }
+            }, 1, "client-thread");
+
+            futs.add(fut);
+        }
+
+        Thread.sleep(10_000);
+
+        stopped = true;
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get();
+
+        assertFalse(failed.get());
+    }
+
+    /**
+     * @param grid Grid.
+     * @return Cache.
+     */
+    private IgniteCache getCache(Ignite grid, String cacheName) {
+        return grid.getOrCreateCache(cacheConfiguration(cacheName));
+    }
+
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(cacheName);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setBackups(2);
+        ccfg.setNearConfiguration(null);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        return ccfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
index 1c8bede..c21d706 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCacheNearOnlyMultiNodeFullApiSelfTest.java
@@ -73,9 +73,9 @@ public class GridCacheNearOnlyMultiNodeFullApiSelfTest extends GridCachePartitio
         for (int i = 0; i < gridCount(); i++) {
             if (ignite(i).configuration().isClientMode()) {
                 if (clientHasNearCache())
-                    ignite(i).createNearCache(DEFAULT_CACHE_NAME, new NearCacheConfiguration<>());
+                    ignite(i).getOrCreateCache(defaultCacheConfiguration(), new NearCacheConfiguration<>());
                 else
-                    ignite(i).cache(DEFAULT_CACHE_NAME);
+                    ignite(i).getOrCreateCache(DEFAULT_CACHE_NAME);
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index e938498..0e31a45 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1213,6 +1213,13 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * @param cfg Config.
+     */
+    protected Ignite startGrid(IgniteConfiguration cfg) throws Exception {
+        return startGrid(cfg.getIgniteInstanceName(), cfg);
+    }
+
+    /**
      * Loads configuration from the given Spring XML file.
      *
      * @param springCfgPath Path to file.

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 2ae9297..42d9534 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -32,6 +32,8 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
 import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
 import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
+import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesMultipleConnectionsTest;
+import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalancePairedConnectionsTest;
 import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
@@ -305,6 +307,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCommunicationBalancePairedConnectionsTest.class);
         suite.addTestSuite(IgniteCommunicationBalanceMultipleConnectionsTest.class);
         suite.addTestSuite(IgniteIoTestMessagesTest.class);
+        suite.addTestSuite(IgniteDiagnosticMessagesTest.class);
+        suite.addTestSuite(IgniteDiagnosticMessagesMultipleConnectionsTest.class);
 
         suite.addTestSuite(IgniteIncompleteCacheObjectSelfTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/51823847/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index b84daf1..e7ce50e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -324,12 +324,14 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override public void destroy() {
         try {
             if (cctx.affinityNode()) {
-                for (int i = 0; i < segments.length; i++) {
-                    H2Tree tree = segments[i];
+                if (!cctx.kernalContext().cache().context().database().persistenceEnabled()) {
+                    for (int i = 0; i < segments.length; i++) {
+                        H2Tree tree = segments[i];
 
-                    tree.destroy();
+                        tree.destroy();
 
-                    dropMetaPage(tree.getName(), i);
+                        dropMetaPage(tree.getName(), i);
+                    }
                 }
             }
         }


Mime
View raw message