qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1732184 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/main/java/org/apache/qpid/server/transport/ broker-plugins/amqp-0-10-protocol...
Date Wed, 24 Feb 2016 16:30:32 GMT
Author: kwall
Date: Wed Feb 24 16:30:32 2016
New Revision: 1732184

URL: http://svn.apache.org/viewvc?rev=1732184&view=rev
Log:
QPID-7033: [Java Broker] Change IO tickers to use scheduled time when considering if it is time
to 'tick'

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
Wed Feb 24 16:30:32 2016
@@ -20,15 +20,15 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 
 public class ConnectionClosingTicker implements Ticker
 {
     private final long _timeoutTime;
-    private final NetworkConnection _network;
+    private final ServerNetworkConnection _network;
 
-    public ConnectionClosingTicker(final long timeoutTime, final NetworkConnection network)
+    public ConnectionClosingTicker(final long timeoutTime, final ServerNetworkConnection
network)
     {
         _timeoutTime = timeoutTime;
         _network = network;
@@ -37,6 +37,11 @@ public class ConnectionClosingTicker imp
     @Override
     public int getTimeToNextTick(final long currentTime)
     {
+        if (_network.getScheduledTime() > 0)
+        {
+            return (int) (_timeoutTime - _network.getScheduledTime());
+        }
+
         return (int) (_timeoutTime - currentTime);
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
Wed Feb 24 16:30:32 2016
@@ -24,8 +24,8 @@ import org.apache.qpid.transport.network
 
 abstract public class SuspendedConsumerLoggingTicker implements Ticker
 {
-    private long _nextTick;
-    private long _startTime;
+    private volatile long _nextTick;
+    private volatile long _startTime;
     private final long _repeatPeriod;
 
     public SuspendedConsumerLoggingTicker(final long repeatPeriod)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Wed Feb 24 16:30:32 2016
@@ -43,7 +43,6 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -107,6 +106,7 @@ public abstract class AbstractAMQPConnec
     private volatile Thread _ioThread;
 
     private volatile boolean _messageAuthorizationRequired;
+    private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
 
     private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
 
@@ -173,7 +173,8 @@ public abstract class AbstractAMQPConnec
     {
         super.onOpen();
         long maxAuthDelay = _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
-        _aggregateTicker.addTicker(new SlowConnectionOpenTicker(maxAuthDelay));
+        _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+        _aggregateTicker.addTicker(_slowConnectionOpenTicker);
         _lastReadTime = _lastWriteTime = getCreatedTime();
 
     }
@@ -183,7 +184,7 @@ public abstract class AbstractAMQPConnec
         return _broker;
     }
 
-    public final NetworkConnection getNetwork()
+    public final ServerNetworkConnection getNetwork()
     {
         return _network;
     }
@@ -722,9 +723,21 @@ public abstract class AbstractAMQPConnec
         return !_messageAuthorizationRequired || getAuthorizedPrincipal().getName().equals(userId
== null? "" : userId);
     }
 
+    @Override
+    public void processingStarted(final long currentTime)
+    {
+        SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
+        long scheduledTime = _network.getScheduledTime();
+        if (ticker != null && scheduledTime > 0)
+        {
+            ticker.addSchedulingDelay(currentTime - scheduledTime);
+        }
+    }
+
     private class SlowConnectionOpenTicker implements Ticker
     {
         private final long _allowedTime;
+        private volatile long _accumulatedDelay;
 
         public SlowConnectionOpenTicker(long timeoutTime)
         {
@@ -734,7 +747,7 @@ public abstract class AbstractAMQPConnec
         @Override
         public int getTimeToNextTick(final long currentTime)
         {
-            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime - currentTime);
+            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedDelay
- currentTime);
             return timeToNextTick;
         }
 
@@ -754,10 +767,15 @@ public abstract class AbstractAMQPConnec
                 else
                 {
                     _aggregateTicker.removeTicker(this);
+                    _slowConnectionOpenTicker = null;
                 }
             }
             return nextTick;
         }
-    }
 
+        public void addSchedulingDelay(final long delay)
+        {
+            _accumulatedDelay += delay;
+        }
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Wed Feb 24 16:30:32 2016
@@ -148,6 +148,12 @@ public class MultiVersionProtocolEngine
         _delegate.setIOThread(ioThread);
     }
 
+    @Override
+    public void processingStarted(final long currentTime)
+    {
+        _delegate.processingStarted(currentTime);
+    }
+
     public long getConnectionId()
     {
         return _id;
@@ -294,6 +300,12 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
+        public void processingStarted(final long currentTime)
+        {
+
+        }
+
+        @Override
         public void closed()
         {
 
@@ -539,6 +551,11 @@ public class MultiVersionProtocolEngine
 
         }
 
+        @Override
+        public void processingStarted(final long currentTime)
+        {
+
+        }
 
         @Override
         public Subject getSubject()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Wed Feb 24 16:30:32 2016
@@ -66,6 +66,7 @@ public class NonBlockingConnection imple
 
     private final AmqpPort _port;
     private final AtomicBoolean _scheduled = new AtomicBoolean();
+    private volatile long _scheduledTime;
     private volatile boolean _unexpectedByteBufferSizeReported;
     private final String _threadName;
     private volatile SelectorThread.SelectionTask _selectionTask;
@@ -241,11 +242,13 @@ public class NonBlockingConnection imple
             try
             {
                 long currentTime = System.currentTimeMillis();
+                _protocolEngine.processingStarted(currentTime);
                 int tick = getTicker().getTimeToNextTick(currentTime);
                 if (tick <= 0)
                 {
                     getTicker().tick(currentTime);
                 }
+                _scheduledTime = 0;
 
                 _protocolEngine.setIOThread(Thread.currentThread());
                 _protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -597,7 +600,12 @@ public class NonBlockingConnection imple
 
     public boolean setScheduled()
     {
-        return _scheduled.compareAndSet(false,true);
+        final boolean scheduled = _scheduled.compareAndSet(false, true);
+        if (scheduled)
+        {
+            _scheduledTime = System.currentTimeMillis();
+        }
+        return scheduled;
     }
 
     public void clearScheduled()
@@ -605,6 +613,12 @@ public class NonBlockingConnection imple
         _scheduled.set(false);
     }
 
+    @Override
+    public long getScheduledTime()
+    {
+        return _scheduledTime;
+    }
+
     void reportUnexpectedByteBufferSizeUsage()
     {
         if (!_unexpectedByteBufferSizeReported)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Wed Feb 24 16:30:32 2016
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
@@ -149,10 +149,7 @@ public class NonBlockingNetworkTransport
 
                     socketChannel.configureBlocking(false);
 
-                    AggregateTicker aggregateTicker = engine.getAggregateTicker();
 
-                    final IdleTimeoutTicker idleTimeoutTicker = new IdleTimeoutTicker(engine,
_timeout);
-                    aggregateTicker.addTicker(idleTimeoutTicker);
 
                     NonBlockingConnection connection =
                             new NonBlockingConnection(socketChannel,
@@ -170,11 +167,16 @@ public class NonBlockingNetworkTransport
                                                       _scheduler,
                                                       _port);
 
+                    AggregateTicker aggregateTicker = engine.getAggregateTicker();
+
+                    Ticker writeIdleTimeoutTicker = new ServerIdleWriteTimeoutTicker(connection,
engine, _timeout);
+                    Ticker readIdleTimeoutTicker = new ServerIdleReadTimeoutTicker(connection,
engine, _timeout);
+                    aggregateTicker.addTicker(writeIdleTimeoutTicker);
+                    aggregateTicker.addTicker(readIdleTimeoutTicker);
+
                     engine.setNetworkConnection(connection);
                     connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
 
-                    idleTimeoutTicker.setConnection(connection);
-
                     connection.start();
 
                     _scheduler.addConnection(connection);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
Wed Feb 24 16:30:32 2016
@@ -75,4 +75,6 @@ public interface ProtocolEngine extends
     void received(QpidByteBuffer msg);
 
     void setIOThread(Thread ioThread);
+
+    void processingStarted(long currentTime);
 }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java?rev=1732184&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
(added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
Wed Feb 24 16:30:32 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class ServerIdleReadTimeoutTicker implements Ticker
+{
+    private final TransportActivity _transport;
+    private final int _defaultTimeout;
+    private final ServerNetworkConnection _connection;
+
+    public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection, TransportActivity
transport,
+                                       int defaultTimeout)
+    {
+        _connection = connection;
+        _transport = transport;
+        _defaultTimeout = defaultTimeout;
+    }
+
+    @Override
+    public int getTimeToNextTick(long currentTime)
+    {
+        final long maxReadIdle = _connection.getMaxReadIdleMillis();
+        if (maxReadIdle > 0)
+        {
+            long nextTime = _transport.getLastReadTime() + maxReadIdle;
+            return (int) (nextTime - (_connection.getScheduledTime() > 0 ?  _connection.getScheduledTime()
: currentTime) );
+        }
+
+        return _defaultTimeout;
+    }
+
+    @Override
+    public int tick(long currentTime)
+    {
+        int timeToNextTick = getTimeToNextTick(currentTime);;
+        if (_connection.getMaxReadIdleMillis() > 0 && timeToNextTick <= 0)
+        {
+            _transport.readerIdle();
+        }
+
+        return timeToNextTick;
+    }
+}

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java?rev=1732184&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
(added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Wed Feb 24 16:30:32 2016
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.transport;
+
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class ServerIdleWriteTimeoutTicker implements Ticker
+{
+    private final TransportActivity _transport;
+    private final int _defaultTimeout;
+    private final ServerNetworkConnection _connection;
+
+    public ServerIdleWriteTimeoutTicker(ServerNetworkConnection connection, TransportActivity
transport,
+                                        int defaultTimeout)
+    {
+        _connection = connection;
+        _transport = transport;
+        _defaultTimeout = defaultTimeout;
+    }
+
+    @Override
+    public int getTimeToNextTick(long currentTime)
+    {
+        long maxWriteIdle = _connection.getMaxWriteIdleMillis();
+        if (maxWriteIdle > 0)
+        {
+            long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
+            return (int) (writeTime - currentTime);
+        }
+
+        return _defaultTimeout;
+    }
+
+    @Override
+    public int tick(long currentTime)
+    {
+        int timeToNextTick = getTimeToNextTick(currentTime);
+        if (_connection.getMaxWriteIdleMillis() > 0 && timeToNextTick <= 0)
+        {
+            _transport.writerIdle();
+        }
+
+        return timeToNextTick;
+    }
+}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
(original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
Wed Feb 24 16:30:32 2016
@@ -26,4 +26,6 @@ public interface ServerNetworkConnection
     void reserveOutboundMessageSpace(long size);
 
     String getTransportInfo();
+
+    long getScheduledTime();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
(original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
Wed Feb 24 16:30:32 2016
@@ -50,6 +50,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Connection;
@@ -139,7 +140,7 @@ public class ServerConnection extends Co
 
         if(state == State.CLOSING)
         {
-            getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis()
+ CLOSE_OK_TIMEOUT, getNetworkConnection()));
+            getAmqpConnection().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis()
+ CLOSE_OK_TIMEOUT, (ServerNetworkConnection) getNetworkConnection()));
         }
     }
 

Modified: qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++ qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Wed Feb 24 16:30:32 2016
@@ -451,6 +451,12 @@ class WebSocketProvider implements Accep
             return _connection.getProtocol();
         }
 
+        @Override
+        public long getScheduledTime()
+        {
+            return 0;
+        }
+
         void setPeerCertificate(final Certificate certificate)
         {
             _certificate = certificate;

Modified: qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
(original)
+++ qpid/java/trunk/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
Wed Feb 24 16:30:32 2016
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * A basic implementation of TCP traffic forwarder between ports.
  * It is intended to use in tests.
  */
-public class TCPTunneler
+public class TCPTunneler implements AutoCloseable
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(TCPTunneler.class);
 
@@ -60,6 +60,11 @@ public class TCPTunneler
         _tcpWorker.start();
     }
 
+    public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
+    {
+        _tcpWorker.stopClientToServerForwarding(clientAddress);
+    }
+
     public void stop()
     {
         try
@@ -96,6 +101,12 @@ public class TCPTunneler
         return _tcpWorker.getLocalPort();
     }
 
+    @Override
+    public void close() throws Exception
+    {
+        stop();
+    }
+
     public interface TunnelListener
     {
         void clientConnected(InetSocketAddress clientAddress);
@@ -115,6 +126,7 @@ public class TCPTunneler
         private final TunnelListener _notifyingListener;
         private volatile ServerSocket _serverSocket;
         private volatile ExecutorService _executor;
+        private int _actualLocalPort;
 
         public TCPWorker(final int localPort,
                          final String remoteHost,
@@ -184,20 +196,22 @@ public class TCPTunneler
 
         public void start()
         {
-            LOGGER.info("Starting TCPTunneler forwarding from port {} to {}", _localPort,
_remoteHostPort);
+            _actualLocalPort = _localPort;
             try
             {
                 _serverSocket = new ServerSocket(_localPort);
+                _actualLocalPort = _serverSocket.getLocalPort();
+                LOGGER.info                                  ("Starting TCPTunneler forwarding
from port {} to {}",
+                            _actualLocalPort, _remoteHostPort);
                 _serverSocket.setReuseAddress(true);
             }
             catch (IOException e)
             {
-                throw new RuntimeException("Cannot start TCPTunneler on port " + _localPort,
e);
+                throw new RuntimeException("Cannot start TCPTunneler on port " + _actualLocalPort,
e);
             }
 
             if (_serverSocket != null)
             {
-                LOGGER.info("Listening on port {}", _localPort);
                 try
                 {
                     _executor.execute(this);
@@ -210,7 +224,7 @@ public class TCPTunneler
                     }
                     finally
                     {
-                        throw new RuntimeException("Cannot start acceptor thread for TCPTunneler
on port " + _localPort,
+                        throw new RuntimeException("Cannot start acceptor thread for TCPTunneler
on port " + _actualLocalPort,
                                                    e);
                     }
                 }
@@ -222,7 +236,7 @@ public class TCPTunneler
             if (_closed.compareAndSet(false, true))
             {
                 LOGGER.info("Stopping TCPTunneler forwarding from port {} to {}",
-                            _localPort,
+                            _actualLocalPort,
                             _remoteHostPort);
                 try
                 {
@@ -237,7 +251,7 @@ public class TCPTunneler
                 }
 
                 LOGGER.info("TCPTunneler forwarding from port {} to {} is stopped",
-                            _localPort,
+                            _actualLocalPort,
                             _remoteHostPort);
             }
         }
@@ -330,6 +344,28 @@ public class TCPTunneler
             }
         }
 
+        public void stopClientToServerForwarding(final InetSocketAddress clientAddress)
+        {
+            SocketTunnel target = null;
+            for (SocketTunnel tunnel : _tunnels)
+            {
+                if (tunnel.getClientAddress().equals(clientAddress))
+                {
+                    target = tunnel;
+                    break;
+                }
+            }
+            if (target != null)
+            {
+                LOGGER.debug("Stopping forwarding from client {} to server", clientAddress);
+                target.stopClientToServerForwarding();
+            }
+            else
+            {
+                throw new IllegalArgumentException("Could not find tunnel for address " +
clientAddress);
+            }
+        }
+
         private void closeServerSocket()
         {
             if (_serverSocket != null)
@@ -349,7 +385,6 @@ public class TCPTunneler
             }
         }
 
-
         private SocketTunnel removeTunnel(final InetSocketAddress clientAddress)
         {
             SocketTunnel client = null;
@@ -384,8 +419,8 @@ public class TCPTunneler
         private final Socket _serverSocket;
         private final TunnelListener _tunnelListener;
         private final AtomicBoolean _closed;
-        private final AutoClosingStreamForwarder _upStreamForwarder;
-        private final AutoClosingStreamForwarder _downStreamForwarder;
+        private final AutoClosingStreamForwarder _clientToServer;
+        private final AutoClosingStreamForwarder _serverToClient;
         private final InetSocketAddress _clientSocketAddress;
 
         public SocketTunnel(final Socket clientSocket,
@@ -400,8 +435,8 @@ public class TCPTunneler
             _tunnelListener = tunnelListener;
             _clientSocket.setKeepAlive(true);
             _serverSocket.setKeepAlive(true);
-            _upStreamForwarder = new AutoClosingStreamForwarder(new StreamForwarder(_clientSocket,
_serverSocket));
-            _downStreamForwarder = new AutoClosingStreamForwarder(new StreamForwarder(_serverSocket,
_clientSocket));
+            _clientToServer = new AutoClosingStreamForwarder(new StreamForwarder(_clientSocket,
_serverSocket));
+            _serverToClient = new AutoClosingStreamForwarder(new StreamForwarder(_serverSocket,
_clientSocket));
         }
 
         public void close()
@@ -422,11 +457,16 @@ public class TCPTunneler
 
         public void start(Executor executor) throws IOException
         {
-            executor.execute(_upStreamForwarder);
-            executor.execute(_downStreamForwarder);
+            executor.execute(_clientToServer);
+            executor.execute(_serverToClient);
             _tunnelListener.clientConnected(getClientAddress());
         }
 
+        public void stopClientToServerForwarding()
+        {
+            _clientToServer.stopForwarding();
+        }
+
         public boolean isClosed()
         {
             return _closed.get();
@@ -442,7 +482,6 @@ public class TCPTunneler
             return _clientSocketAddress;
         }
 
-
         private static void closeSocket(Socket socket)
         {
             if (socket != null)
@@ -484,6 +523,11 @@ public class TCPTunneler
                     currentThread.setName(originalThreadName);
                 }
             }
+
+            public void stopForwarding()
+            {
+                _streamForwarder.stopForwarding();
+            }
         }
     }
 
@@ -494,13 +538,13 @@ public class TCPTunneler
         private final InputStream _inputStream;
         private final OutputStream _outputStream;
         private final String _name;
+        private AtomicBoolean _stopForwarding = new AtomicBoolean();
 
         public StreamForwarder(Socket input, Socket output) throws IOException
         {
             _inputStream = input.getInputStream();
             _outputStream = output.getOutputStream();
-            _name = "Forwarder-" + input.getInetAddress().getHostName() + ":" + input.getPort() + "->"
-                    + output.getInetAddress().getHostName() + ":" + output.getPort();
+            _name = "Forwarder-" + input.getLocalSocketAddress() + "->" + output.getRemoteSocketAddress();
         }
 
         @Override
@@ -512,8 +556,16 @@ public class TCPTunneler
             {
                 while ((bytesRead = _inputStream.read(buffer)) != -1)
                 {
-                    _outputStream.write(buffer, 0, bytesRead);
-                    _outputStream.flush();
+                    if (!_stopForwarding.get())
+                    {
+                        _outputStream.write(buffer, 0, bytesRead);
+                        _outputStream.flush();
+                        LOGGER.debug("Forwarded {} byte(s)", bytesRead);
+                    }
+                    else
+                    {
+                        LOGGER.debug("Discarded {} byte(s)", bytesRead);
+                    }
                 }
             }
             catch (IOException e)
@@ -547,5 +599,11 @@ public class TCPTunneler
         {
             return _name;
         }
+
+        public void stopForwarding()
+        {
+            _stopForwarding.set(true);
+        }
+
     }
 }

Modified: qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/java/trunk/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Wed Feb 24 16:30:32 2016
@@ -529,6 +529,7 @@ public class QpidBrokerTestCase extends
     @Override
     protected void tearDown() throws java.lang.Exception
     {
+        _logger.debug("tearDown started");
         try
         {
             for (Connection c : _connections)

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java Wed Feb 24 16:30:32 2016
@@ -20,11 +20,15 @@ package org.apache.qpid.client;
 
 import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -34,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TCPTunneler;
 
 public class HeartbeatTest extends QpidBrokerTestCase
 {
@@ -192,6 +197,56 @@ public class HeartbeatTest extends QpidB
         conn.close();
     }
 
+    public void testClientStopsSendingHeartbeats_BrokerClosesConnection() throws Exception
+    {
+        try(TCPTunneler tcpTunneler = new TCPTunneler(getFailingPort(), "localhost", getDefaultBroker().getAmqpPort(), 1))
+        {
+            tcpTunneler.start();
+
+            final AtomicReference<InetSocketAddress> clientAddressRef = new AtomicReference<>();
+            tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
+            {
+                @Override
+                public void clientConnected(final InetSocketAddress clientAddress)
+                {
+                    clientAddressRef.set(clientAddress);
+                }
+
+                @Override
+                public void clientDisconnected(final InetSocketAddress clientAddress)
+                {
+                }
+            });
+
+            final CountDownLatch exceptionLatch = new CountDownLatch(1);
+            final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT,  tcpTunneler.getLocalPort(), 1);
+            AMQConnection conn = (AMQConnection) getConnection(new AMQConnectionURL(url));
+            conn.setHeartbeatListener(_listener);
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                @Override
+                public void onException(final JMSException exception)
+                {
+                    LOGGER.debug("Exception listener got exception", exception);
+                    exceptionLatch.countDown();
+                }
+            });
+            conn.start();
+
+            assertNotNull(clientAddressRef.get());
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: "+_listener.getHeartbeatsReceived() +" (expected at least 2)", _listener.getHeartbeatsReceived() >=2);
+            assertTrue("Too few heartbeats sent "+_listener.getHeartbeatsSent() +" (expected at least 2)", _listener.getHeartbeatsSent() >=2);
+
+            tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
+
+            exceptionLatch.await(5, TimeUnit.SECONDS);
+            assertTrue("Connection should be disconnected within timeout", conn.isConnected());
+        }
+    }
+
     private class TestListener implements HeartbeatListener
     {
         private final String _name;

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1732184&r1=1732183&r2=1732184&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java Wed Feb 24 16:30:32 2016
@@ -143,11 +143,12 @@ public class ProtocolNegotiationTest ext
             boolean brokenPipe = false;
             while (timeout > System.currentTimeMillis())
             {
-                if (!writeHeartbeat(sender)) ;
+                if (!writeHeartbeat(sender))
                 {
                     brokenPipe = true;
                     break;
                 }
+                Thread.sleep(100);
             }
             assertTrue("Expected pipe to become broken within "
                        + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " timeout", brokenPipe);
@@ -257,7 +258,6 @@ public class ProtocolNegotiationTest ext
             catch (IOException e)
             {
                 _success = false;
-                throw new RuntimeException(e);
             }
 
         }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message