qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r815705 - in /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server: connection/ConnectionRegistry.java protocol/AMQProtocolEngine.java protocol/AMQProtocolSession.java virtualhost/VirtualHost.java
Date Wed, 16 Sep 2009 10:07:45 GMT
Author: aidan
Date: Wed Sep 16 10:07:44 2009
New Revision: 815705

URL: http://svn.apache.org/viewvc?rev=815705&view=rev
Log:
QPID-2106: Don't close connections if the broker has asked it to close and
there's still stuff to process. Let the cleanup thread do that so that publishes
which are denied don't result in instant connection death.


Modified:
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Wed Sep 16 10:07:44 2009
@@ -44,6 +44,14 @@
     {
 
     }
+    
+    public void expireClosedChannels()
+    {
+        for (AMQProtocolSession connection : _registry)
+        {
+            connection.closeIfLingeringClosedChannels();
+        }
+    }
 
     /** Close all of the currently open connections. */
     public void close() throws AMQException

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Wed Sep 16 10:07:44 2009
@@ -29,6 +29,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
@@ -135,7 +137,7 @@
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
-    private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
+    private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
@@ -293,12 +295,8 @@
                 }
                 else
                 {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
-                    }
-
-                    closeProtocolSession();
+                    // The channel has been told to close, we don't process any more frames until
+                    // it's closed. 
                     return;
                 }
             }
@@ -513,7 +511,7 @@
 
     public boolean channelAwaitingClosure(int channelId)
     {
-        return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
+        return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
@@ -525,7 +523,7 @@
 
         final int channelId = channel.getChannelId();
 
-        if (_closingChannelsList.contains(channelId))
+        if (_closingChannelsList.containsKey(channelId))
         {
             throw new AMQException("Session is marked awaiting channel close");
         }
@@ -632,7 +630,7 @@
 
     private void markChannelAwaitingCloseOk(int channelId)
     {
-        _closingChannelsList.add(channelId);
+        _closingChannelsList.put(channelId, System.currentTimeMillis());
     }
 
     /**
@@ -1023,7 +1021,19 @@
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
-    
-    
+
+    @Override
+    public void closeIfLingeringClosedChannels()
+    {
+        for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+        {
+            if (id.getValue() + 30000 < System.currentTimeMillis())
+            {
+                // Value is the time the close was requested; no ChannelCloseOk 30 seconds later means the client's dead, kill the connection
+                _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+                closeProtocolSession();
+            }
+        }
+    }
     
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Wed Sep 16 10:07:44 2009
@@ -225,5 +225,7 @@
     void commitTransactions(AMQChannel channel) throws AMQException;
 
     List<AMQChannel> getChannels();
+
+    void closeIfLingeringClosedChannels();
     
 }

Modified: qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=815705&r1=815704&r2=815705&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Sep 16 10:07:44 2009
@@ -267,6 +267,14 @@
             _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
                                                    period / 2,
                                                    period);
+            
+            class ForceChannelClosuresTask extends TimerTask
+            {
+                public void run()
+                {
+                    _connectionRegistry.expireClosedChannels();
+                }
+            }
         }
     }
     



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message