qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1064084 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/registry/ broker/src/main/java/org/apache/qpid/server/transport/ common/src/main/java/org/apache/qpid/transport/ common/src/main/java/org/apache/qpid/transport...
Date Thu, 27 Jan 2011 11:18:40 GMT
Author: robbie
Date: Thu Jan 27 11:18:39 2011
New Revision: 1064084

URL: http://svn.apache.org/viewvc?rev=1064084&view=rev
Log:
QPID-3021: set the session/connection actor when the connection receives new events, ensure
the correct thread logs close

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
Thu Jan 27 11:18:39 2011
@@ -41,7 +41,7 @@ public class ConfigurationFileApplicatio
     public void close()
     {
         //Set the Actor for Broker Shutdown
-        CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
+        CurrentActor.set(new BrokerActor(_rootMessageLogger));
         try
         {
             super.close();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
Thu Jan 27 11:18:39 2011
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.*;
 
 import java.text.MessageFormat;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -39,11 +40,13 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ProtocolEvent;
 
 public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject
 {
     private ConnectionConfig _config;
     private Runnable _onOpenTask;
+    private AtomicBoolean _logClosed = new AtomicBoolean(false);
     private LogActor _actor = GenericActor.getInstance(this);
 
     public ServerConnection()
@@ -73,6 +76,14 @@ public class ServerConnection extends Co
         
         if (state == State.CLOSED)
         {
+            logClosed();
+        }
+    }
+
+    protected void logClosed()
+    {
+        if(_logClosed.compareAndSet(false, true))
+        {
             CurrentActor.get().message(this, ConnectionMessages.CLOSE());
         }
     }
@@ -135,13 +146,36 @@ public class ServerConnection extends Co
         ((ServerSession)session).close();
     }
 
-    public String toLogString() {
+    @Override
+    public void received(ProtocolEvent event)
+    {
+        ServerSession channel = (ServerSession) getSession(event.getChannel());
+        LogActor channelActor = null;
+
+        if (channel != null)
+        {
+            channelActor = channel.getLogActor();
+        }
+
+        CurrentActor.set(channelActor == null ? _actor : channelActor);
+        try
+        {
+            super.received(event);
+        }
+        finally
+        {
+            CurrentActor.remove();
+        }
+    }
+
+    public String toLogString()
+    {
         boolean hasVirtualHost = (null != this.getVirtualHost());
         boolean hasPrincipal = (null != getAuthorizationID());
 
         if (hasPrincipal && hasVirtualHost)
         {
-            return " [" +
+            return "[" +
                     MessageFormat.format(CONNECTION_FORMAT,
                                          getConnectionId(),
                                          getClientId(),
@@ -151,7 +185,7 @@ public class ServerConnection extends Co
         }
         else if (hasPrincipal)
         {
-            return " [" +
+            return "[" +
                     MessageFormat.format(USER_FORMAT,
                                          getConnectionId(),
                                          getClientId(),
@@ -161,7 +195,7 @@ public class ServerConnection extends Co
         }
         else
         {
-            return " [" +
+            return "[" +
                     MessageFormat.format(SOCKET_FORMAT,
                                          getConnectionId(),
                                          getConfig().getAddress())

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
Thu Jan 27 11:18:39 2011
@@ -84,7 +84,22 @@ public class ServerConnectionDelegate ex
 
     }
 
-    @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+    @Override
+    public void connectionClose(Connection conn, ConnectionClose close)
+    {
+        try
+        {
+            ((ServerConnection) conn).logClosed();
+        }
+        finally
+        {
+            super.connectionClose(conn, close);
+        }
+        
+    }
+
+    @Override
+    public void connectionOpen(Connection conn, ConnectionOpen open)
     {
         ServerConnection sconn = (ServerConnection) conn;
         

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Thu Jan 27 11:18:39 2011
@@ -32,6 +32,7 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.configuration.ConnectionConfig;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
@@ -57,7 +58,6 @@ import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.Session.State;
 
 import java.lang.ref.WeakReference;
 import java.security.Principal;
@@ -81,6 +81,7 @@ public class ServerSession extends Sessi
     private final UUID _id;
     private ConnectionConfig _connectionConfig;
     private long _createTime = System.currentTimeMillis();
+    private LogActor _actor = GenericActor.getInstance(this);
 
     public static interface MessageDispositionChangeListener
     {
@@ -130,7 +131,7 @@ public class ServerSession extends Sessi
 
         if (state == State.OPEN)
         {
-	        GenericActor.getInstance(this).message(ChannelMessages.CREATE());
+	        _actor.message(ChannelMessages.CREATE());
         }
     }
 
@@ -595,6 +596,11 @@ public class ServerSession extends Sessi
         return getConnection().getClientId();
     }
 
+    public LogActor getLogActor()
+    {
+        return _actor;
+    }
+
     public LogSubject getLogSubject()
     {
         return (LogSubject) this;
@@ -603,7 +609,7 @@ public class ServerSession extends Sessi
     @Override
     public String toLogString()
     {
-       return " [" +
+       return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
                                    getConnection().getConnectionId(),
                                    getClientID(),

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Thu Jan 27 11:18:39 2011
@@ -1223,7 +1223,6 @@ public class ServerSessionDelegate exten
     @Override
     public void closed(Session session)
     {
-        super.closed(session);
         for(Subscription_0_10 sub : getSubscriptions(session))
         {
             ((ServerSession)session).unregister(sub);

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu
Jan 27 11:18:39 2011
@@ -434,7 +434,7 @@ public class Connection extends Connecti
         }
     }
 
-    Session getSession(int channel)
+    protected Session getSession(int channel)
     {
         synchronized (lock)
         {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java?rev=1064084&r1=1064083&r2=1064084&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
(original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
Thu Jan 27 11:18:39 2011
@@ -52,7 +52,6 @@ public class IoNetworkTransport implemen
     private long timeout = 60000; 
     private ConnectionSettings settings;    
     
-    @Override
     public void init(ConnectionSettings settings)
     {
         try
@@ -84,20 +83,17 @@ public class IoNetworkTransport implemen
         }
     }
 
-    @Override
     public void receiver(Receiver<ByteBuffer> delegate)
     {
         receiver = new IoReceiver(this, delegate,
                 2*settings.getReadBufferSize() , timeout);
     }
 
-    @Override
     public Sender<ByteBuffer> sender()
     {
         return new IoSender(this, 2*settings.getWriteBufferSize(), timeout);
     }
-    
-    @Override
+
     public void close()
     {
         



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message