qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1558363 - in /qpid/trunk/qpid/java: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ client/src/main/java/org/apache/qpid/client/transport/ common/src/main/java/org/apache/qpid/transport/ test-profiles/
Date Wed, 15 Jan 2014 12:40:37 GMT
Author: rgodfrey
Date: Wed Jan 15 12:40:37 2014
New Revision: 1558363

URL: http://svn.apache.org/r1558363
Log:
QPID-5342 : [Java Client] 0-10 client does not close connection when heartbeat timeout interval has been exceeded

Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    qpid/trunk/qpid/java/test-profiles/Java010Excludes

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Wed Jan 15 12:40:37 2014
@@ -24,7 +24,6 @@ import org.apache.qpid.protocol.ServerPr
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
-import org.apache.qpid.server.protocol.v0_10.ServerConnection;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
@@ -156,7 +155,8 @@ public class ProtocolEngine_0_10  extend
 
     public void readerIdle()
     {
-        //Todo
+        _connection.getLogActor().message(ConnectionMessages.IDLE_CLOSE());
+        _network.close();
     }
 
     public String getAddress()

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Wed Jan 15 12:40:37 2014
@@ -73,7 +73,6 @@ public class ServerConnection extends Co
     private Port _port;
     private AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
-    private NetworkConnection _networkConnection;
     private Transport _transport;
     private volatile boolean _stopped;
 
@@ -528,7 +527,7 @@ public class ServerConnection extends Co
 
     public Principal getPeerPrincipal()
     {
-        return _networkConnection.getPeerPrincipal();
+        return getNetworkConnection().getPeerPrincipal();
     }
 
     @Override
@@ -543,16 +542,6 @@ public class ServerConnection extends Co
         super.setLocalAddress(localAddress);
     }
 
-    public void setNetworkConnection(NetworkConnection network)
-    {
-        _networkConnection = network;
-    }
-
-    public NetworkConnection getNetworkConnection()
-    {
-        return _networkConnection;
-    }
-
     public void doHeartbeat()
     {
         super.doHeartBeat();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Wed Jan 15 12:40:37 2014
@@ -236,7 +236,6 @@ public class ServerConnectionDelegate ex
         }
 
         final NetworkConnection networkConnection = sconn.getNetworkConnection();
-
         if(ok.hasHeartbeat())
         {
             int heartbeat = ok.getHeartbeat();
@@ -352,4 +351,11 @@ public class ServerConnectionDelegate ex
     {
         return _clientProperties == null ? null : (String) _clientProperties.get(ConnectionStartProperties.PRODUCT);
     }
+
+    @Override
+    protected int getHeartbeatMax()
+    {
+        int delay = (Integer)_broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY);
+        return delay == 0 ? super.getHeartbeatMax() : delay;
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/ClientConnectionDelegate.java Wed Jan 15 12:40:37 2014
@@ -179,12 +179,9 @@ public class ClientConnectionDelegate ex
     }
 
     @Override
-    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat heartbeat)
     {
-        // ClientDelegate simply responds to heartbeats with heartbeats
         _heartbeatListener.heartbeatReceived();
-        super.connectionHeartbeat(conn, hearbeat);
-        _heartbeatListener.heartbeatSent();
     }
 
 
@@ -192,4 +189,11 @@ public class ClientConnectionDelegate ex
     {
         _heartbeatListener = listener == null ? HeartbeatListener.DEFAULT : listener;
     }
+
+    @Override
+    public void writerIdle(final Connection connection)
+    {
+        super.writerIdle(connection);
+        _heartbeatListener.heartbeatSent();
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Wed Jan 15 12:40:37 2014
@@ -143,6 +143,8 @@ public class ClientDelegate extends Conn
                               actualHeartbeatInterval);
 
         int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+        conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+        conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
         conn.setIdleTimeout(idleTimeout);
 
         int channelMax = tune.getChannelMax();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Wed Jan 15 12:40:37 2014
@@ -70,6 +70,7 @@ public class Connection extends Connecti
     public static final int MIN_USABLE_CHANNEL_NUM = 0;
     private long _lastSendTime;
     private long _lastReadTime;
+    private NetworkConnection _networkConnection;
 
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -229,12 +230,13 @@ public class Connection extends Connecti
                 addConnectionListener((ConnectionListener)secureReceiver);
             }
 
-            NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
+            _networkConnection = transport.connect(settings, secureReceiver, new ConnectionActivity());
 
-            setRemoteAddress(network.getRemoteAddress());
-            setLocalAddress(network.getLocalAddress());
 
-            final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+            setRemoteAddress(_networkConnection.getRemoteAddress());
+            setLocalAddress(_networkConnection.getLocalAddress());
+
+            final Sender<ByteBuffer> secureSender = securityLayer.sender(_networkConnection.getSender());
             if(secureSender instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureSender);
@@ -785,14 +787,26 @@ public class Connection extends Connecti
         @Override
         public void writerIdle()
         {
+            getConnectionDelegate().writerIdle(Connection.this);
             connectionHeartbeat();
         }
 
         @Override
         public void readerIdle()
         {
-            // TODO
-
+            log.error("Closing connection as no heartbeat or other activity detected within specified interval");
+            _networkConnection.close();
         }
     }
+
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        _networkConnection = network;
+    }
+
+    public NetworkConnection getNetworkConnection()
+    {
+        return _networkConnection;
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Wed Jan 15 12:40:37 2014
@@ -105,4 +105,9 @@ public abstract class ConnectionDelegate
             ssn.closed();
         }
     }
+
+    public void writerIdle(final Connection connection)
+    {
+        connection.doHeartBeat();
+    }
 }

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1558363&r1=1558362&r2=1558363&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Wed Jan 15 12:40:37 2014
@@ -67,9 +67,5 @@ org.apache.qpid.client.failover.AddressB
 // QPID-3604: Immediate Prefetch no longer supported by 0-10
 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
 
-// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
-org.apache.qpid.client.HeartbeatTest#testUnidirectionalHeartbeating
-org.apache.qpid.client.HeartbeatTest#testHeartbeatsEnabledBrokerSide
-
 // Java 0-10 client does not support re-binding the queue to the same exchange
 org.apache.qpid.server.queue.QueueBindTest#testQueueCanBeReboundOnTopicExchange



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message