tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From florianhockm...@apache.org
Subject [tinkerpop] 01/01: TINKERPOP-2288 Replace closed connections directly
Date Thu, 02 Apr 2020 12:42:12 GMT
This is an automated email from the ASF dual-hosted git repository.

florianhockmann pushed a commit to branch TINKERPOP-2288
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 5b490d310d089a82ccb8bf2e66d770299d224773
Author: Florian Hockmann <fh@florian-hockmann.de>
AuthorDate: Tue Mar 24 13:01:59 2020 +0100

    TINKERPOP-2288 Replace closed connections directly
    
    Closed connections are now replaced automatically in the background.
    If no open connection is available to answer a request, then the pool
    tries it again after some time. It uses a retry policy with exponential
    backoff for that, implemented with Polly.
    This change also ensures that only one task performs a pool resizing
    operation at a time.
    
    These changes should ensure that:
    - A connection is still returned quickly if one is available.
    - Closed connections are replaced immediately, without needing to wait
        for the next incoming request.
    - If the server is only unavailable temporarily (or it just closed
    open connections for some reason), then the user should should not get
    an exception.
    He only has to wait until the connections are replaced.
    
    TODO:
    - Make the retry policy configurable.
    - Document changes.
---
 gremlin-dotnet/glv/Gremlin.Net.csproj.template     |   7 +-
 .../src/Gremlin.Net/Driver/ConnectionFactory.cs    |   4 +-
 .../src/Gremlin.Net/Driver/ConnectionPool.cs       | 118 +++++++++----
 .../src/Gremlin.Net/Driver/GremlinClient.cs        |   2 +-
 .../src/Gremlin.Net/Driver/IConnection.cs          |   4 +
 .../{IConnection.cs => IConnectionFactory.cs}      |   9 +-
 .../src/Gremlin.Net/Driver/ProxyConnection.cs      |  26 ++-
 gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj  |   7 +-
 .../src/Gremlin.Net/Properties/AssemblyInfo.cs     |   3 +-
 .../Driver/ConnectionPoolTests.cs                  | 193 +++++++++++++++++++++
 10 files changed, 317 insertions(+), 56 deletions(-)

diff --git a/gremlin-dotnet/glv/Gremlin.Net.csproj.template b/gremlin-dotnet/glv/Gremlin.Net.csproj.template
index e4fafc1..aeb2df9 100644
--- a/gremlin-dotnet/glv/Gremlin.Net.csproj.template
+++ b/gremlin-dotnet/glv/Gremlin.Net.csproj.template
@@ -64,6 +64,7 @@ NOTE that versions suffixed with "-rc" are considered release candidates
(i.e. p
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
     <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
     <PackageReference Include="Microsoft.CSharp" Version="4.3.0" />
+    <PackageReference Include="Polly" Version="7.2.0" />
   </ItemGroup>
 
   <ItemGroup Condition="'\$(TargetFramework)' == 'netstandard1.3'">
@@ -73,9 +74,9 @@ NOTE that versions suffixed with "-rc" are considered release candidates
(i.e. p
   </ItemGroup>
 
   <ItemGroup>
-    <None Include="../../LICENSE" Pack="true" PackagePath=""/>
-    <None Include="../../NOTICE" Pack="true" PackagePath=""/>
-    <None Include="../../../docs/static/images/gremlin-dotnet-logo_256x256.png" Pack="true"
PackagePath="\"/>
+    <None Include="../../LICENSE" Pack="true" PackagePath="" />
+    <None Include="../../NOTICE" Pack="true" PackagePath="" />
+    <None Include="../../../docs/static/images/gremlin-dotnet-logo_256x256.png" Pack="true"
PackagePath="" />
   </ItemGroup>
 
 </Project>
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs
index d207a88..9ef32a5 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionFactory.cs
@@ -27,7 +27,7 @@ using Gremlin.Net.Structure.IO.GraphSON;
 
 namespace Gremlin.Net.Driver
 {
-    internal class ConnectionFactory
+    internal class ConnectionFactory : IConnectionFactory
     {
         private readonly GraphSONReader _graphSONReader;
         private readonly GraphSONWriter _graphSONWriter;
@@ -48,7 +48,7 @@ namespace Gremlin.Net.Driver
             _webSocketConfiguration = webSocketConfiguration;
         }
 
-        public Connection CreateConnection()
+        public IConnection CreateConnection()
         {
             return new Connection(_gremlinServer.Uri, _gremlinServer.Username, _gremlinServer.Password,
_graphSONReader,
                                  _graphSONWriter, _mimeType, _webSocketConfiguration, _sessionId);
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
index 34bc77f..50138f7 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ConnectionPool.cs
@@ -22,11 +22,13 @@
 #endregion
 
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Threading;
 using System.Threading.Tasks;
 using Gremlin.Net.Driver.Exceptions;
 using Gremlin.Net.Process;
+using Polly;
 
 namespace Gremlin.Net.Driver
 {
@@ -34,8 +36,11 @@ namespace Gremlin.Net.Driver
     {
         private const int ConnectionIndexOverflowLimit = int.MaxValue - 1000000;
         
-        private readonly ConnectionFactory _connectionFactory;
-        private readonly CopyOnWriteCollection<Connection> _connections = new CopyOnWriteCollection<Connection>();
+        private readonly IConnectionFactory _connectionFactory;
+        private readonly CopyOnWriteCollection<IConnection> _connections = new CopyOnWriteCollection<IConnection>();
+
+        private readonly ConcurrentDictionary<IConnection, byte> _deadConnections =
+            new ConcurrentDictionary<IConnection, byte>();
         private readonly int _poolSize;
         private readonly int _maxInProcessPerConnection;
         private int _connectionIndex;
@@ -43,53 +48,84 @@ namespace Gremlin.Net.Driver
         private const int PoolIdle = 0;
         private const int PoolPopulationInProgress = 1;
 
-        public ConnectionPool(ConnectionFactory connectionFactory, ConnectionPoolSettings
settings)
+        public ConnectionPool(IConnectionFactory connectionFactory, ConnectionPoolSettings
settings)
         {
             _connectionFactory = connectionFactory;
             _poolSize = settings.PoolSize;
             _maxInProcessPerConnection = settings.MaxInProcessPerConnection;
-            PopulatePoolAsync().WaitUnwrap();
+            ReplaceDeadConnectionsAsync().WaitUnwrap();
         }
         
         public int NrConnections => _connections.Count;
 
-        public async Task<IConnection> GetAvailableConnectionAsync()
+        public IConnection GetAvailableConnection()
         {
-            await EnsurePoolIsPopulatedAsync().ConfigureAwait(false);
-            return ProxiedConnection(GetConnectionFromPool());
+            var connection = Policy.Handle<ServerUnavailableException>()
+                .WaitAndRetry(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))
+                .Execute(GetConnectionFromPool);
+
+            return ProxiedConnection(connection);
         }
 
-        private async Task EnsurePoolIsPopulatedAsync()
+        /// <summary>
+        ///     Replaces dead connections.
+        /// </summary>
+        /// <returns>True if the pool was repaired, false if repairing was not necessary.</returns>
+        private async Task<bool> EnsurePoolIsHealthyAsync()
         {
-            // The pool could have been (partially) empty because of connection problems.
So, we need to populate it again.
-            if (_poolSize <= NrConnections) return;
+            if (_deadConnections.IsEmpty) return false;
             var poolState = Interlocked.CompareExchange(ref _poolState, PoolPopulationInProgress,
PoolIdle);
-            if (poolState == PoolPopulationInProgress) return;
+            if (poolState == PoolPopulationInProgress) return false;
             try
             {
-                await PopulatePoolAsync().ConfigureAwait(false);
+                await ReplaceDeadConnectionsAsync().ConfigureAwait(false);
             }
             finally
             {
                 // We need to remove the PoolPopulationInProgress flag again even if an exception
occurred, so we don't block the pool population for ever
                 Interlocked.CompareExchange(ref _poolState, PoolIdle, PoolPopulationInProgress);
             }
+
+            return true;
+        }
+        
+        private async Task ReplaceDeadConnectionsAsync()
+        {
+            RemoveDeadConnections();
+
+            await FillPoolAsync().ConfigureAwait(false);
+        }
+
+        private void RemoveDeadConnections()
+        {
+            if (_deadConnections.IsEmpty) return;
+            
+            foreach (var deadConnection in _deadConnections.Keys)
+            {
+                if (_connections.TryRemove(deadConnection))
+                {
+                    DefinitelyDestroyConnection(deadConnection);
+                }
+            }
+
+            _deadConnections.Clear();
         }
         
-        private async Task PopulatePoolAsync()
+        private async Task FillPoolAsync()
         {
             var nrConnectionsToCreate = _poolSize - _connections.Count;
-            var connectionCreationTasks = new List<Task<Connection>>(nrConnectionsToCreate);
+            var connectionCreationTasks = new List<Task<IConnection>>(nrConnectionsToCreate);
             try
             {
                 for (var i = 0; i < nrConnectionsToCreate; i++)
                 {
                     connectionCreationTasks.Add(CreateNewConnectionAsync());
                 }
+
                 var createdConnections = await Task.WhenAll(connectionCreationTasks).ConfigureAwait(false);
                 _connections.AddRange(createdConnections);
             }
-            catch(Exception)
+            catch (Exception)
             {
                 // Dispose created connections if the connection establishment failed
                 foreach (var creationTask in connectionCreationTasks)
@@ -97,42 +133,45 @@ namespace Gremlin.Net.Driver
                     if (!creationTask.IsFaulted)
                         creationTask.Result?.Dispose();
                 }
+
                 throw;
             }
         }
-        
-        private async Task<Connection> CreateNewConnectionAsync()
+
+        private async Task<IConnection> CreateNewConnectionAsync()
         {
             var newConnection = _connectionFactory.CreateConnection();
             await newConnection.ConnectAsync().ConfigureAwait(false);
             return newConnection;
         }
 
-        private Connection GetConnectionFromPool()
+        private IConnection GetConnectionFromPool()
         {
             var connections = _connections.Snapshot;
             if (connections.Length == 0) throw new ServerUnavailableException();
             return TryGetAvailableConnection(connections);
         }
-
-        private Connection TryGetAvailableConnection(Connection[] connections)
+        
+        private IConnection TryGetAvailableConnection(IConnection[] connections)
         {
             var index = Interlocked.Increment(ref _connectionIndex);
             ProtectIndexFromOverflowing(index);
 
+            var closedConnections = 0;
             for (var i = 0; i < connections.Length; i++)
             {
                 var connection = connections[(index + i) % connections.Length];
                 if (connection.NrRequestsInFlight >= _maxInProcessPerConnection) continue;
                 if (!connection.IsOpen)
                 {
-                    RemoveConnectionFromPool(connection);
+                    ReplaceConnection(connection);
+                    closedConnections++;
                     continue;
                 }
                 return connection;
             }
 
-            if (connections.Length > 0) 
+            if (connections.Length > closedConnections) 
             {
                 throw new ConnectionPoolBusyException(_poolSize, _maxInProcessPerConnection);
             }
@@ -148,26 +187,39 @@ namespace Gremlin.Net.Driver
                 Interlocked.Exchange(ref _connectionIndex, 0);
         }
 
-        private void RemoveConnectionFromPool(Connection connection)
+        private void ReplaceConnection(IConnection connection)
         {
-            if (_connections.TryRemove(connection))
-                DefinitelyDestroyConnection(connection);
+            RemoveConnectionFromPool(connection);
+            TriggerReplacementOfDeadConnections();
         }
         
-        private IConnection ProxiedConnection(Connection connection)
+        private void RemoveConnectionFromPool(IConnection connection)
         {
-            return new ProxyConnection(connection, ReturnConnectionIfOpen);
+            _deadConnections.TryAdd(connection, 0);
         }
 
-        private void ReturnConnectionIfOpen(Connection connection)
+        private void TriggerReplacementOfDeadConnections()
         {
-            if (connection.IsOpen) return;
-            ConsiderUnavailable();
+            ReplaceClosedConnectionsAsync().Forget();
         }
 
-        private void ConsiderUnavailable()
+        private async Task ReplaceClosedConnectionsAsync()
         {
-            CloseAndRemoveAllConnectionsAsync().WaitUnwrap();
+            var poolWasPopulated = await EnsurePoolIsHealthyAsync().ConfigureAwait(false);
+            // Another connection could have been removed already, check if another population
is necessary
+            if (poolWasPopulated)
+                await ReplaceClosedConnectionsAsync().ConfigureAwait(false);
+        }
+
+        private IConnection ProxiedConnection(IConnection connection)
+        {
+            return new ProxyConnection(connection, ReplaceConnectionIfItWasClosed);
+        }
+
+        private void ReplaceConnectionIfItWasClosed(IConnection connection)
+        {
+            if (connection.IsOpen) return;
+            ReplaceConnection(connection);
         }
 
         private async Task CloseAndRemoveAllConnectionsAsync()
@@ -179,7 +231,7 @@ namespace Gremlin.Net.Driver
             }
         }
 
-        private void DefinitelyDestroyConnection(Connection connection)
+        private void DefinitelyDestroyConnection(IConnection connection)
         {
             connection.Dispose();
         }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs
index 262b489..bf637bf 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/GremlinClient.cs
@@ -96,7 +96,7 @@ namespace Gremlin.Net.Driver
         /// <inheritdoc />
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
         {
-            using (var connection = await _connectionPool.GetAvailableConnectionAsync().ConfigureAwait(false))
+            using (var connection = _connectionPool.GetAvailableConnection())
             {
                 return await connection.SubmitAsync<T>(requestMessage).ConfigureAwait(false);
             }
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
index b5ef52c..7d29571 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
@@ -30,6 +30,10 @@ namespace Gremlin.Net.Driver
 {
     internal interface IConnection : IDisposable
     {
+        Task ConnectAsync();
         Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
+        int NrRequestsInFlight { get; }
+        bool IsOpen { get; }
+        Task CloseAsync();
     }
 }
\ No newline at end of file
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs
similarity index 78%
copy from gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
copy to gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs
index b5ef52c..0c7ace2 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/IConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/IConnectionFactory.cs
@@ -21,15 +21,10 @@
 
 #endregion
 
-using System;
-using System.Collections.Generic;
-using System.Threading.Tasks;
-using Gremlin.Net.Driver.Messages;
-
 namespace Gremlin.Net.Driver
 {
-    internal interface IConnection : IDisposable
+    internal interface IConnectionFactory
     {
-        Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage);
+        IConnection CreateConnection();
     }
 }
\ No newline at end of file
diff --git a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
index fef6ede..421d310 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Driver/ProxyConnection.cs
@@ -30,23 +30,37 @@ namespace Gremlin.Net.Driver
 {
     internal sealed class ProxyConnection : IConnection
     {
-        private readonly Connection _realConnection;
-        private readonly Action<Connection> _releaseAction;
+        public IConnection ProxiedConnection { get; set; }
+        private readonly Action<IConnection> _releaseAction;
 
-        public ProxyConnection(Connection realConnection, Action<Connection> releaseAction)
+        public ProxyConnection(IConnection proxiedConnection, Action<IConnection> releaseAction)
         {
-            _realConnection = realConnection;
+            ProxiedConnection = proxiedConnection;
             _releaseAction = releaseAction;
         }
 
+        public async Task ConnectAsync()
+        {
+            await ProxiedConnection.ConnectAsync().ConfigureAwait(false);
+        }
+
         public async Task<ResultSet<T>> SubmitAsync<T>(RequestMessage requestMessage)
         {
-            return await _realConnection.SubmitAsync<T>(requestMessage).ConfigureAwait(false);
+            return await ProxiedConnection.SubmitAsync<T>(requestMessage).ConfigureAwait(false);
+        }
+
+        public int NrRequestsInFlight => ProxiedConnection.NrRequestsInFlight;
+
+        public bool IsOpen => ProxiedConnection.IsOpen;
+
+        public async Task CloseAsync()
+        {
+            await ProxiedConnection.CloseAsync().ConfigureAwait(false);
         }
 
         public void Dispose()
         {
-            _releaseAction(_realConnection);
+            _releaseAction(ProxiedConnection);
         }
     }
 }
\ No newline at end of file
diff --git a/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj b/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj
index 3392ca8..ff22147 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj
+++ b/gremlin-dotnet/src/Gremlin.Net/Gremlin.Net.csproj
@@ -64,6 +64,7 @@ NOTE that versions suffixed with "-rc" are considered release candidates
(i.e. p
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All"/>
     <PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
     <PackageReference Include="Microsoft.CSharp" Version="4.3.0" />
+    <PackageReference Include="Polly" Version="7.2.0" />
   </ItemGroup>
 
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.3'">
@@ -73,9 +74,9 @@ NOTE that versions suffixed with "-rc" are considered release candidates
(i.e. p
   </ItemGroup>
 
   <ItemGroup>
-    <None Include="../../LICENSE" Pack="true" PackagePath=""/>
-    <None Include="../../NOTICE" Pack="true" PackagePath=""/>
-    <None Include="../../../docs/static/images/gremlin-dotnet-logo_256x256.png" Pack="true"
PackagePath="\"/>
+    <None Include="../../LICENSE" Pack="true" PackagePath="" />
+    <None Include="../../NOTICE" Pack="true" PackagePath="" />
+    <None Include="../../../docs/static/images/gremlin-dotnet-logo_256x256.png" Pack="true"
PackagePath="" />
   </ItemGroup>
 
 </Project>
diff --git a/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs b/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs
index 3f90e5d..4351b0e 100644
--- a/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs
+++ b/gremlin-dotnet/src/Gremlin.Net/Properties/AssemblyInfo.cs
@@ -23,4 +23,5 @@
 
 using System.Runtime.CompilerServices;
 
-[assembly: InternalsVisibleTo("Gremlin.Net.UnitTest, PublicKey=00240000048000009400000006020000002400005253413100040000010001009bbf7a5b9966d9207d8abb9d3d3e98f5e387b292742cfb791dc657357221c3ac9b38ab6dab89630dc8edb3cde84a107f493d192116a934afa463355eefd58b82fd08dc2616ee6074a74bf5845652864746e285bd04e2e1a87921e8e2c383d1b302e7bee1fd7cdab5fe2bbed8c6677624d63433548d43a873ab5650ed96fb0687")]
\ No newline at end of file
+[assembly: InternalsVisibleTo("Gremlin.Net.UnitTest, PublicKey=00240000048000009400000006020000002400005253413100040000010001009bbf7a5b9966d9207d8abb9d3d3e98f5e387b292742cfb791dc657357221c3ac9b38ab6dab89630dc8edb3cde84a107f493d192116a934afa463355eefd58b82fd08dc2616ee6074a74bf5845652864746e285bd04e2e1a87921e8e2c383d1b302e7bee1fd7cdab5fe2bbed8c6677624d63433548d43a873ab5650ed96fb0687")]
+[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")]
\ No newline at end of file
diff --git a/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
new file mode 100644
index 0000000..2d33d23
--- /dev/null
+++ b/gremlin-dotnet/test/Gremlin.Net.UnitTest/Driver/ConnectionPoolTests.cs
@@ -0,0 +1,193 @@
+#region License
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Gremlin.Net.Driver;
+using Gremlin.Net.Driver.Exceptions;
+using Moq;
+using Xunit;
+
+namespace Gremlin.Net.UnitTest.Driver
+{
+    public class ConnectionPoolTests
+    {
+        [Theory]
+        [InlineData(1)]
+        [InlineData(2)]
+        [InlineData(10)]
+        public void ShouldEstablishConfiguredNrConnections(int poolSize)
+        {
+            var mockedConnectionFactory = new Mock<IConnectionFactory>();
+            var mockedConnection = new Mock<IConnection>();
+            mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(mockedConnection.Object);
+            var pool = CreateConnectionPool(mockedConnectionFactory.Object, poolSize);
+            
+            Assert.Equal(poolSize, pool.NrConnections);
+            mockedConnectionFactory.Verify(m => m.CreateConnection(), Times.Exactly(poolSize));
+            mockedConnection.Verify(m => m.ConnectAsync(), Times.Exactly(poolSize));
+        }
+
+        [Fact]
+        public void GetAvailableConnectionShouldReturnFirstOpenConnection()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            var openConnectionToReturn = OpenConnection;
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection()).Returns(ClosedConnection)
+                .Returns(ClosedConnection).Returns(openConnectionToReturn);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3);
+
+            var returnedConnection = pool.GetAvailableConnection();
+
+            Assert.Equal(openConnectionToReturn, ((ProxyConnection) returnedConnection).ProxiedConnection);
+        }
+        
+        [Fact]
+        public void GetAvailableConnectionShouldThrowIfAllConnectionsAreClosed()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object);
+
+            Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection());
+        }
+        
+        [Fact]
+        public void GetAvailableConnectionShouldReplaceClosedConnections()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.SetupSequence(m => m.CreateConnection()).Returns(ClosedConnection)
+                .Returns(ClosedConnection).Returns(OpenConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 3);
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection);
+            var nrCreatedConnections = pool.NrConnections;
+            
+            pool.GetAvailableConnection();
+            pool.GetAvailableConnection();
+            pool.GetAvailableConnection();
+
+            AssertNrOpenConnections(pool, nrCreatedConnections);
+        }
+
+        private static void AssertNrOpenConnections(ConnectionPool connectionPool, int expectedNrConnections)
+        {
+            for (var i = 0; i < expectedNrConnections; i++)
+            {
+                var connection = connectionPool.GetAvailableConnection();
+                Assert.True(connection.IsOpen);
+            }
+            Assert.Equal(expectedNrConnections, connectionPool.NrConnections);
+        }
+        
+        [Fact]
+        public async Task ShouldNotCreateMoreConnectionsThanConfiguredForParallelRequests()
+        {
+            var mockedConnectionFactory = new Mock<IConnectionFactory>();
+            mockedConnectionFactory.SetupSequence(m => m.CreateConnection()).Returns(ClosedConnection)
+                .Returns(ClosedConnection).Returns(OpenConnection);
+            var pool = CreateConnectionPool(mockedConnectionFactory.Object, 3);
+            mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection);
+            var nrCreatedConnections = pool.NrConnections;
+            var getConnectionTasks = new List<Task<IConnection>>();
+
+            for (var i = 0; i < 100; i++)
+            {
+                getConnectionTasks.Add(Task.Run(() => pool.GetAvailableConnection()));
+            }
+            await Task.WhenAll(getConnectionTasks);
+
+            await Task.Delay(1000);
+            Assert.Equal(nrCreatedConnections, pool.NrConnections);
+        }
+
+        [Fact]
+        public async Task ShouldReplaceConnectionClosedDuringSubmit()
+        {
+            var mockedConnectionFactory = new Mock<IConnectionFactory>();
+            var fakedConnection = new Mock<IConnection>();
+            fakedConnection.Setup(f => f.IsOpen).Returns(true);
+            mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(fakedConnection.Object);
+            var pool = CreateConnectionPool(mockedConnectionFactory.Object, 1);
+            var returnedConnection = pool.GetAvailableConnection();
+            fakedConnection.Setup(f => f.IsOpen).Returns(false);
+            mockedConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection);
+
+            await returnedConnection.SubmitAsync<bool>(null);
+            returnedConnection.Dispose();
+
+            Assert.Equal(1, pool.NrConnections);
+            Assert.True(pool.GetAvailableConnection().IsOpen);
+        }
+
+        [Fact]
+        public void ShouldWaitForHostToBecomeAvailable()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(OpenConnection);
+            var nrCreatedConnections = pool.NrConnections;
+            
+            var connection = pool.GetAvailableConnection();
+
+            AssertNrOpenConnections(pool, nrCreatedConnections);
+            Assert.True(connection.IsOpen);
+        }
+
+        [Fact]
+        public void ShouldThrowAfterWaitingTooLongForUnavailableServer()
+        {
+            var fakeConnectionFactory = new Mock<IConnectionFactory>();
+            fakeConnectionFactory.Setup(m => m.CreateConnection()).Returns(ClosedConnection);
+            var pool = CreateConnectionPool(fakeConnectionFactory.Object, 1);
+            
+            Assert.Throws<ServerUnavailableException>(() => pool.GetAvailableConnection());
+        }
+
+        private static IConnection OpenConnection
+        {
+            get
+            {
+                var fakedConnection = new Mock<IConnection>();
+                fakedConnection.Setup(f => f.IsOpen).Returns(true);
+                return fakedConnection.Object;
+            }
+        }
+        
+        private static IConnection ClosedConnection
+        {
+            get
+            {
+                var fakedConnection = new Mock<IConnection>();
+                fakedConnection.Setup(f => f.IsOpen).Returns(false);
+                return fakedConnection.Object;
+            }
+        }
+
+        private static ConnectionPool CreateConnectionPool(IConnectionFactory connectionFactory,
int poolSize = 2)
+        {
+            return new ConnectionPool(connectionFactory, new ConnectionPoolSettings {PoolSize
= poolSize});
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message