qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1732452 - /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/
Date Fri, 26 Feb 2016 10:55:34 GMT
Author: kwall
Date: Fri Feb 26 10:55:34 2016
New Revision: 1732452

URL: http://svn.apache.org/viewvc?rev=1732452&view=rev
Log:
QPID-7033: [Java Broker] Refactored to use a listener rather than special-casing SlowConnectionOpenTicker,
and avoided zeroing scheduledTime during the doWork algorithm.

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
      - copied, changed from r1732333, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1732452&r1=1732451&r2=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Fri Feb 26 10:55:34 2016
@@ -106,11 +106,9 @@ public abstract class AbstractAMQPConnec
     private volatile Thread _ioThread;
 
     private volatile boolean _messageAuthorizationRequired;
-    private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
 
     private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
 
-
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -173,8 +171,9 @@ public abstract class AbstractAMQPConnec
     {
         super.onOpen();
         long maxAuthDelay = _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
-        _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
-        _aggregateTicker.addTicker(_slowConnectionOpenTicker);
+        SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+        _network.addSchedulingDelayNotificationListeners(slowConnectionOpenTicker);
+        _aggregateTicker.addTicker(slowConnectionOpenTicker);
         _lastReadTime = _lastWriteTime = getCreatedTime();
 
     }
@@ -723,21 +722,10 @@ public abstract class AbstractAMQPConnec
         return !_messageAuthorizationRequired || getAuthorizedPrincipal().getName().equals(userId == null? "" : userId);
     }
 
-    @Override
-    public void processingStarted(final long currentTime)
-    {
-        SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
-        long scheduledTime = _network.getScheduledTime();
-        if (ticker != null && scheduledTime > 0)
-        {
-            ticker.addSchedulingDelay(currentTime - scheduledTime);
-        }
-    }
-
-    private class SlowConnectionOpenTicker implements Ticker
+    private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
     {
         private final long _allowedTime;
-        private volatile long _accumulatedDelay;
+        private volatile long _accumulatedSchedulingDelay;
 
         public SlowConnectionOpenTicker(long timeoutTime)
         {
@@ -747,7 +735,8 @@ public abstract class AbstractAMQPConnec
         @Override
         public int getTimeToNextTick(final long currentTime)
         {
-            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedDelay - currentTime);
+            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedSchedulingDelay
+                                              - currentTime);
             return timeToNextTick;
         }
 
@@ -767,15 +756,19 @@ public abstract class AbstractAMQPConnec
                 else
                 {
                     _aggregateTicker.removeTicker(this);
-                    _slowConnectionOpenTicker = null;
+                    _network.removeSchedulingDelayNotificationListeners(this);
                 }
             }
             return nextTick;
         }
 
-        public void addSchedulingDelay(final long delay)
+        @Override
+        public void notifySchedulingDelay(final long schedulingDelay)
         {
-            _accumulatedDelay += delay;
+            if (schedulingDelay > 0)
+            {
+                _accumulatedSchedulingDelay += schedulingDelay;
+            }
         }
     }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1732452&r1=1732451&r2=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Fri Feb 26 10:55:34 2016
@@ -148,12 +148,6 @@ public class MultiVersionProtocolEngine
         _delegate.setIOThread(ioThread);
     }
 
-    @Override
-    public void processingStarted(final long currentTime)
-    {
-        _delegate.processingStarted(currentTime);
-    }
-
     public long getConnectionId()
     {
         return _id;
@@ -300,12 +294,6 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
-        public void processingStarted(final long currentTime)
-        {
-
-        }
-
-        @Override
         public void closed()
         {
 
@@ -550,12 +538,6 @@ public class MultiVersionProtocolEngine
         {
 
         }
-
-        @Override
-        public void processingStarted(final long currentTime)
-        {
-
-        }
 
         @Override
         public Subject getSubject()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1732452&r1=1732451&r2=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
Fri Feb 26 10:55:34 2016
@@ -101,6 +101,7 @@ public class NetworkConnectionScheduler
     void processConnection(final NonBlockingConnection connection)
     {
         Thread.currentThread().setName(connection.getThreadName());
+        connection.doPreWork();
         boolean rerun;
         do
         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1732452&r1=1732451&r2=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Fri Feb 26 10:55:34 2016
@@ -27,8 +27,10 @@ import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -73,6 +75,7 @@ public class NonBlockingConnection imple
     private Iterator<Runnable> _pendingIterator;
     private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
     private final AtomicLong _maxReadIdleMillis = new AtomicLong();
+    private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
 
     public NonBlockingConnection(SocketChannel socketChannel,
                                  ProtocolEngine protocolEngine,
@@ -234,6 +237,22 @@ public class NonBlockingConnection imple
         return _protocolEngine.hasWork();
     }
 
+    public void doPreWork()
+    {
+        if (!_closed.get())
+        {
+            long currentTime = System.currentTimeMillis();
+            long schedulingDelay = currentTime - getScheduledTime();
+            if (!_schedulingDelayNotificationListeners.isEmpty())
+            {
+                for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
+                {
+                    listener.notifySchedulingDelay(schedulingDelay);
+                }
+            }
+        }
+    }
+
     public boolean doWork()
     {
         _protocolEngine.clearWork();
@@ -242,13 +261,11 @@ public class NonBlockingConnection imple
             try
             {
                 long currentTime = System.currentTimeMillis();
-                _protocolEngine.processingStarted(currentTime);
                 int tick = getTicker().getTimeToNextTick(currentTime);
                 if (tick <= 0)
                 {
                     getTicker().tick(currentTime);
                 }
-                _scheduledTime = 0;
 
                 _protocolEngine.setIOThread(Thread.currentThread());
                 _protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -312,6 +329,18 @@ public class NonBlockingConnection imple
 
     }
 
+    @Override
+    public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+    {
+        _schedulingDelayNotificationListeners.add(listener);
+    }
+
+    @Override
+    public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+    {
+        _schedulingDelayNotificationListeners.remove(listener);
+    }
+
     private boolean processPending() throws IOException
     {
         if(_pendingIterator == null)
@@ -611,6 +640,7 @@ public class NonBlockingConnection imple
     public void clearScheduled()
     {
         _scheduled.set(false);
+        _scheduledTime = 0;
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1732452&r1=1732451&r2=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
Fri Feb 26 10:55:34 2016
@@ -75,6 +75,4 @@ public interface ProtocolEngine extends
     void received(QpidByteBuffer msg);
 
     void setIOThread(Thread ioThread);
-
-    void processingStarted(long currentTime);
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
(from r1732333, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java&r1=1732333&r2=1732452&rev=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
Fri Feb 26 10:55:34 2016
@@ -15,17 +15,12 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
 
 package org.apache.qpid.server.transport;
 
-import org.apache.qpid.transport.network.NetworkConnection;
-
-public interface ServerNetworkConnection extends NetworkConnection
+public interface SchedulingDelayNotificationListener
 {
-    void reserveOutboundMessageSpace(long size);
-
-    String getTransportInfo();
-
-    long getScheduledTime();
+    void notifySchedulingDelay(final long schedulingDelay);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1732452&r1=1732451&r2=1732452&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
Fri Feb 26 10:55:34 2016
@@ -28,4 +28,8 @@ public interface ServerNetworkConnection
     String getTransportInfo();
 
     long getScheduledTime();
+
+    void addSchedulingDelayNotificationListeners(SchedulingDelayNotificationListener listener);
+
+    void removeSchedulingDelayNotificationListeners(SchedulingDelayNotificationListener listener);
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message