qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1736560 - in /qpid/java/branches/6.0.x: ./ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/ test-profiles/
Date Fri, 25 Mar 2016 11:13:08 GMT
Author: kwall
Date: Fri Mar 25 11:13:08 2016
New Revision: 1736560

URL: http://svn.apache.org/viewvc?rev=1736560&view=rev
Log:
QPID-7033: [Java Broker] Busy IO thread pools may cause client connections to be unfairly
closed

Should have been part of previous commit.

Merged with command:

svn merge -c 1732452,1732461,1732812    ^/qpid/java/trunk


Added:
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
      - copied unchanged from r1732452, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
    qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/java/branches/6.0.x/test-profiles/CPPExcludes   (contents, props changed)

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 11:13:08 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732525,1734452
+/qpid/java/trunk
 657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java Fri Mar 25 11:13:08 2016
@@ -105,8 +105,6 @@ public abstract class AbstractAMQPConnec
     private volatile AccessControlContext _accessControllerContext;
     private volatile Thread _ioThread;
 
-    private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
-
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -168,8 +166,9 @@ public abstract class AbstractAMQPConnec
     {
         super.onOpen();
         long maxAuthDelay = _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
-        _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
-        _aggregateTicker.addTicker(_slowConnectionOpenTicker);
+        SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+        _network.addSchedulingDelayNotificationListeners(slowConnectionOpenTicker);
+        _aggregateTicker.addTicker(slowConnectionOpenTicker);
         _lastReadTime = _lastWriteTime = getCreatedTime();
 
     }
@@ -676,21 +675,10 @@ public abstract class AbstractAMQPConnec
 
     protected abstract EventLogger getEventLogger();
 
-    @Override
-    public void processingStarted(final long currentTime)
-    {
-        SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
-        long scheduledTime = _network.getScheduledTime();
-        if (ticker != null && scheduledTime > 0)
-        {
-            ticker.addSchedulingDelay(currentTime - scheduledTime);
-        }
-    }
-
-    private class SlowConnectionOpenTicker implements Ticker
+    private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
     {
         private final long _allowedTime;
-        private volatile long _accumulatedDelay;
+        private volatile long _accumulatedSchedulingDelay;
 
         public SlowConnectionOpenTicker(long timeoutTime)
         {
@@ -700,7 +688,8 @@ public abstract class AbstractAMQPConnec
         @Override
         public int getTimeToNextTick(final long currentTime)
         {
-            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedDelay - currentTime);
+            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedSchedulingDelay
+                                              - currentTime);
             return timeToNextTick;
         }
 
@@ -720,15 +709,19 @@ public abstract class AbstractAMQPConnec
                 else
                 {
                     _aggregateTicker.removeTicker(this);
-                    _slowConnectionOpenTicker = null;
+                    _network.removeSchedulingDelayNotificationListeners(this);
                 }
             }
             return nextTick;
         }
 
-        public void addSchedulingDelay(final long delay)
+        @Override
+        public void notifySchedulingDelay(final long schedulingDelay)
         {
-            _accumulatedDelay += delay;
+            if (schedulingDelay > 0)
+            {
+                _accumulatedSchedulingDelay += schedulingDelay;
+            }
         }
     }
 }

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Fri Mar 25 11:13:08 2016
@@ -148,12 +148,6 @@ public class MultiVersionProtocolEngine
         _delegate.setIOThread(ioThread);
     }
 
-    @Override
-    public void processingStarted(final long currentTime)
-    {
-        _delegate.processingStarted(currentTime);
-    }
-
     public long getConnectionId()
     {
         return _id;
@@ -300,12 +294,6 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
-        public void processingStarted(final long currentTime)
-        {
-
-        }
-
-        @Override
         public void closed()
         {
 
@@ -550,12 +538,6 @@ public class MultiVersionProtocolEngine
         {
 
         }
-
-        @Override
-        public void processingStarted(final long currentTime)
-        {
-
-        }
 
         @Override
         public Subject getSubject()

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Fri Mar 25 11:13:08 2016
@@ -101,6 +101,7 @@ public class NetworkConnectionScheduler
     void processConnection(final NonBlockingConnection connection)
     {
         Thread.currentThread().setName(connection.getThreadName());
+        connection.doPreWork();
         boolean rerun;
         do
         {

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java Fri Mar 25 11:13:08 2016
@@ -27,8 +27,10 @@ import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -73,6 +75,7 @@ public class NonBlockingConnection imple
     private Iterator<Runnable> _pendingIterator;
     private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
     private final AtomicLong _maxReadIdleMillis = new AtomicLong();
+    private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ProtocolEngine protocolEngine,
@@ -234,6 +237,22 @@ public class NonBlockingConnection imple
         return _protocolEngine.hasWork();
     }
 
+    public void doPreWork()
+    {
+        if (!_closed.get())
+        {
+            long currentTime = System.currentTimeMillis();
+            long schedulingDelay = currentTime - getScheduledTime();
+            if (!_schedulingDelayNotificationListeners.isEmpty())
+            {
+                for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
+                {
+                    listener.notifySchedulingDelay(schedulingDelay);
+                }
+            }
+        }
+    }
+
     public boolean doWork()
     {
         _protocolEngine.clearWork();
@@ -242,13 +261,11 @@ public class NonBlockingConnection imple
             try
             {
                 long currentTime = System.currentTimeMillis();
-                _protocolEngine.processingStarted(currentTime);
                 int tick = getTicker().getTimeToNextTick(currentTime);
                 if (tick <= 0)
                 {
                     getTicker().tick(currentTime);
                 }
-                _scheduledTime = 0;
 
                 _protocolEngine.setIOThread(Thread.currentThread());
                 _protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -312,6 +329,18 @@ public class NonBlockingConnection imple
 
     }
 
+    @Override
+    public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+    {
+        _schedulingDelayNotificationListeners.add(listener);
+    }
+
+    @Override
+    public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+    {
+        _schedulingDelayNotificationListeners.remove(listener);
+    }
+
     private boolean processPending() throws IOException
     {
         if(_pendingIterator == null)
@@ -605,6 +634,7 @@ public class NonBlockingConnection imple
     public void clearScheduled()
     {
         _scheduled.set(false);
+        _scheduledTime = 0;
     }
 
     @Override

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java Fri Mar 25 11:13:08 2016
@@ -75,6 +75,4 @@ public interface ProtocolEngine extends
     void received(QpidByteBuffer msg);
 
     void setIOThread(Thread ioThread);
-
-    void processingStarted(long currentTime);
 }

Modified: qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java (original)
+++ qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java Fri Mar 25 11:13:08 2016
@@ -28,4 +28,8 @@ public interface ServerNetworkConnection
     String getTransportInfo();
 
     long getScheduledTime();
+
+    void addSchedulingDelayNotificationListeners(SchedulingDelayNotificationListener listener);
+
+    void removeSchedulingDelayNotificationListeners(SchedulingDelayNotificationListener listener);
 }

Modified: qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java (original)
+++ qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java Fri Mar 25 11:13:08 2016
@@ -61,6 +61,7 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.transport.AcceptingTransport;
 import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.SchedulingDelayNotificationListener;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -431,6 +432,16 @@ class WebSocketProvider implements Accep
         }
 
         @Override
+        public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+        {
+        }
+
+        @Override
+        public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+        {
+        }
+
+        @Override
         public void reserveOutboundMessageSpace(final long size)
         {
             if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)

Modified: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/test-profiles/CPPExcludes?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/test-profiles/CPPExcludes (original)
+++ qpid/java/branches/6.0.x/test-profiles/CPPExcludes Fri Mar 25 11:13:08 2016
@@ -205,6 +205,7 @@ org.apache.qpid.transport.MaxFrameSizeTe
 
 // CPP Broker does not timeout connections with no activity like the Java Broker
 org.apache.qpid.transport.ProtocolNegotiationTest#testNoProtocolHeaderSent_BrokerClosesConnection
+org.apache.qpid.transport.ProtocolNegotiationTest#testNoConnectionOpenSent_BrokerClosesConnection
 
 // QPID-6000 : Tests Java specific message compression functionality, and uses the REST API to test it
 org.apache.qpid.systest.MessageCompressionTest#*

Propchange: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 11:13:08 2016
@@ -6,4 +6,4 @@
 /qpid/branches/java-broker-vhost-refactor/java/test-profiles/CPPExcludes:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
 /qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes:1061302-1072333
-/qpid/java/trunk/test-profiles/CPPExcludes:1715446
+/qpid/java/trunk/test-profiles/CPPExcludes:1715446,1732812



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message