ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/50] [abbrv] ignite git commit: ignite-3220 Use pair of connections (in/out) instead of single connection, possibility to configure mutiple connection pairs.
Date Mon, 10 Oct 2016 12:13:17 GMT
ignite-3220 Use pair of connections (in/out) instead of single connection, possibility to configure
mutiple connection pairs.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6023539d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6023539d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6023539d

Branch: refs/heads/ignite-gg-8-io2-selNow
Commit: 6023539d5f078dff8753e44c5ac151ecf9a9c69c
Parents: ffbc218
Author: sboikov <sboikov@gridgain.com>
Authored: Thu Sep 29 12:07:02 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Thu Sep 29 12:07:02 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../apache/ignite/internal/IgniteKernal.java    |  10 +
 .../client/router/impl/GridTcpRouterImpl.java   |   1 +
 .../managers/communication/GridIoManager.java   | 104 ++
 .../communication/GridIoMessageFactory.java     |  13 +-
 .../communication/IgniteIoTestMessage.java      | 216 +++++
 .../internal/processors/odbc/OdbcProcessor.java |   1 +
 .../rest/protocols/tcp/GridTcpRestProtocol.java |   1 +
 .../nio/GridAbstractCommunicationClient.java    |  12 +-
 .../util/nio/GridCommunicationClient.java       |   5 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  31 +-
 .../ignite/internal/util/nio/GridNioServer.java |  99 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  24 +-
 .../util/nio/GridShmemCommunicationClient.java  |   7 +-
 .../util/nio/GridTcpNioCommunicationClient.java |   8 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 968 +++++++++++++------
 .../ignite/stream/socket/SocketStreamer.java    |   1 +
 .../IgniteSlowClientDetectionSelfTest.java      |   1 +
 .../IgniteCacheMessageRecoveryAbstractTest.java |  24 +-
 .../IgniteCacheMessageWriteTimeoutTest.java     |  17 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |  18 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |  28 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |  57 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   5 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |  12 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   1 +
 ...tionSpiRecoveryFailureDetectionSelfTest.java |   1 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |  30 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |   1 +
 .../HadoopExternalCommunication.java            |   1 +
 .../yardstick/cache/IgniteIoTestBenchmark.java  |  73 ++
 31 files changed, 1395 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 638bc45..cbd7dd0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -97,7 +97,10 @@ public enum GridTopic {
     TOPIC_TX,
 
     /** */
-    TOPIC_BACKUP;
+    TOPIC_BACKUP,
+
+    /** */
+    TOPIC_IO_TEST;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c349667..87c0074 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3431,6 +3431,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
         }
     }
 
+    /**
+     * @param node Node.
+     * @param payload Message payload.
+     * @param processFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread)
{
+        return ctx.io().sendIoTest(node, payload, processFromNioThread);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
index 06a4929..3566830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
@@ -258,6 +258,7 @@ public class GridTcpRouterImpl implements GridTcpRouter, GridTcpRouterMBean,
Lif
                 .logger(log)
                 .selectorCount(Runtime.getRuntime().availableProcessors())
                 .gridName(gridName)
+                .serverName("router")
                 .tcpNoDelay(tcpNoDelay)
                 .directBuffer(false)
                 .byteOrder(ByteOrder.nativeOrder())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3fdda30..d6a2835 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -57,6 +61,8 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -87,6 +93,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
+import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
@@ -207,6 +214,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Stopping flag. */
     private boolean stopping;
 
+    /** */
+    private final AtomicReference<ConcurrentHashMap<Long, GridFutureAdapter>>
ioTestMap = new AtomicReference<>();
+
+    /** */
+    private final AtomicLong ioTestId = new AtomicLong();
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -348,6 +361,87 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             log.debug(startInfo());
 
         registerIoPoolExtensions();
+
+        addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null)
+                    return;
+
+                IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
+
+                if (msg0.request()) {
+                    IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
+
+                    res.flags(msg0.flags());
+
+                    try {
+                        send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send IO test response [msg=" + msg0 + "]",
e);
+                    }
+                }
+                else {
+                    GridFutureAdapter fut = ioTestMap().remove(msg0.id());
+
+                    if (fut == null) {
+                        U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
+
+                        return;
+                    }
+
+                    fut.onDone();
+                }
+            }
+        });
+    }
+
+    /**
+     * @param node Node.
+     * @param payload Payload.
+     * @param processFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread)
{
+        if (ctx.localNodeId().equals(node.id()))
+            throw new IllegalArgumentException();
+
+        long id = ioTestId.getAndIncrement();
+
+        GridFutureAdapter fut = new GridFutureAdapter();
+
+        ioTestMap().put(id, fut);
+
+        try {
+            IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+            msg.processFromNioThread(processFromNioThread);
+
+            send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            ioTestMap().remove(id);
+
+            return new GridFinishedFuture(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @return IO test futures map.
+     */
+    private ConcurrentHashMap<Long, GridFutureAdapter> ioTestMap() {
+        ConcurrentHashMap<Long, GridFutureAdapter> map = ioTestMap.get();
+
+        if (map == null) {
+            if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+                map = ioTestMap.get();
+        }
+
+        return map;
     }
 
     /**
@@ -836,6 +930,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         };
 
+        if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
+            IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
+
+            if (msg0.processFromNioThread()) {
+                c.run();
+
+                return;
+            }
+        }
+
         try {
             pool(plc).execute(c);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ff8b201..f5c46a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -756,7 +756,18 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124] - this
+            case 125:
+                msg = new TcpCommunicationSpi.HandshakeMessage2();
+
+                break;
+
+            // [-3..119] [124-125] - this
+            case 126:
+                msg = new IgniteIoTestMessage();
+
+                break;
+
+            // [-3..119] [124-126] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
new file mode 100644
index 0000000..08bd110
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public class IgniteIoTestMessage implements Message {
+    /** */
+    private static byte FLAG_PROCESS_FROM_NIO = 1;
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long id;
+
+    /** */
+    private byte flags;
+
+    /** */
+    private boolean req;
+
+    /** */
+    private byte payload[];
+
+    /**
+     *
+     */
+    public IgniteIoTestMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param id
+     * @param req
+     * @param payload
+     */
+    public IgniteIoTestMessage(long id, boolean req, byte[] payload) {
+        this.id = id;
+        this.req = req;
+        this.payload = payload;
+    }
+
+    public boolean processFromNioThread() {
+        return isFlag(FLAG_PROCESS_FROM_NIO);
+    }
+
+    public void processFromNioThread(boolean processFromNioThread) {
+        setFlag(processFromNioThread, FLAG_PROCESS_FROM_NIO);
+    }
+
+    public void flags(byte flags) {
+        this.flags = flags;
+    }
+
+    public byte flags() {
+        return flags;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    public boolean request() {
+        return req;
+    }
+
+    public long id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("id", id))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByteArray("payload", payload))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeBoolean("req", req))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                id = reader.readLong("id");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                payload = reader.readByteArray("payload");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                req = reader.readBoolean("req");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(IgniteIoTestMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 126;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteIoTestMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
index ead8901..2faab3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
@@ -110,6 +110,7 @@ public class OdbcProcessor extends GridProcessorAdapter {
                             .logger(log)
                             .selectorCount(DFLT_SELECTOR_CNT)
                             .gridName(ctx.gridName())
+                            .serverName("odbc")
                             .tcpNoDelay(DFLT_TCP_NODELAY)
                             .directBuffer(DFLT_TCP_DIRECT_BUF)
                             .byteOrder(ByteOrder.nativeOrder())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index 6338fcc..2a002a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -257,6 +257,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
                 .logger(log)
                 .selectorCount(cfg.getSelectorCount())
                 .gridName(ctx.gridName())
+                .serverName("tcp-rest")
                 .tcpNoDelay(cfg.isNoDelay())
                 .directBuffer(cfg.isDirectBuffer())
                 .byteOrder(ByteOrder.nativeOrder())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 9b014ec..f2ab932 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -35,14 +35,24 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati
     /** Metrics listener. */
     protected final GridNioMetricsListener metricsLsnr;
 
+    /** */
+    private final int connIdx;
+
     /**
+     * @param connIdx Connection index.
      * @param metricsLsnr Metrics listener.
      */
-    protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr)
{
+    protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener
metricsLsnr) {
+        this.connIdx = connIdx;
         this.metricsLsnr = metricsLsnr;
     }
 
     /** {@inheritDoc} */
+    @Override public int connectionIndex() {
+        return connIdx;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean close() {
         return reserves.compareAndSet(0, -1);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 0de54e9..312a20e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -107,4 +107,9 @@ public interface GridCommunicationClient {
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();
+
+    /**
+     * @return Connection index.
+     */
+    public int connectionIndex();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index bf8e26a..4598eef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,7 +31,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Recovery information for single node.
  */
-@Deprecated // To be splitted into in/out parts when do need maintain backward compatibility.
+@Deprecated // To be splitted into separate classes for in/out data when do not need maintain
backward compatibility.
 public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;
@@ -285,7 +285,7 @@ public class GridNioRecoveryDescriptor {
     /**
      *
      */
-    public void connected() {
+    public void onConnected() {
         synchronized (this) {
             assert reserved : this;
             assert !connected : this;
@@ -307,6 +307,33 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
+     * @return Connected flag.
+     */
+    public boolean connected() {
+        synchronized (this) {
+            return connected;
+        }
+    }
+
+    /**
+     * @return Reserved flag.
+     */
+    public boolean reserved() {
+        synchronized (this) {
+            return reserved;
+        }
+    }
+
+    /**
+     * @return Current handshake index.
+     */
+    public Long handshakeIndex() {
+        synchronized (this) {
+            return handshakeReq != null ? handshakeReq.get1() : null;
+        }
+    }
+
+    /**
      *
      */
     public void release() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 2ea88fc..ce1f59c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -45,7 +45,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -106,10 +105,6 @@ public class GridNioServer<T> {
     /** SSL write buf limit. */
     private static final int WRITE_BUF_LIMIT = GridNioSessionMetaKey.nextUniqueKey();
 
-    // TODO
-    private static final int WRITE_BUF_SIZE = IgniteSystemProperties.getInteger("IGNITE_WRITE_BUF_SIZE",
65536);
-    private static final int READ_BUF_SIZE = IgniteSystemProperties.getInteger("IGNITE_READ_BUF_SIZE",
65536);
-
     /** */
     private static final boolean DISABLE_KEYSET_OPTIMIZATION =
         IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS);
@@ -151,13 +146,13 @@ public class GridNioServer<T> {
     /** Flag indicating if this server should use direct buffers. */
     private final boolean directBuf;
 
-    /** Index to select which thread will serve next socket channel. Using round-robin balancing.
*/
+    /** Index to select which thread will serve next incoming socket channel. Using round-robin
balancing. */
     @GridToStringExclude
-    private final AtomicInteger readBalanceIdx = new AtomicInteger();
+    private int readBalanceIdx;
 
-    // TODO
+    /** Index to select which thread will serve next out socket channel. Using round-robin
balancing. */
     @GridToStringExclude
-    private final AtomicInteger writeBalanceIdx = new AtomicInteger(1);
+    private int writeBalanceIdx = 1;
 
     /** Tcp no delay flag. */
     private final boolean tcpNoDelay;
@@ -220,6 +215,7 @@ public class GridNioServer<T> {
      * @param log Log.
      * @param selectorCnt Count of selectors and selecting threads.
      * @param gridName Grid name.
+     * @param srvName Logical server name for threads identification.
      * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
      * @param directBuf Direct buffer flag.
      * @param order Byte order.
@@ -242,6 +238,7 @@ public class GridNioServer<T> {
         IgniteLogger log,
         int selectorCnt,
         @Nullable String gridName,
+        @Nullable String srvName,
         boolean tcpNoDelay,
         boolean directBuf,
         ByteOrder order,
@@ -309,9 +306,16 @@ public class GridNioServer<T> {
         clientThreads = new IgniteThread[selectorCnt];
 
         for (int i = 0; i < selectorCnt; i++) {
+            String threadName;
+
+            if (srvName == null)
+                threadName = "grid-nio-worker-" + i;
+            else
+                threadName = "grid-nio-worker-" + srvName + "-" + i;
+
             AbstractNioClientWorker worker = directMode ?
-                new DirectNioClientWorker(i, gridName, "grid-nio-worker-" + i, log) :
-                new ByteBufferNioClientWorker(i, gridName, "grid-nio-worker-" + i, log);
+                new DirectNioClientWorker(i, gridName, threadName, log) :
+                new ByteBufferNioClientWorker(i, gridName, threadName, log);
 
             clientWorkers.add(worker);
 
@@ -460,9 +464,8 @@ public class GridNioServer<T> {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();
         }
-        else if (msgCnt == 1)
-            // Change from 0 to 1 means that worker thread should be waken up.
-            clientWorkers.get(ses.selectorIndex()).offer(fut);
+        else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true))
+                clientWorkers.get(ses.selectorIndex()).offer(fut);
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -692,12 +695,35 @@ public class GridNioServer<T> {
      * @param req Request to balance.
      */
     private synchronized void offerBalanced(NioOperationFuture req) {
-        assert req.operation() == NioOperation.REGISTER;
-        assert req.socketChannel() != null;
+        assert req.operation() == NioOperation.REGISTER : req;
+        assert req.socketChannel() != null : req;
+
+        int workers = clientWorkers.size();
+
+        int balanceIdx;
+
+        if (workers > 1) {
+            if (req.accepted()) {
+                balanceIdx = readBalanceIdx;
 
-        int balanceIdx = req.accepted() ? readBalanceIdx.getAndAdd(2) : writeBalanceIdx.getAndAdd(2);
+                readBalanceIdx += 2;
+
+                if (readBalanceIdx >= workers)
+                    readBalanceIdx = 0;
+            }
+            else {
+                balanceIdx = writeBalanceIdx;
+
+                writeBalanceIdx += 2;
+
+                if (writeBalanceIdx >= workers)
+                    writeBalanceIdx = 1;
+            }
+        }
+        else
+            balanceIdx = 0;
 
-        clientWorkers.get(balanceIdx & (clientWorkers.size() - 1)).offer(req);
+        clientWorkers.get(balanceIdx).offer(req);
     }
 
     /** {@inheritDoc} */
@@ -1181,7 +1207,16 @@ public class GridNioServer<T> {
                 req = (NioOperationFuture<?>)ses.pollFuture();
 
                 if (req == null && buf.position() == 0) {
-                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                    if (ses.procWrite.get()) {
+                        boolean set = ses.procWrite.compareAndSet(true, false);
+
+                        assert set;
+
+                        if (ses.writeQueue().isEmpty())
+                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
+                        else
+                            ses.procWrite.set(true);
+                    }
 
                     return;
                 }
@@ -1477,7 +1512,8 @@ public class GridNioServer<T> {
                                     MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
 
                                     sb.append("    Connection info [")
-                                        .append("rmtAddr=").append(ses.remoteAddress())
+                                        .append("in=").append(ses.accepted())
+                                        .append(", rmtAddr=").append(ses.remoteAddress())
                                         .append(", locAddr=").append(ses.localAddress());
 
                                     GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
@@ -1494,6 +1530,7 @@ public class GridNioServer<T> {
 
                                     if (inDesc != null) {
                                         sb.append(", msgsRcvd=").append(inDesc.received())
+                                            .append(", lastAcked=").append(inDesc.lastAcknowledged())
                                             .append(", descIdHash=").append(System.identityHashCode(inDesc));
                                     }
                                     else
@@ -1729,10 +1766,10 @@ public class GridNioServer<T> {
                 ByteBuffer readBuf = null;
 
                 if (directMode) {
-                    writeBuf = directBuf ? ByteBuffer.allocateDirect(WRITE_BUF_SIZE) :
-                        ByteBuffer.allocate(WRITE_BUF_SIZE);
-                    readBuf = directBuf ? ByteBuffer.allocateDirect(READ_BUF_SIZE) :
-                        ByteBuffer.allocate(READ_BUF_SIZE);
+                    writeBuf = directBuf ? ByteBuffer.allocateDirect(sock.getSendBufferSize())
:
+                        ByteBuffer.allocate(sock.getSendBufferSize());
+                    readBuf = directBuf ? ByteBuffer.allocateDirect(sock.getReceiveBufferSize())
:
+                        ByteBuffer.allocate(sock.getReceiveBufferSize());
 
                     writeBuf.order(order);
                     readBuf.order(order);
@@ -2459,6 +2496,9 @@ public class GridNioServer<T> {
         /** Message queue size listener. */
         private IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr;
 
+        /** Name for threads identification. */
+        private String srvName;
+
         /**
          * Finishes building the instance.
          *
@@ -2472,6 +2512,7 @@ public class GridNioServer<T> {
                 log,
                 selectorCnt,
                 gridName,
+                srvName,
                 tcpNoDelay,
                 directBuf,
                 byteOrder,
@@ -2549,6 +2590,16 @@ public class GridNioServer<T> {
         }
 
         /**
+         * @param srvName Logical server name for threads identification.
+         * @return This for chaining.
+         */
+        public Builder<T> serverName(@Nullable String srvName) {
+            this.srvName = srvName;
+
+            return this;
+        }
+
+        /**
          * @param tcpNoDelay If TCP_NODELAY option should be set to accepted sockets.
          * @return This for chaining.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 456b1a3..6b00281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.util.Collection;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -46,9 +46,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Worker index for server */
     private final int selectorIdx;
 
-    /** Size counter. */
-    private final AtomicInteger queueSize = new AtomicInteger();
-
     /** Semaphore. */
     @GridToStringExclude
     private final Semaphore sem;
@@ -68,6 +65,9 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    final AtomicBoolean procWrite = new AtomicBoolean();
+
     /**
      * Creates session instance.
      *
@@ -106,7 +106,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         this.selectorIdx = selectorIdx;
 
-        sem = null;//sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null;
+        sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null;
 
         if (writeBuf != null) {
             writeBuf.clear();
@@ -173,7 +173,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert res : "Future was not added to queue";
 
-        return queueSize.incrementAndGet();
+        return queue.sizex();
     }
 
     /**
@@ -198,7 +198,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert res : "Future was not added to queue";
 
-        return queueSize.incrementAndGet();
+        return queue.sizex();
     }
 
     /**
@@ -210,10 +210,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         boolean add = queue.addAll(futs);
 
         assert add;
-
-        boolean set = queueSize.compareAndSet(0, futs.size());
-
-        assert set;
     }
 
     /**
@@ -223,8 +219,6 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         GridNioFuture<?> last = queue.poll();
 
         if (last != null) {
-            queueSize.decrementAndGet();
-
             if (sem != null && !last.messageThread())
                 sem.release();
 
@@ -264,7 +258,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @return Number of write requests.
      */
     int writeQueueSize() {
-        return queueSize.get();
+        return queue.sizex();
     }
 
     /**
@@ -303,7 +297,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
             outRecovery = (GridNioRecoveryDescriptor)val;
 
-            outRecovery.connected();
+            outRecovery.onConnected();
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index ebe86fb..d941bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     private final MessageFormatter formatter;
 
     /**
+     * @param connIdx Connection index.
      * @param metricsLsnr Metrics listener.
      * @param port Shared memory IPC server port.
      * @param connTimeout Connection timeout.
@@ -55,14 +56,16 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
      * @param formatter Message formatter.
      * @throws IgniteCheckedException If failed.
      */
-    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr,
+    public GridShmemCommunicationClient(
+        int connIdx,
+        GridNioMetricsListener metricsLsnr,
         int port,
         long connTimeout,
         IgniteLogger log,
         MessageFormatter formatter)
         throws IgniteCheckedException
     {
-        super(metricsLsnr);
+        super(connIdx, metricsLsnr);
 
         assert metricsLsnr != null;
         assert port > 0 && port < 0xffff;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6023539d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 5fe521d..fcb40c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -45,11 +45,15 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     private final IgniteLogger log;
 
     /**
+     * @param connIdx Connection index.
      * @param ses Session.
      * @param log Logger.
      */
-    public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) {
-        super(null);
+    public GridTcpNioCommunicationClient(
+        int connIdx,
+        GridNioSession ses,
+        IgniteLogger log) {
+        super(connIdx, null);
 
         assert ses != null;
         assert log != null;


Mime
View raw message