ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [3/5] ignite git commit: IGNITE-3108: .NET: Added communication SPI stubs. This closes #702.
Date Thu, 12 May 2016 04:20:34 GMT
IGNITE-3108: .NET: Added communication SPI stubs. This closes #702.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/003fe5e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/003fe5e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/003fe5e3

Branch: refs/heads/master
Commit: 003fe5e35aaf85d8ed5696f36eff5618f916e166
Parents: b50eb65
Author: Pavel Tupitsyn <ptupitsyn@gridgain.com>
Authored: Wed May 11 18:26:08 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed May 11 18:26:08 2016 +0300

----------------------------------------------------------------------
 .../utils/PlatformConfigurationUtils.java       |  54 ++++
 .../IgniteConfigurationSerializerTest.cs        |  27 ++
 .../IgniteConfigurationTest.cs                  |  44 ++-
 .../Apache.Ignite.Core.csproj                   |   2 +
 .../Communication/ICommunicationSpi.cs          |  37 +++
 .../Communication/Tcp/TcpCommunicationSpi.cs    | 283 +++++++++++++++++++
 .../Apache.Ignite.Core/IgniteConfiguration.cs   |  28 ++
 .../IgniteConfigurationSection.xsd              |  22 ++
 8 files changed, 496 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 5ee19c1..30e45ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -42,6 +42,9 @@ import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration;
 import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration;
 import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative;
 import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration;
+import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean;
 import org.apache.ignite.spi.discovery.DiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -351,6 +354,30 @@ public class PlatformConfigurationUtils {
         readDiscoveryConfiguration(in, cfg);
 
         if (in.readBoolean()) {
+            TcpCommunicationSpi comm = new TcpCommunicationSpi();
+
+            comm.setAckSendThreshold(in.readInt());
+            comm.setConnectTimeout(in.readLong());
+            comm.setDirectBuffer(in.readBoolean());
+            comm.setDirectSendBuffer(in.readBoolean());
+            comm.setIdleConnectionTimeout(in.readLong());
+            comm.setLocalAddress(in.readString());
+            comm.setLocalPort(in.readInt());
+            comm.setLocalPortRange(in.readInt());
+            comm.setMaxConnectTimeout(in.readLong());
+            comm.setMessageQueueLimit(in.readInt());
+            comm.setReconnectCount(in.readInt());
+            comm.setSelectorsCount(in.readInt());
+            comm.setSlowClientQueueLimit(in.readInt());
+            comm.setSocketReceiveBuffer(in.readInt());
+            comm.setSocketSendBuffer(in.readInt());
+            comm.setTcpNoDelay(in.readBoolean());
+            comm.setUnacknowledgedMessagesBufferSize(in.readInt());
+
+            cfg.setCommunicationSpi(comm);
+        }
+
+        if (in.readBoolean()) {
             if (cfg.getBinaryConfiguration() == null)
                 cfg.setBinaryConfiguration(new BinaryConfiguration());
 
@@ -687,6 +714,33 @@ public class PlatformConfigurationUtils {
 
         writeDiscoveryConfiguration(w, cfg.getDiscoverySpi());
 
+        CommunicationSpi comm = cfg.getCommunicationSpi();
+
+        if (comm instanceof TcpCommunicationSpi) {
+            w.writeBoolean(true);
+            TcpCommunicationSpiMBean tcp = (TcpCommunicationSpiMBean) comm;
+
+            w.writeInt(tcp.getAckSendThreshold());
+            w.writeLong(tcp.getConnectTimeout());
+            w.writeBoolean(tcp.isDirectBuffer());
+            w.writeBoolean(tcp.isDirectSendBuffer());
+            w.writeLong(tcp.getIdleConnectionTimeout());
+            w.writeString(tcp.getLocalAddress());
+            w.writeInt(tcp.getLocalPort());
+            w.writeInt(tcp.getLocalPortRange());
+            w.writeLong(tcp.getMaxConnectTimeout());
+            w.writeInt(tcp.getMessageQueueLimit());
+            w.writeInt(tcp.getReconnectCount());
+            w.writeInt(tcp.getSelectorsCount());
+            w.writeInt(tcp.getSlowClientQueueLimit());
+            w.writeInt(tcp.getSocketReceiveBuffer());
+            w.writeInt(tcp.getSocketSendBuffer());
+            w.writeBoolean(tcp.isTcpNoDelay());
+            w.writeInt(tcp.getUnacknowledgedMessagesBufferSize());
+        }
+        else
+            w.writeBoolean(false);
+
         BinaryConfiguration bc = cfg.getBinaryConfiguration();
         w.writeBoolean(bc != null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index 36bda2e..e3507b8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -33,6 +33,7 @@ namespace Apache.Ignite.Core.Tests
     using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Cache.Store;
     using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Communication.Tcp;
     using Apache.Ignite.Core.DataStructures.Configuration;
     using Apache.Ignite.Core.Discovery.Tcp;
     using Apache.Ignite.Core.Discovery.Tcp.Multicast;
@@ -65,6 +66,7 @@ namespace Apache.Ignite.Core.Tests
                             <discoverySpi type='TcpDiscoverySpi' joinTimeout='0:1:0'>
                                 <ipFinder type='TcpDiscoveryMulticastIpFinder' addressRequestAttempts='7'
/>
                             </discoverySpi>
+                            <communicationSpi type='TcpCommunicationSpi' ackSendThreshold='33'
idleConnectionTimeout='0:1:2' />
                             <jvmOptions><string>-Xms1g</string><string>-Xmx4g</string></jvmOptions>
                             <lifecycleBeans>
                                 <iLifecycleBean type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+LifecycleBean,
Apache.Ignite.Core.Tests' foo='15' />
@@ -154,6 +156,11 @@ namespace Apache.Ignite.Core.Tests
             Assert.AreEqual(new TimeSpan(0,1,2), tx.DefaultTimeout);
             Assert.AreEqual(15, tx.PessimisticTransactionLogSize);
             Assert.AreEqual(TimeSpan.FromSeconds(33), tx.PessimisticTransactionLogLinger);
+
+            var comm = cfg.CommunicationSpi as TcpCommunicationSpi;
+            Assert.IsNotNull(comm);
+            Assert.AreEqual(33, comm.AckSendThreshold);
+            Assert.AreEqual(new TimeSpan(0, 1, 2), comm.IdleConnectionTimeout);
         }
 
         /// <summary>
@@ -445,6 +452,26 @@ namespace Apache.Ignite.Core.Tests
                     DefaultTimeout = TimeSpan.FromDays(2),
                     DefaultTransactionConcurrency = TransactionConcurrency.Optimistic,
                     PessimisticTransactionLogLinger = TimeSpan.FromHours(3)
+                },
+                CommunicationSpi = new TcpCommunicationSpi
+                {
+                    LocalPort = 47501,
+                    MaxConnectTimeout = TimeSpan.FromSeconds(34),
+                    MessageQueueLimit = 15,
+                    ConnectTimeout = TimeSpan.FromSeconds(17),
+                    IdleConnectionTimeout = TimeSpan.FromSeconds(19),
+                    SelectorsCount = 8,
+                    ReconnectCount = 33,
+                    SocketReceiveBufferSize = 512,
+                    AckSendThreshold = 99,
+                    DirectBuffer = false,
+                    DirectSendBuffer = true,
+                    LocalPortRange = 45,
+                    LocalAddress = "127.0.0.1",
+                    TcpNoDelay = false,
+                    SlowClientQueueLimit = 98,
+                    SocketSendBufferSize = 2045,
+                    UnacknowledgedMessagesBufferSize = 3450
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index 45b9a05..3e5e877 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -25,6 +25,7 @@ namespace Apache.Ignite.Core.Tests
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Communication.Tcp;
     using Apache.Ignite.Core.DataStructures.Configuration;
     using Apache.Ignite.Core.Discovery.Tcp;
     using Apache.Ignite.Core.Discovery.Tcp.Multicast;
@@ -67,6 +68,7 @@ namespace Apache.Ignite.Core.Tests
             CheckDefaultValueAttributes(new TcpDiscoverySpi());
             CheckDefaultValueAttributes(new CacheConfiguration());
             CheckDefaultValueAttributes(new TcpDiscoveryMulticastIpFinder());
+            CheckDefaultValueAttributes(new TcpCommunicationSpi());
         }
 
         /// <summary>
@@ -127,6 +129,26 @@ namespace Apache.Ignite.Core.Tests
                 Assert.AreEqual(tx.DefaultTransactionIsolation, resTx.DefaultTransactionIsolation);
                 Assert.AreEqual(tx.PessimisticTransactionLogLinger, resTx.PessimisticTransactionLogLinger);
                 Assert.AreEqual(tx.PessimisticTransactionLogSize, resTx.PessimisticTransactionLogSize);
+
+                var com = (TcpCommunicationSpi) cfg.CommunicationSpi;
+                var resCom = (TcpCommunicationSpi) resCfg.CommunicationSpi;
+                Assert.AreEqual(com.AckSendThreshold, resCom.AckSendThreshold);
+                Assert.AreEqual(com.ConnectTimeout, resCom.ConnectTimeout);
+                Assert.AreEqual(com.DirectBuffer, resCom.DirectBuffer);
+                Assert.AreEqual(com.DirectSendBuffer, resCom.DirectSendBuffer);
+                Assert.AreEqual(com.IdleConnectionTimeout, resCom.IdleConnectionTimeout);
+                Assert.AreEqual(com.LocalAddress, resCom.LocalAddress);
+                Assert.AreEqual(com.LocalPort, resCom.LocalPort);
+                Assert.AreEqual(com.LocalPortRange, resCom.LocalPortRange);
+                Assert.AreEqual(com.MaxConnectTimeout, resCom.MaxConnectTimeout);
+                Assert.AreEqual(com.MessageQueueLimit, resCom.MessageQueueLimit);
+                Assert.AreEqual(com.ReconnectCount, resCom.ReconnectCount);
+                Assert.AreEqual(com.SelectorsCount, resCom.SelectorsCount);
+                Assert.AreEqual(com.SlowClientQueueLimit, resCom.SlowClientQueueLimit);
+                Assert.AreEqual(com.SocketReceiveBufferSize, resCom.SocketReceiveBufferSize);
+                Assert.AreEqual(com.SocketSendBufferSize, resCom.SocketSendBufferSize);
+                Assert.AreEqual(com.TcpNoDelay, resCom.TcpNoDelay);
+                Assert.AreEqual(com.UnacknowledgedMessagesBufferSize, resCom.UnacknowledgedMessagesBufferSize);
             }
         }
 
@@ -323,7 +345,7 @@ namespace Apache.Ignite.Core.Tests
         {
             var props = obj.GetType().GetProperties();
 
-            foreach (var prop in props)
+            foreach (var prop in props.Where(p => p.Name != "SelectorsCount"))
             {
                 var attr = prop.GetCustomAttributes(true).OfType<DefaultValueAttribute>().FirstOrDefault();
                 var propValue = prop.GetValue(obj, null);
@@ -385,6 +407,26 @@ namespace Apache.Ignite.Core.Tests
                     DefaultTransactionIsolation = TransactionIsolation.Serializable,
                     PessimisticTransactionLogLinger = TimeSpan.FromHours(1),
                     PessimisticTransactionLogSize = 240
+                },
+                CommunicationSpi = new TcpCommunicationSpi
+                {
+                    LocalPort = 47501,
+                    MaxConnectTimeout = TimeSpan.FromSeconds(34),
+                    MessageQueueLimit = 15,
+                    ConnectTimeout = TimeSpan.FromSeconds(17),
+                    IdleConnectionTimeout = TimeSpan.FromSeconds(19),
+                    SelectorsCount = 8,
+                    ReconnectCount = 33,
+                    SocketReceiveBufferSize = 512,
+                    AckSendThreshold = 99,
+                    DirectBuffer = false,
+                    DirectSendBuffer = true,
+                    LocalPortRange = 45,
+                    LocalAddress = "127.0.0.1",
+                    TcpNoDelay = false,
+                    SlowClientQueueLimit = 98,
+                    SocketSendBufferSize = 2045,
+                    UnacknowledgedMessagesBufferSize = 3450
                 }
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 409a7cf..8943030 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -93,6 +93,8 @@
     <Compile Include="Cache\CachePartialUpdateException.cs" />
     <Compile Include="Cache\CachePeekMode.cs" />
     <Compile Include="Cache\Configuration\NearCacheConfiguration.cs" />
+    <Compile Include="Communication\ICommunicationSpi.cs" />
+    <Compile Include="Communication\Tcp\TcpCommunicationSpi.cs" />
     <Compile Include="DataStructures\Configuration\AtomicConfiguration.cs" />
     <Compile Include="Cache\Configuration\QueryAlias.cs" />
     <Compile Include="Cache\Configuration\QueryTextFieldAttribute.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs
new file mode 100644
index 0000000..bd3f51d
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/ICommunicationSpi.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Communication
+{
+    using System.Diagnostics.CodeAnalysis;
+    using Apache.Ignite.Core.Communication.Tcp;
+
+    /// <summary>
+    /// Communication SPI is responsible for data exchange between nodes. 
+    /// <para />
+    /// Communication SPI is one of the most important SPIs in Ignite. It is used
+    /// heavily throughout the system and provides means for all data exchanges
+    /// between nodes, such as internal implementation details and user driven messages.
+    /// <para />
+    /// Only predefined implementation is supported now: <see cref="TcpCommunicationSpi"/>.
+    /// </summary>
+    [SuppressMessage("Microsoft.Design", "CA1040:AvoidEmptyInterfaces")]
+    public interface ICommunicationSpi
+    {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs
new file mode 100644
index 0000000..afd3b57
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Communication/Tcp/TcpCommunicationSpi.cs
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Communication.Tcp
+{
+    using System;
+    using System.ComponentModel;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Impl.Binary;
+
+    /// <summary>
+    /// <see cref="TcpCommunicationSpi"/> is default communication SPI which uses
+    /// TCP/IP protocol and Java NIO to communicate with other nodes.
+    /// <para />
+    /// At startup, this SPI tries to start listening to local port specified by
+    /// <see cref="LocalPort"/> property. If local port is occupied, then SPI will
+    /// automatically increment the port number until it can successfully bind for
+    /// listening. <see cref="LocalPortRange"/> configuration parameter controls
+    /// maximum number of ports that SPI will try before it fails. Port range comes
+    /// very handy when starting multiple grid nodes on the same machine or even
+    /// in the same VM. In this case all nodes can be brought up without a single
+    /// change in configuration.
+    /// </summary>
+    public class TcpCommunicationSpi : ICommunicationSpi
+    {
+        /// <summary> Default value of <see cref="AckSendThreshold"/> property. </summary>
+        public const int DefaultAckSendThreshold = 16;
+
+        /// <summary> Default value of <see cref="ConnectTimeout"/> property. </summary>
+        public static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(5);
+
+        /// <summary> Default value of <see cref="DirectBuffer"/> property. </summary>
+        public const bool DefaultDirectBuffer = true;
+
+        /// <summary> Default value of <see cref="DirectSendBuffer"/> property. </summary>
+        public const bool DefaultDirectSendBuffer = false;
+
+        /// <summary> Default value of <see cref="IdleConnectionTimeout"/> property. </summary>
+        public static readonly TimeSpan DefaultIdleConnectionTimeout = TimeSpan.FromSeconds(30);
+
+        /// <summary> Default value of <see cref="LocalPort"/> property. </summary>
+        public const int DefaultLocalPort = 47100;
+
+        /// <summary> Default value of <see cref="LocalPortRange"/> property. </summary>
+        public const int DefaultLocalPortRange = 100;
+
+        /// <summary> Default value of <see cref="MaxConnectTimeout"/> property. </summary>
+        public static readonly TimeSpan DefaultMaxConnectTimeout = TimeSpan.FromMinutes(10);
+
+        /// <summary> Default value of <see cref="MessageQueueLimit"/> property. </summary>
+        public const int DefaultMessageQueueLimit = 1024;
+
+        /// <summary> Default value of <see cref="ReconnectCount"/> property. </summary>
+        public const int DefaultReconnectCount = 10;
+
+        /// <summary> Default value of <see cref="SelectorsCount"/> property. </summary>
+        public static readonly int DefaultSelectorsCount = Math.Min(4, Environment.ProcessorCount);
+
+        /// <summary> Default socket buffer size. </summary>
+        public const int DefaultSocketBufferSize = 32 * 1024;
+
+        /// <summary> Default value of <see cref="TcpNoDelay"/> property. </summary>
+        public const bool DefaultTcpNoDelay = true;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="TcpCommunicationSpi"/> class.
+        /// </summary>
+        public TcpCommunicationSpi()
+        {
+            AckSendThreshold = DefaultAckSendThreshold;
+            ConnectTimeout = DefaultConnectTimeout;
+            DirectBuffer = DefaultDirectBuffer;
+            DirectSendBuffer = DefaultDirectSendBuffer;
+            IdleConnectionTimeout = DefaultIdleConnectionTimeout;
+            LocalPort = DefaultLocalPort;
+            LocalPortRange = DefaultLocalPortRange;
+            MaxConnectTimeout = DefaultMaxConnectTimeout;
+            MessageQueueLimit = DefaultMessageQueueLimit;
+            ReconnectCount = DefaultReconnectCount;
+            SelectorsCount = DefaultSelectorsCount;
+            SocketReceiveBufferSize = DefaultSocketBufferSize;
+            SocketSendBufferSize = DefaultSocketBufferSize;
+            TcpNoDelay = DefaultTcpNoDelay;
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="TcpCommunicationSpi"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        internal TcpCommunicationSpi(BinaryReader reader)
+        {
+            AckSendThreshold = reader.ReadInt();
+            ConnectTimeout = reader.ReadLongAsTimespan();
+            DirectBuffer = reader.ReadBoolean();
+            DirectSendBuffer = reader.ReadBoolean();
+            IdleConnectionTimeout = reader.ReadLongAsTimespan();
+            LocalAddress = reader.ReadString();
+            LocalPort = reader.ReadInt();
+            LocalPortRange = reader.ReadInt();
+            MaxConnectTimeout = reader.ReadLongAsTimespan();
+            MessageQueueLimit = reader.ReadInt();
+            ReconnectCount = reader.ReadInt();
+            SelectorsCount = reader.ReadInt();
+            SlowClientQueueLimit = reader.ReadInt();
+            SocketReceiveBufferSize = reader.ReadInt();
+            SocketSendBufferSize = reader.ReadInt();
+            TcpNoDelay = reader.ReadBoolean();
+            UnacknowledgedMessagesBufferSize = reader.ReadInt();
+        }
+
+        /// <summary>
+        /// Gets or sets the number of received messages per connection to node 
+        /// after which acknowledgment message is sent.
+        /// </summary>
+        [DefaultValue(DefaultAckSendThreshold)]
+        public int AckSendThreshold { get; set; }
+
+        /// <summary>
+        /// Gets or sets the connect timeout used when establishing connection with remote nodes.
+        /// </summary>
+        [DefaultValue(typeof(TimeSpan), "00:00:05")]
+        public TimeSpan ConnectTimeout { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether to allocate direct (ByteBuffer.allocateDirect)
+        /// or heap (ByteBuffer.allocate) buffer.
+        /// </summary>
+        [DefaultValue(DefaultDirectBuffer)]
+        public bool DirectBuffer { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether to allocate direct (ByteBuffer.allocateDirect)
+        /// or heap (ByteBuffer.allocate) send buffer.
+        /// </summary>
+        [DefaultValue(DefaultDirectSendBuffer)]
+        public bool DirectSendBuffer { get; set; }
+
+        /// <summary>
+        /// Gets or sets the maximum idle connection timeout upon which a connection to client will be closed.
+        /// </summary>
+        [DefaultValue(typeof(TimeSpan), "00:00:30")]
+        public TimeSpan IdleConnectionTimeout { get; set; }
+
+        /// <summary>
+        /// Gets or sets the local host address for socket binding. Note that one node could have
+        /// additional addresses beside the loopback one. This configuration parameter is optional.
+        /// </summary>
+        public string LocalAddress { get; set; }
+
+        /// <summary>
+        /// Gets or sets the local port for socket binding.
+        /// </summary>
+        [DefaultValue(DefaultLocalPort)]
+        public int LocalPort { get; set; }
+
+        /// <summary>
+        /// Gets or sets local port range for local host ports (value must be greater than or equal to <c>0</c>).
+        /// If provided local port <see cref="LocalPort"/> is occupied,
+        /// implementation will try to increment the port number for as long as it is less than
+        /// initial value plus this range.
+        /// <para />
+        /// If port range value is <c>0</c>, then implementation will try to bind only to the port provided by
+        /// <see cref="LocalPort"/> property and fail if binding to this port did not succeed.
+        /// </summary>
+        [DefaultValue(DefaultLocalPortRange)]
+        public int LocalPortRange { get; set; }
+
+        /// <summary>
+        /// Gets or sets maximum connect timeout. If handshake is not established within connect timeout,
+        /// then SPI tries to repeat handshake procedure with increased connect timeout.
+        /// Connect timeout can grow till maximum timeout value,
+        /// if maximum timeout value is reached then the handshake is considered as failed.
+        /// <para />
+        /// <c>0</c> is interpreted as infinite timeout.
+        /// </summary>
+        [DefaultValue(typeof(TimeSpan), "00:10:00")]
+        public TimeSpan MaxConnectTimeout { get; set; }
+
+        /// <summary>
+        /// Gets or sets the message queue limit for incoming and outgoing messages.
+        /// <para />
+        /// When set to positive number send queue is limited to the configured value. 
+        /// <c>0</c> disables the limitation.
+        /// </summary>
+        [DefaultValue(DefaultMessageQueueLimit)]
+        public int MessageQueueLimit { get; set; }
+
+        /// <summary>
+        /// Gets or sets the maximum number of reconnect attempts used when establishing connection with remote nodes.
+        /// </summary>
+        [DefaultValue(DefaultReconnectCount)]
+        public int ReconnectCount { get; set; }
+
+        /// <summary>
+        /// Gets or sets the count of selectors to be used in TCP server.
+        /// <para />
+        /// Default value is <see cref="DefaultSelectorsCount"/>, which is calculated as
+        /// <c>Math.Min(4, Environment.ProcessorCount)</c>
+        /// </summary>
+        public int SelectorsCount { get; set; }
+
+        /// <summary>
+        /// Gets or sets slow client queue limit.
+        /// <para/>
+        /// When set to a positive number, communication SPI will monitor clients outbound message queue sizes
+        /// and will drop those clients whose queue exceeded this limit.
+        /// <para/>
+        /// Usually this value should be set to the same value as <see cref="MessageQueueLimit"/> which controls
+        /// message back-pressure for server nodes. The default value for this parameter is <c>0</c>
+        /// which means unlimited.
+        /// </summary>
+        public int SlowClientQueueLimit { get; set; }
+
+        /// <summary>
+        /// Gets or sets the size of the socket receive buffer.
+        /// </summary>
+        [DefaultValue(DefaultSocketBufferSize)]
+        public int SocketReceiveBufferSize { get; set; }
+        
+        /// <summary>
+        /// Gets or sets the size of the socket send buffer.
+        /// </summary>
+        [DefaultValue(DefaultSocketBufferSize)]
+        public int SocketSendBufferSize { get; set; }
+
+        /// <summary>
+        /// Gets or sets the value for <c>TCP_NODELAY</c> socket option. Each
+        /// socket will be opened using provided value.
+        /// <para />
+        /// Setting this option to <c>true</c> disables Nagle's algorithm
+        /// for socket decreasing latency and delivery time for small messages.
+        /// <para />
+        /// For systems that work under heavy network load it is advisable to set this value to <c>false</c>.
+        /// </summary>
+        [DefaultValue(DefaultTcpNoDelay)]
+        public bool TcpNoDelay { get; set; }
+
+        /// <summary>
+        /// Gets or sets the maximum number of stored unacknowledged messages per connection to node.
+        /// If number of unacknowledged messages exceeds this number 
+        /// then connection to node is closed and reconnect is attempted.
+        /// </summary>
+        public int UnacknowledgedMessagesBufferSize { get; set; }
+
+        /// <summary>
+        /// Writes this instance to the specified writer.
+        /// </summary>
+        internal void Write(IBinaryRawWriter writer)
+        {
+            writer.WriteInt(AckSendThreshold);
+            writer.WriteLong((long) ConnectTimeout.TotalMilliseconds);
+            writer.WriteBoolean(DirectBuffer);
+            writer.WriteBoolean(DirectSendBuffer);
+            writer.WriteLong((long) IdleConnectionTimeout.TotalMilliseconds);
+            writer.WriteString(LocalAddress);
+            writer.WriteInt(LocalPort);
+            writer.WriteInt(LocalPortRange);
+            writer.WriteLong((long) MaxConnectTimeout.TotalMilliseconds);
+            writer.WriteInt(MessageQueueLimit);
+            writer.WriteInt(ReconnectCount);
+            writer.WriteInt(SelectorsCount);
+            writer.WriteInt(SlowClientQueueLimit);
+            writer.WriteInt(SocketReceiveBufferSize);
+            writer.WriteInt(SocketSendBufferSize);
+            writer.WriteBoolean(TcpNoDelay);
+            writer.WriteInt(UnacknowledgedMessagesBufferSize);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
index e14c15b..62cad19 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
@@ -29,6 +29,8 @@
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Communication;
+    using Apache.Ignite.Core.Communication.Tcp;
     using Apache.Ignite.Core.DataStructures.Configuration;
     using Apache.Ignite.Core.Discovery;
     using Apache.Ignite.Core.Discovery.Tcp;
@@ -220,6 +222,23 @@
             else
                 writer.WriteBoolean(false);
 
+            // Communication config
+            var comm = CommunicationSpi;
+
+            if (comm != null)
+            {
+                writer.WriteBoolean(true);
+
+                var tcpComm = comm as TcpCommunicationSpi;
+
+                if (tcpComm == null)
+                    throw new InvalidOperationException("Unsupported communication SPI: " + comm.GetType());
+
+                tcpComm.Write(writer);
+            }
+            else
+                writer.WriteBoolean(false);
+
             // Binary config
             var isCompactFooterSet = BinaryConfiguration != null && BinaryConfiguration.CompactFooterInternal != null;
 
@@ -302,6 +321,9 @@
             // Discovery config
             DiscoverySpi = r.ReadBoolean() ? new TcpDiscoverySpi(r) : null;
 
+            // Communication config
+            CommunicationSpi = r.ReadBoolean() ? new TcpCommunicationSpi(r) : null;
+
             // Binary config
             if (r.ReadBoolean())
             {
@@ -478,6 +500,12 @@
         public IDiscoverySpi DiscoverySpi { get; set; }
 
         /// <summary>
+        /// Gets or sets the communication service provider.
+        /// Null for default communication.
+        /// </summary>
+        public ICommunicationSpi CommunicationSpi { get; set; }
+
+        /// <summary>
         /// Gets or sets a value indicating whether node should start in client mode.
         /// Client node cannot hold data in the caches.
         /// </summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/003fe5e3/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index a0df870..29074e7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -262,6 +262,28 @@
                         <xs:attribute name="type" type="xs:string" use="required" />
                     </xs:complexType>
                 </xs:element>
+                <xs:element name="communicationSpi" minOccurs="0">
+                    <xs:complexType>
+                        <xs:attribute name="ackSendThreshold" type="xs:int" />
+                        <xs:attribute name="connectTimeout" type="xs:string" />
+                        <xs:attribute name="directBuffer" type="xs:boolean" />
+                        <xs:attribute name="directSendBuffer" type="xs:boolean" />
+                        <xs:attribute name="idleConnectionTimeout" type="xs:string" />
+                        <xs:attribute name="localAddress" type="xs:string" />
+                        <xs:attribute name="localPort" type="xs:int" />
+                        <xs:attribute name="localPortRange" type="xs:int" />
+                        <xs:attribute name="maxConnectTimeout" type="xs:string" />
+                        <xs:attribute name="messageQueueLimit" type="xs:int" />
+                        <xs:attribute name="reconnectCount" type="xs:int" />
+                        <xs:attribute name="selectorsCount" type="xs:int" />
+                        <xs:attribute name="slowClientQueueLimit" type="xs:int" />
+                        <xs:attribute name="socketReceiveBufferSize" type="xs:int" />
+                        <xs:attribute name="socketSendBufferSize" type="xs:int" />
+                        <xs:attribute name="tcpNoDelay" type="xs:boolean" />
+                        <xs:attribute name="unacknowledgedMessagesBufferSize" type="xs:int" />
+                        <xs:attribute name="type" type="xs:string" use="required" />
+                    </xs:complexType>
+                </xs:element>
                 <xs:element name="includedEventTypes" minOccurs="0">
                     <xs:complexType>
                         <xs:sequence>


Mime
View raw message