qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shus...@apache.org
Subject svn commit: r1186990 [21/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Date Thu, 20 Oct 2011 18:43:26 GMT
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Thu Oct 20 18:42:46 2011
@@ -20,18 +20,23 @@
  */
 package org.apache.qpid.server.connection;
 
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.List;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 
 public interface IConnectionRegistry
 {
-
     public void initialise();
 
     public void close() throws AMQException;
+    
+    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
+    
+    public List<AMQConnectionModel> getConnections();
 
-    public void registerConnection(AMQProtocolSession connnection);
-
-    public void deregisterConnection(AMQProtocolSession connnection);
+    public void registerConnection(AMQConnectionModel connnection);
 
+    public void deregisterConnection(AMQConnectionModel connnection);
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Thu Oct 20 18:42:46 2011
@@ -356,7 +356,7 @@ public abstract class AbstractExchange i
         _receivedMessageCount.incrementAndGet();
         _receivedMessageSize.addAndGet(message.getSize());
         final ArrayList<? extends BaseQueue> queues = doRoute(message);
-        if(queues != null && !queues.isEmpty())
+        if(!queues.isEmpty())
         {
             _routedMessageCount.incrementAndGet();
             _routedMessageSize.addAndGet(message.getSize());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchangeMBean.java Thu Oct 20 18:42:46 2011
@@ -90,12 +90,12 @@ public abstract class AbstractExchangeMB
 
     public String getObjectInstanceName()
     {
-        return _exchange.getNameShortString().toString();
+        return ObjectName.quote(_exchange.getName());
     }
 
     public String getName()
     {
-        return _exchange.getNameShortString().toString();
+        return _exchange.getName();
     }
 
     public String getExchangeType()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Thu Oct 20 18:42:46 2011
@@ -30,15 +30,12 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.ExchangeConfig;
 
 import javax.management.JMException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 public interface Exchange extends ExchangeReferrer, ExchangeConfig
 {
@@ -67,7 +64,12 @@ public interface Exchange extends Exchan
 
     void close() throws AMQException;
 
-
+    /**
+     * Returns a list of queues to which to route this message.   If there are
+     * no queues the empty list must be returned.
+     *
+     * @return list of queues to which to route the message.
+     */
     ArrayList<? extends BaseQueue> route(InboundMessage message);
 
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/exchange/headers/HeadersParser.java Thu Oct 20 18:42:46 2011
@@ -274,132 +274,6 @@ public class HeadersParser
         
     }
 
-    public static void main(String[] args) throws AMQFrameDecodingException
-    {        
-
-        FieldTable bindingTable = new FieldTable();
-
-        bindingTable.setString(new AMQShortString("x-match"),"all");
-        bindingTable.setInteger("a",1);
-        bindingTable.setVoid(new AMQShortString("b"));
-        bindingTable.setString("c","");
-        bindingTable.setInteger("d",4);
-        bindingTable.setInteger("e",1);
-
-
-
-        FieldTable bindingTable2 = new FieldTable();
-        bindingTable2.setString(new AMQShortString("x-match"),"all");
-        bindingTable2.setInteger("a",1);
-        bindingTable2.setVoid(new AMQShortString("b"));
-        bindingTable2.setString("c","");
-        bindingTable2.setInteger("d",4);
-        bindingTable2.setInteger("e",1);
-        bindingTable2.setInteger("f",1);
-
-
-        FieldTable table = new FieldTable();
-        table.setInteger("a",1);
-        table.setInteger("b",2);
-        table.setString("c","");
-        table.setInteger("d",4);
-        table.setInteger("e",1);
-        table.setInteger("f",1);
-        table.setInteger("h",1);
-        table.setInteger("i",1);
-        table.setInteger("j",1);
-        table.setInteger("k",1);
-        table.setInteger("l",1);
-
-        org.apache.mina.common.ByteBuffer buffer = org.apache.mina.common.ByteBuffer.allocate( (int) table.getEncodedSize());
-        EncodingUtils.writeFieldTableBytes(buffer, table);
-        buffer.flip();
-
-        FieldTable table2 = EncodingUtils.readFieldTable(buffer);
-
-
-
-        FieldTable bindingTable3 = new FieldTable();
-        bindingTable3.setString(new AMQShortString("x-match"),"any");
-        bindingTable3.setInteger("a",1);
-        bindingTable3.setInteger("b",3);
-
-
-        FieldTable bindingTable4 = new FieldTable();
-        bindingTable4.setString(new AMQShortString("x-match"),"any");
-        bindingTable4.setVoid(new AMQShortString("a"));
-
-
-        FieldTable bindingTable5 = new FieldTable();
-        bindingTable5.setString(new AMQShortString("x-match"),"all");
-        bindingTable5.setString(new AMQShortString("h"),"hello");
-
-        for(int i = 0; i < 100; i++)
-        {
-            printMatches(new FieldTable[] {bindingTable5} , table2);
-        }
-
-
-
-    }
-
-
-
-    private static void printMatches(final FieldTable[] bindingKeys, final FieldTable routingKey)
-    {
-        HeadersMatcherDFAState sm = null;
-        Map<HeaderMatcherResult, String> resultMap = new HashMap<HeaderMatcherResult, String>();
-
-        HeadersParser parser = new HeadersParser();
-
-        for(int i = 0; i < bindingKeys.length; i++)
-        {
-            HeaderMatcherResult r = new HeaderMatcherResult();
-            resultMap.put(r, bindingKeys[i].toString());
-
-
-            if(i==0)
-            {
-                sm = parser.createStateMachine(bindingKeys[i], r);
-            }
-            else
-            {
-                sm = sm.mergeStateMachines(parser.createStateMachine(bindingKeys[i], r));
-            }
-        }
-
-        Collection<HeaderMatcherResult> results = null;
-        long beforeTime = System.currentTimeMillis();
-        for(int i = 0; i < 1000000; i++)
-        {
-            routingKey.size();
-
-            assert sm != null;
-            results = sm.match(routingKey);
-
-        }
-        long elapsed = System.currentTimeMillis() - beforeTime;
-        System.out.println("1000000 Iterations took: " + elapsed);
-        Collection<String> resultStrings = new ArrayList<String>();
-
-        assert results != null;
-        for(HeaderMatcherResult result : results)
-        {
-            resultStrings.add(resultMap.get(result));
-        }
-
-        final ArrayList<String> nonMatches = new ArrayList<String>();
-        for(FieldTable key : bindingKeys)
-        {
-            nonMatches.add(key.toString());
-        }
-        nonMatches.removeAll(resultStrings);
-        System.out.println("\""+routingKey+"\" matched with " + resultStrings + " DID NOT MATCH with " + nonMatches);
-
-
-    }
-
-
     public final static class KeyValuePair
     {
         public final HeaderKey _key;

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Thu Oct 20 18:42:46 2011
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.transport.ServerSession;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -696,7 +697,7 @@ public class Bridge implements BridgeCon
 
             //TODO Handle the passing of non-null Filters and Arguments here
             
-            Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+            Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
                                                           _destination,
                                                           MessageAcceptMode.NONE,
                                                           MessageAcquireMode.PRE_ACQUIRED,
@@ -768,7 +769,7 @@ public class Bridge implements BridgeCon
 
           //TODO Handle the passing of non-null Filters and Arguments here
             
-            Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
+            Subscription_0_10 sub = SubscriptionFactoryImpl.INSTANCE.createSubscription((ServerSession)session,
                                                           _destination,
                                                           MessageAcceptMode.NONE,
                                                           MessageAcquireMode.PRE_ACQUIRED,

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java Thu Oct 20 18:42:46 2011
@@ -258,7 +258,6 @@ public class BrokerLink implements LinkC
                     _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
                 }
                 _qpidConnection.setSessionFactory(new SessionFactory());
-                _qpidConnection.setAuthorizationID(_username == null ? "" : _username);
 
                 updateState(State.ESTABLISHING, State.OPERATIONAL);
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Thu Oct 20 18:42:46 2011
@@ -37,8 +37,8 @@ import org.apache.qpid.server.queue.Filt
 public class PropertyExpression implements Expression
 {
     // Constants - defined the same as JMS
-    private static final int NON_PERSISTENT = 1;
-    private static final int PERSISTENT = 2;
+    private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT }
+
     private static final int DEFAULT_PRIORITY = 4;
 
     private static final Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
@@ -172,13 +172,14 @@ public class PropertyExpression implemen
     {
         public Object evaluate(Filterable message)
         {
-                int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
+                JMSDeliveryMode mode = message.isPersistent() ? JMSDeliveryMode.PERSISTENT :
+                                                                JMSDeliveryMode.NON_PERSISTENT;
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("JMSDeliveryMode is :" + mode);
                 }
 
-                return mode;
+                return mode.toString();
         }
     }
 

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Thu Oct 20 18:42:46 2011
@@ -144,7 +144,7 @@ public class BasicConsumeMethodHandler i
                     _logger.debug("Closing connection due to invalid selector");
 
                     MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
-                    AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
+                    AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
                                                                                        new AMQShortString(ise.getMessage()),
                                                                                        body.getClazz(),
                                                                                        body.getMethod());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Thu Oct 20 18:42:46 2011
@@ -162,14 +162,7 @@ public class BasicGetMethodHandler imple
         }
         else
         {
-            sub = new GetNoAckSubscription(channel,
-                                                 session,
-                                                 null,
-                                                 null,
-                                                 false,
-                                                 singleMessageCredit,
-                                                 getDeliveryMethod,
-                                                 getRecordMethod);
+            sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
         }
 
         queue.registerSubscription(sub,false);
@@ -180,27 +173,5 @@ public class BasicGetMethodHandler imple
 
     }
 
-    public static final class GetNoAckSubscription extends SubscriptionImpl.NoAckSubscription
-    {
-        public GetNoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
-                               AMQShortString consumerTag, FieldTable filters,
-                               boolean noLocal, FlowCreditManager creditManager,
-                                   ClientDeliveryMethod deliveryMethod,
-                                   RecordDeliveryMethod recordMethod)
-            throws AMQException
-        {
-            super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
-        }
-
-        public boolean isTransient()
-        {
-            return true;
-        }
 
-        public boolean wouldSuspend(QueueEntry msg)
-        {
-            return !getCreditManager().useCreditForMessage(msg.getMessage());
-        }
-
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Thu Oct 20 18:42:46 2011
@@ -68,5 +68,7 @@ public class ConnectionCloseMethodHandle
         ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
         session.writeFrame(responseBody.generateFrame(channelId));
 
+        session.closeProtocolSession();
+
     }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Thu Oct 20 18:42:46 2011
@@ -20,9 +20,9 @@
  */
 package org.apache.qpid.server.handler;
 
+
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
-
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionCloseBody;
@@ -68,7 +68,7 @@ public class ConnectionSecureOkMethodHan
         }
         MethodRegistry methodRegistry = session.getMethodRegistry();
         AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
-        switch (authResult.status)
+        switch (authResult.getStatus())
         {
             case ERROR:
                 Exception cause = authResult.getCause();
@@ -88,7 +88,10 @@ public class ConnectionSecureOkMethodHan
                 disposeSaslServer(session);
                 break;
             case SUCCESS:
-                _logger.info("Connected as: " + ss.getAuthorizationID());
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Connected as: " + UsernamePrincipal.getUsernamePrincipalFromSubject(authResult.getSubject()));
+                }
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
 
                 ConnectionTuneBody tuneBody =
@@ -96,13 +99,13 @@ public class ConnectionSecureOkMethodHan
                                                                 ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
                                                                 ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
                 session.writeFrame(tuneBody.generateFrame(0));
-                session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
+                session.setAuthorizedSubject(authResult.getSubject());
                 disposeSaslServer(session);                
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
 
-                ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+                ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
                 session.writeFrame(secureBody.generateFrame(0));
         }
     }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Thu Oct 20 18:42:46 2011
@@ -65,7 +65,6 @@ public class ConnectionStartOkMethodHand
         _logger.info("Locale selected: " + body.getLocale());
 
         AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
-
         SaslServer ss = null;
         try
         {                       
@@ -78,8 +77,7 @@ public class ConnectionStartOkMethodHand
 
             session.setSaslServer(ss);
 
-            AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
-
+            final AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
             //save clientProperties
             if (session.getClientProperties() == null)
             {
@@ -88,7 +86,7 @@ public class ConnectionStartOkMethodHand
 
             MethodRegistry methodRegistry = session.getMethodRegistry();
 
-            switch (authResult.status)
+            switch (authResult.getStatus())
             {
                 case ERROR:
                     Exception cause = authResult.getCause();
@@ -108,8 +106,11 @@ public class ConnectionStartOkMethodHand
                     break;
 
                 case SUCCESS:
-                    _logger.info("Connected as: " + ss.getAuthorizationID());
-                    session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Connected as: " + UsernamePrincipal.getUsernamePrincipalFromSubject(authResult.getSubject()));
+                    }
+                    session.setAuthorizedSubject(authResult.getSubject());
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
 
@@ -121,7 +122,7 @@ public class ConnectionStartOkMethodHand
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
 
-                    ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+                    ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
                     session.writeFrame(secureBody.generateFrame(0));
             }
         }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Thu Oct 20 18:42:46 2011
@@ -106,7 +106,7 @@ public class QueueDeclareHandler impleme
                 else
                 {
                     queue = createQueue(queueName, body, virtualHost, protocolConnection);
-                    queue.setPrincipalHolder(protocolConnection);
+                    queue.setAuthorizationHolder(protocolConnection);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
                         store.createQueue(queue, body.getArguments());
@@ -119,7 +119,7 @@ public class QueueDeclareHandler impleme
                     if (body.getExclusive())
                     {
                         queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
-                        queue.setPrincipalHolder(protocolConnection);
+                        queue.setAuthorizationHolder(protocolConnection);
 
                         if(!body.getDurable())
                         {

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/information/management/ServerInformationMBean.java Thu Oct 20 18:42:46 2011
@@ -22,9 +22,11 @@ package org.apache.qpid.server.informati
 
 import java.io.IOException;
 
+import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.management.common.mbeans.ServerInformation;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
 import javax.management.JMException;
 
@@ -34,12 +36,15 @@ public class ServerInformationMBean exte
 {
     private String buildVersion;
     private String productVersion;
+    private ApplicationRegistry registry;
     
-    public ServerInformationMBean(String buildVersion, String productVersion) throws JMException
+    public ServerInformationMBean(ApplicationRegistry applicationRegistry) throws JMException
     {
         super(ServerInformation.class, ServerInformation.TYPE);
-        this.buildVersion = buildVersion;
-        this.productVersion = productVersion;
+
+        registry = applicationRegistry;
+        buildVersion = QpidProperties.getBuildVersion();
+        productVersion = QpidProperties.getReleaseVersion();
     }
 
     public String getObjectInstanceName()
@@ -67,5 +72,75 @@ public class ServerInformationMBean exte
         return productVersion;
     }
 
+
+    public void resetStatistics() throws Exception
+    {
+        registry.resetStatistics();
+    }
+
+    public double getPeakMessageDeliveryRate()
+    {
+        return registry.getMessageDeliveryStatistics().getPeak();
+    }
+
+    public double getPeakDataDeliveryRate()
+    {
+        return registry.getDataDeliveryStatistics().getPeak();
+    }
+
+    public double getMessageDeliveryRate()
+    {
+        return registry.getMessageDeliveryStatistics().getRate();
+    }
+
+    public double getDataDeliveryRate()
+    {
+        return registry.getDataDeliveryStatistics().getRate();
+    }
+
+    public long getTotalMessagesDelivered()
+    {
+        return registry.getMessageDeliveryStatistics().getTotal();
+    }
+
+    public long getTotalDataDelivered()
+    {
+        return registry.getDataDeliveryStatistics().getTotal();
+    }
+
+    public double getPeakMessageReceiptRate()
+    {
+        return registry.getMessageReceiptStatistics().getPeak();
+    }
+
+    public double getPeakDataReceiptRate()
+    {
+        return registry.getDataReceiptStatistics().getPeak();
+    }
+
+    public double getMessageReceiptRate()
+    {
+        return registry.getMessageReceiptStatistics().getRate();
+    }
+
+    public double getDataReceiptRate()
+    {
+        return registry.getDataReceiptStatistics().getRate();
+    }
+
+    public long getTotalMessagesReceived()
+    {
+        return registry.getMessageReceiptStatistics().getTotal();
+    }
+
+    public long getTotalDataReceived()
+    {
+        return registry.getDataReceiptStatistics().getTotal();
+    }
+
+    public boolean isStatisticsEnabled()
+    {
+        return registry.isStatisticsEnabled();
+    }
     
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/actors/ManagementActor.java Thu Oct 20 18:42:46 2011
@@ -20,11 +20,15 @@
  */
 package org.apache.qpid.server.logging.actors;
 
-import org.apache.qpid.server.logging.LogMessage;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.RootMessageLogger;
 
+import java.security.AccessController;
+import java.security.Principal;
 import java.text.MessageFormat;
+import java.util.Set;
+
+import javax.management.remote.JMXPrincipal;
+import javax.security.auth.Subject;
 
 /**
  * NOTE: This actor is not thread safe.
@@ -40,16 +44,23 @@ import java.text.MessageFormat;
  */
 public class ManagementActor extends AbstractActor
 {
+    /**
+     * Holds the principal name to display when principal subject is not available.
+     * <p>
+     * This is useful for cases when users invoke JMX operation over JConsole
+     * attached to the local JVM.
+     */
+    private static final String UNKNOWN_PRINCIPAL = "N/A";
+
     String _lastThreadName = null;
 
     /**
      * LOG FORMAT for the ManagementActor,
-     * Uses a MessageFormat call to insert the requried values according to
-     * these indicies:
+     * Uses a MessageFormat call to insert the required values according to
+     * these indices:
      *
-     * 0 - Connection ID
-     * 1 - User ID
-     * 2 - IP
+     * 0 - User ID
+     * 1 - IP
      */
     public static final String MANAGEMENT_FORMAT = "mng:{0}({1})";
 
@@ -75,19 +86,20 @@ public class ManagementActor extends Abs
             _lastThreadName = currentName;
 
             // Management Thread names have this format.
-            //RMI TCP Connection(2)-169.24.29.116
+            // RMI TCP Connection(2)-169.24.29.116
             // This is true for both LocalAPI and JMX Connections
             // However to be defensive lets test.
 
             String[] split = currentName.split("\\(");
             if (split.length == 2)
             {
-                String connectionID = split[1].split("\\)")[0];
                 String ip = currentName.split("-")[1];
-
-                actor = MessageFormat.format(MANAGEMENT_FORMAT,
-                                             connectionID,
-                                             ip);
+                String principalName = getPrincipalName();
+                if (principalName == null)
+                {
+                    principalName = UNKNOWN_PRINCIPAL;
+                }
+                actor = MessageFormat.format(MANAGEMENT_FORMAT, principalName, ip);
             }
             else
             {
@@ -105,6 +117,30 @@ public class ManagementActor extends Abs
         }
     }
 
+    /**
+     * Returns current JMX principal name.
+     *
+     * @return principal name or null if principal can not be found
+     */
+    protected String getPrincipalName()
+    {
+        String identity = null;
+
+        // retrieve Subject from current AccessControlContext
+        final Subject subject = Subject.getSubject(AccessController.getContext());
+        if (subject != null)
+        {
+            // retrieve JMXPrincipal from Subject
+            final Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
+            if (principals != null && !principals.isEmpty())
+            {
+                final Principal principal = principals.iterator().next();
+                identity = principal.getName();
+            }
+        }
+        return identity;
+    }
+
     public String getLogMessage()
     {
         updateLogString();

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Broker_logmessages.properties Thu Oct 20 18:42:46 2011
@@ -32,4 +32,7 @@ STOPPED = BRK-1005 : Stopped
 # 0 - path
 CONFIG = BRK-1006 : Using configuration : {0}
 # 0 - path
-LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
\ No newline at end of file
+LOG_CONFIG = BRK-1007 : Using logging configuration : {0}
+
+STATS_DATA = BRK-1008 : {0,choice,0#delivered|1#received} : {1,number,#.###} kB/s peak : {2,number,#} bytes total
+STATS_MSGS = BRK-1009 : {0,choice,0#delivered|1#received} : {1,number,#.###} msg/s peak : {2,number,#} msgs total
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties Thu Oct 20 18:42:46 2011
@@ -28,3 +28,7 @@ PREFETCH_SIZE = CHN-1004 : Prefetch Size
 # 0 - queue causing flow control
 FLOW_ENFORCED = CHN-1005 : Flow Control Enforced (Queue {0})
 FLOW_REMOVED = CHN-1006 : Flow Control Removed
+# Channel Transactions
+# 0 - time in milliseconds
+OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
+IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/Exchange_logmessages.properties Thu Oct 20 18:42:46 2011
@@ -21,4 +21,5 @@
 # 0 - type
 # 1 - name
 CREATED = EXH-1001 : Create :[ Durable] Type: {0} Name: {1}
-DELETED = EXH-1002 : Deleted
\ No newline at end of file
+DELETED = EXH-1002 : Deleted
+DISCARDMSG = EXH-1003 : Discarded Message : Name: {0} Routing Key: {1}

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ManagementConsole_logmessages.properties Thu Oct 20 18:42:46 2011
@@ -30,4 +30,4 @@ STOPPED = MNG-1005 : Stopped
 # 0 - Path
 SSL_KEYSTORE = MNG-1006 : Using SSL Keystore : {0}
 OPEN = MNG-1007 : Open : User {0}
-CLOSE = MNG-1008 : Close
\ No newline at end of file
+CLOSE = MNG-1008 : Close : User {0}
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/messages/VirtualHost_logmessages.properties Thu Oct 20 18:42:46 2011
@@ -20,4 +20,7 @@
 #
 # 0 - name
 CREATED = VHT-1001 : Created : {0}
-CLOSED = VHT-1002 : Closed
\ No newline at end of file
+CLOSED = VHT-1002 : Closed
+
+STATS_DATA = VHT-1003 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} kB/s peak : {3,number,#} bytes total
+STATS_MSGS = VHT-1004 : {0} : {1,choice,0#delivered|1#received} : {2,number,#.###} msg/s peak : {3,number,#} msgs total
\ No newline at end of file

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Thu Oct 20 18:42:46 2011
@@ -47,7 +47,7 @@ public class ChannelLogSubject extends A
          */
         setLogStringWithFormat(CHANNEL_FORMAT,
                                session.getSessionID(),
-                               session.getPrincipal().getName(),
+                               session.getAuthorizedPrincipal().getName(),
                                session.getRemoteAddress(),
                                session.getVirtualHost().getName(),
                                channel.getChannelId());

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Thu Oct 20 18:42:46 2011
@@ -56,7 +56,7 @@ public class ConnectionLogSubject extend
     {
         if (!_upToDate)
         {
-            if (_session.getPrincipal() != null)
+            if (_session.getAuthorizedPrincipal() != null)
             {
                 if (_session.getVirtualHost() != null)
                 {
@@ -72,7 +72,7 @@ public class ConnectionLogSubject extend
                      */
                     _logString = "[" + MessageFormat.format(CONNECTION_FORMAT, 
                                                             _session.getSessionID(), 
-                                                            _session.getPrincipal().getName(), 
+                                                            _session.getAuthorizedPrincipal().getName(), 
                                                             _session.getRemoteAddress(),
                                                             _session.getVirtualHost().getName()) 
                                  + "] ";
@@ -83,7 +83,7 @@ public class ConnectionLogSubject extend
                 {
                     _logString = "[" + MessageFormat.format(USER_FORMAT, 
                                                             _session.getSessionID(), 
-                                                            _session.getPrincipal().getName(), 
+                                                            _session.getAuthorizedPrincipal().getName(), 
                                                             _session.getRemoteAddress())
                                  + "] ";
 

Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -3,4 +3,5 @@
 /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
 /qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/management:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1072051-1185907

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Thu Oct 20 18:42:46 2011
@@ -26,8 +26,9 @@ import javax.management.NotCompliantMBea
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 
 /**
  * Provides implementation of the boilerplate ManagedObject interface. Most managed objects should find it useful
@@ -36,10 +37,14 @@ import org.apache.qpid.server.registry.A
  */
 public abstract class DefaultManagedObject extends StandardMBean implements ManagedObject
 {
+    private static final Logger LOGGER = Logger.getLogger(DefaultManagedObject.class); // own category, so unregister errors are attributed to this class
+    
     private Class<?> _managementInterface;
 
     private String _typeName;
 
+    private ManagedObjectRegistry _registry;
+
     protected DefaultManagedObject(Class<?> managementInterface, String typeName)
         throws NotCompliantMBeanException
     {
@@ -65,23 +70,26 @@ public abstract class DefaultManagedObje
 
     public void register() throws JMException
     {
-        getManagedObjectRegistry().registerObject(this);
+        _registry = ApplicationRegistry.getInstance().getManagedObjectRegistry();
+        _registry.registerObject(this);
     }
 
-    protected ManagedObjectRegistry getManagedObjectRegistry()
-    {
-        return ApplicationRegistry.getInstance().getManagedObjectRegistry();
-    }
-
-    public void unregister() throws AMQException
+    public void unregister()
     {
         try
         {
-            getManagedObjectRegistry().unregisterObject(this);
+            if(_registry != null)
+            {
+                _registry.unregisterObject(this);
+            }
         }
         catch (JMException e)
         {
-            throw new AMQException("Error unregistering managed object: " + this + ": " + e, e);
+            LOGGER.error("Error unregistering managed object: " + this + ": " + e, e);
+        }
+        finally
+        {
+            _registry = null;
         }
     }
 
@@ -153,32 +161,4 @@ public abstract class DefaultManagedObje
             return "";
     }
 
-    protected static StringBuffer jmxEncode(StringBuffer jmxName, int attrPos)
-    {
-        for (int i = attrPos; i < jmxName.length(); i++)
-        {
-            if (jmxName.charAt(i) == ',')
-            {
-                jmxName.setCharAt(i, ';');
-            }
-            else if (jmxName.charAt(i) == ':')
-            {
-                jmxName.setCharAt(i, '-');
-            }
-            else if (jmxName.charAt(i) == '?' ||
-                    jmxName.charAt(i) == '*' ||
-                    jmxName.charAt(i) == '\\')
-            {
-                jmxName.insert(i, '\\');
-                i++;
-            }
-            else if (jmxName.charAt(i) == '\n')
-            {
-                jmxName.insert(i, '\\');
-                i++;
-                jmxName.setCharAt(i, 'n');
-            }
-        }
-        return jmxName;
-    }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Thu Oct 20 18:42:46 2011
@@ -20,32 +20,6 @@
  */
 package org.apache.qpid.server.management;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
-
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-import javax.management.ObjectName;
-import javax.management.NotificationListener;
-import javax.management.NotificationFilterSupport;
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXServiceURL;
-import javax.management.remote.MBeanServerForwarder;
-import javax.management.remote.JMXConnectionNotification;
-import javax.management.remote.rmi.RMIConnectorServer;
-import javax.management.remote.rmi.RMIJRMPServerImpl;
-import javax.management.remote.rmi.RMIServerImpl;
-import javax.rmi.ssl.SslRMIClientSocketFactory;
-import javax.rmi.ssl.SslRMIServerSocketFactory;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -64,7 +38,31 @@ import java.rmi.server.RMIClientSocketFa
 import java.rmi.server.RMIServerSocketFactory;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
-import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+import javax.management.NotificationFilterSupport;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectionNotification;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.MBeanServerForwarder;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.management.remote.rmi.RMIJRMPServerImpl;
+import javax.management.remote.rmi.RMIServerImpl;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import javax.rmi.ssl.SslRMIServerSocketFactory;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.security.auth.rmi.RMIPasswordAuthenticator;
 
 /**
  * This class starts up an MBeanserver. If out of the box agent has been enabled then there are no 
@@ -74,15 +72,14 @@ public class JMXManagedObjectRegistry im
 {
     private static final Logger _log = Logger.getLogger(JMXManagedObjectRegistry.class);
     
-    public static final String MANAGEMENT_PORT_CONFIG_PATH = "management.jmxport";
-    public static final int MANAGEMENT_PORT_DEFAULT = 8999;
-    public static final int PORT_EXPORT_OFFSET = 100;
-
     private final MBeanServer _mbeanServer;
     private JMXConnectorServer _cs;
     private Registry _rmiRegistry;
     private boolean _useCustomSocketFactory;
 
+    private final int _jmxPortRegistryServer;
+    private final int _jmxPortConnectorServer;
+
     public JMXManagedObjectRegistry() throws AMQException
     {
         _log.info("Initialising managed object registry using platform MBean server");
@@ -95,8 +92,11 @@ public class JMXManagedObjectRegistry im
         _mbeanServer =
                 platformServer ? ManagementFactory.getPlatformMBeanServer()
                 : MBeanServerFactory.createMBeanServer(ManagedObject.DOMAIN);
-    }
 
+        _jmxPortRegistryServer = appRegistry.getConfiguration().getJMXPortRegistryServer();
+        _jmxPortConnectorServer = appRegistry.getConfiguration().getJMXConnectorServerPort();
+
+    }
 
     public void start() throws IOException, ConfigurationException
     {
@@ -111,14 +111,7 @@ public class JMXManagedObjectRegistry im
         }
 
         IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
-        int port = appRegistry.getConfiguration().getJMXManagementPort();
 
-        //retrieve the Principal Database assigned to JMX authentication duties
-        String jmxDatabaseName = appRegistry.getConfiguration().getJMXPrincipalDatabase();
-        Map<String, PrincipalDatabase> map = appRegistry.getDatabaseManager().getDatabases();        
-        PrincipalDatabase db = map.get(jmxDatabaseName);
-
-        HashMap<String,Object> env = new HashMap<String,Object>();
 
         //Socket factories for the RMIConnectorServer, either default or SLL depending on configuration
         RMIClientSocketFactory csf;
@@ -200,7 +193,8 @@ public class JMXManagedObjectRegistry im
 
         //add a JMXAuthenticator implementation the env map to authenticate the RMI based JMX connector server
         RMIPasswordAuthenticator rmipa = new RMIPasswordAuthenticator();
-        rmipa.setPrincipalDatabase(db);
+        rmipa.setAuthenticationManager(appRegistry.getAuthenticationManager());
+        HashMap<String,Object> env = new HashMap<String,Object>();
         env.put(JMXConnectorServer.AUTHENTICATOR, rmipa);
 
         /*
@@ -211,14 +205,14 @@ public class JMXManagedObjectRegistry im
         System.setProperty("java.rmi.server.randomIDs", "true");
         if(_useCustomSocketFactory)
         {
-            _rmiRegistry = LocateRegistry.createRegistry(port, null, new CustomRMIServerSocketFactory());
+            _rmiRegistry = LocateRegistry.createRegistry(_jmxPortRegistryServer, null, new CustomRMIServerSocketFactory());
         }
         else
         {
-            _rmiRegistry = LocateRegistry.createRegistry(port, null, null);
+            _rmiRegistry = LocateRegistry.createRegistry(_jmxPortRegistryServer, null, null);
         }
         
-        CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", port));
+        CurrentActor.get().message(ManagementConsoleMessages.LISTENING("RMI Registry", _jmxPortRegistryServer));
 
         /*
          * We must now create the RMI ConnectorServer manually, as the JMX Factory methods use RMI calls 
@@ -229,7 +223,7 @@ public class JMXManagedObjectRegistry im
          * The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
          * on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's. 
          */
-        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
+        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(_jmxPortConnectorServer, csf, ssf, env);
         String localHost;
         try
         {
@@ -241,9 +235,9 @@ public class JMXManagedObjectRegistry im
         }
         final String hostname = localHost;
         final JMXServiceURL externalUrl = new JMXServiceURL(
-                "service:jmx:rmi://"+hostname+":"+(port+PORT_EXPORT_OFFSET)+"/jndi/rmi://"+hostname+":"+port+"/jmxrmi");
+                "service:jmx:rmi://"+hostname+":"+(_jmxPortConnectorServer)+"/jndi/rmi://"+hostname+":"+_jmxPortRegistryServer+"/jmxrmi");
 
-        final JMXServiceURL internalUrl = new JMXServiceURL("rmi", hostname, port+PORT_EXPORT_OFFSET);
+        final JMXServiceURL internalUrl = new JMXServiceURL("rmi", hostname, _jmxPortConnectorServer);
         _cs = new RMIConnectorServer(internalUrl, env, rmiConnectorServerStub, _mbeanServer)
         {   
             @Override  
@@ -312,7 +306,7 @@ public class JMXManagedObjectRegistry im
         _cs.start();
 
         String connectorServer = (sslEnabled ? "SSL " : "") + "JMX RMIConnectorServer";
-        CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, port + PORT_EXPORT_OFFSET));
+        CurrentActor.get().message(ManagementConsoleMessages.LISTENING(connectorServer, _jmxPortConnectorServer));
 
         CurrentActor.get().message(ManagementConsoleMessages.READY(false));
     }
@@ -407,7 +401,7 @@ public class JMXManagedObjectRegistry im
         if (_rmiRegistry != null)
         {
             // Stopping the RMI registry
-            CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _cs.getAddress().getPort() - PORT_EXPORT_OFFSET));
+            CurrentActor.get().message(ManagementConsoleMessages.SHUTTING_DOWN("RMI Registry", _jmxPortRegistryServer));
             try
             {
                 UnicastRemoteObject.unexportObject(_rmiRegistry, false);

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java Thu Oct 20 18:42:46 2011
@@ -26,8 +26,6 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.security.AccessControlContext;
 import java.security.AccessController;
-import java.security.Principal;
-import java.util.Properties;
 import java.util.Set;
 
 import javax.management.Attribute;
@@ -44,7 +42,6 @@ import javax.management.remote.MBeanServ
 import javax.security.auth.Subject;
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.ManagementActor;
 import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
 import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -52,20 +49,15 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.access.Operation;
 
 /**
- * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. This implements
- * the logic for allowing the users to invoke MBean operations and implements the restrictions for readOnly, readWrite
- * and admin users.
+ * This class can be used by the JMXConnectorServer as an InvocationHandler for the mbean operations. It delegates
+ * JMX access decisions to the SecurityPlugin.
  */
 public class MBeanInvocationHandlerImpl implements InvocationHandler, NotificationListener
 {
     private static final Logger _logger = Logger.getLogger(MBeanInvocationHandlerImpl.class);
 
-    public final static String ADMIN = "admin";
-    public final static String READWRITE = "readwrite";
-    public final static String READONLY = "readonly";
     private final static String DELEGATE = "JMImplementation:type=MBeanServerDelegate";
     private MBeanServer _mbs;
-    private static Properties _userRoles = new Properties();
     private static ManagementActor  _logActor;
     
     public static MBeanServerForwarder newProxyInstance()
@@ -137,14 +129,13 @@ public class MBeanInvocationHandlerImpl 
             Set<JMXPrincipal> principals = subject.getPrincipals(JMXPrincipal.class);
             if (principals == null || principals.isEmpty())
             {
-                throw new SecurityException("Access denied: no principal");
+                throw new SecurityException("Access denied: no JMX principal");
             }
-			
-            // Save the principal
-            Principal principal = principals.iterator().next();
-            SecurityManager.setThreadPrincipal(principal);
-    
-			// Get the component, type and impact, which may be null
+
+            // Save the subject
+            SecurityManager.setThreadSubject(subject);
+   
+            // Get the component, type and impact, which may be null
             String type = getType(method, args);
             String vhost = getVirtualHost(method, args);
             int impact = getImpact(method, args);
@@ -213,6 +204,20 @@ public class MBeanInvocationHandlerImpl 
             ObjectName object = (ObjectName) args[0];
             String vhost = object.getKeyProperty("VirtualHost");
             
+            if(vhost != null)
+            {
+                try
+                {
+                    //if the name is quoted in the ObjectName, unquote it
+                    vhost = ObjectName.unquote(vhost);
+                }
+                catch(IllegalArgumentException e)
+                {
+                    //ignore, this just means the name is not quoted
+                    //and can be left unchanged
+                }
+            }
+
             return vhost;
         }
         return null;
@@ -272,7 +277,7 @@ public class MBeanInvocationHandlerImpl 
             }
             catch (JMException ex)
             {
-                ex.printStackTrace();
+                _logger.error("Unable to determine mbean impact for method : " + mbeanMethod, ex);
             }
         }
 
@@ -308,7 +313,7 @@ public class MBeanInvocationHandlerImpl 
         else if (notification.getType().equals(JMXConnectionNotification.CLOSED) ||
                  notification.getType().equals(JMXConnectionNotification.FAILED))
         {
-            _logActor.message(ManagementConsoleMessages.CLOSE());
+            _logActor.message(ManagementConsoleMessages.CLOSE(user));
         }
     }
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java Thu Oct 20 18:42:46 2011
@@ -37,7 +37,7 @@ public class ContentHeaderBodyAdapter im
 
     private BasicContentHeaderProperties getProperties()
     {
-        return (BasicContentHeaderProperties) _contentHeaderBody.properties;
+        return (BasicContentHeaderProperties) _contentHeaderBody.getProperties();
     }
 
     public String getCorrelationId()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java Thu Oct 20 18:42:46 2011
@@ -29,7 +29,10 @@ import org.apache.qpid.framing.abstracti
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.MessageMetaDataType;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.util.ByteBufferInputStream;
+import org.apache.qpid.server.util.ByteBufferOutputStream;
 
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.Set;
 
@@ -120,38 +123,38 @@ public class MessageMetaData implements 
         return size;
     }
 
+
     public int writeToBuffer(int offset, ByteBuffer dest)
     {
-        ByteBuffer src = ByteBuffer.allocate((int)getStorableSize());
-
-        org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(src);
-        EncodingUtils.writeInteger(minaSrc, _contentHeaderBody.getSize());
-        _contentHeaderBody.writePayload(minaSrc);
-        EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getExchange());
-        EncodingUtils.writeShortStringBytes(minaSrc, _messagePublishInfo.getRoutingKey());
-        byte flags = 0;
-        if(_messagePublishInfo.isMandatory())
-        {
-            flags |= MANDATORY_FLAG;
-        }
-        if(_messagePublishInfo.isImmediate())
+        int oldPosition = dest.position();
+        try
         {
-            flags |= IMMEDIATE_FLAG;
+
+            DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(dest));
+            EncodingUtils.writeInteger(dataOutputStream, _contentHeaderBody.getSize());
+            _contentHeaderBody.writePayload(dataOutputStream);
+            EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getExchange());
+            EncodingUtils.writeShortStringBytes(dataOutputStream, _messagePublishInfo.getRoutingKey());
+            byte flags = 0;
+            if(_messagePublishInfo.isMandatory())
+            {
+                flags |= MANDATORY_FLAG;
+            }
+            if(_messagePublishInfo.isImmediate())
+            {
+                flags |= IMMEDIATE_FLAG;
+            }
+            dest.put(flags);
+            dest.putLong(_arrivalTime);
+
         }
-        EncodingUtils.writeByte(minaSrc, flags);
-        EncodingUtils.writeLong(minaSrc,_arrivalTime);
-        src.position(minaSrc.position());
-        src.flip();
-        src.position(offset);
-        src = src.slice();
-        if(dest.remaining() < src.limit())
+        catch (IOException e)
         {
-            src.limit(dest.remaining());
+            // This shouldn't happen as we are not actually using anything that can throw an IO Exception
+            throw new RuntimeException(e);
         }
-        dest.put(src);
-
 
-        return src.limit();
+        return dest.position()-oldPosition;
     }
 
     public int getContentSize()
@@ -161,7 +164,7 @@ public class MessageMetaData implements 
 
     public boolean isPersistent()
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.properties);
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) (_contentHeaderBody.getProperties());
         return properties.getDeliveryMode() ==  BasicContentHeaderProperties.PERSISTENT;
     }
 
@@ -173,14 +176,15 @@ public class MessageMetaData implements 
         {
             try
             {
-                org.apache.mina.common.ByteBuffer minaSrc = org.apache.mina.common.ByteBuffer.wrap(buf);
-                int size = EncodingUtils.readInteger(minaSrc);
-                ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(minaSrc, size);
-                final AMQShortString exchange = EncodingUtils.readAMQShortString(minaSrc);
-                final AMQShortString routingKey = EncodingUtils.readAMQShortString(minaSrc);
+                ByteBufferInputStream bbis = new ByteBufferInputStream(buf);
+                DataInputStream dais = new DataInputStream(bbis);
+                int size = EncodingUtils.readInteger(dais);
+                ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(dais, size);
+                final AMQShortString exchange = EncodingUtils.readAMQShortString(dais);
+                final AMQShortString routingKey = EncodingUtils.readAMQShortString(dais);
 
-                final byte flags = EncodingUtils.readByte(minaSrc);
-                long arrivalTime = EncodingUtils.readLong(minaSrc);
+                final byte flags = EncodingUtils.readByte(dais);
+                long arrivalTime = EncodingUtils.readLong(dais);
 
                 MessagePublishInfo publishBody =
                         new MessagePublishInfo()
@@ -216,6 +220,10 @@ public class MessageMetaData implements 
             {
                 throw new RuntimeException(e);
             }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
 
         }
     };
@@ -229,7 +237,7 @@ public class MessageMetaData implements 
     {
         private BasicContentHeaderProperties getProperties()
         {
-            return (BasicContentHeaderProperties) getContentHeaderBody().properties;
+            return (BasicContentHeaderProperties) getContentHeaderBody().getProperties();
         }
 
         public String getCorrelationId()

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Thu Oct 20 18:42:46 2011
@@ -34,7 +34,7 @@ import org.apache.qpid.transport.codec.B
 import java.nio.ByteBuffer;
 import java.lang.ref.SoftReference;
 
-public class MessageMetaData_0_10 implements StorableMessageMetaData
+public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
 {
     private Header _header;
     private DeliveryProperties _deliveryProps;
@@ -194,6 +194,12 @@ public class MessageMetaData_0_10 implem
         return _deliveryProps == null ? 0L : _deliveryProps.getExpiration();
     }
 
+    public boolean isRedelivered()
+    {
+        // The *Message* is never redelivered, only queue entries are...
+        return false;
+    }
+
     public long getArrivalTime()
     {
         return _arrivalTime;
@@ -239,4 +245,6 @@ public class MessageMetaData_0_10 implem
 
         }
     }
+
+
 }

Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Thu Oct 20 18:42:46 2011
@@ -26,6 +26,7 @@
  */
 package org.apache.qpid.server.output.amqp0_8;
 
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
@@ -34,22 +35,18 @@ import org.apache.qpid.server.output.Hea
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.transport.DeliveryProperties;
 
-import java.nio.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
 
     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
 
-    private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
-            METHOD_REGISTRY.getProtocolVersionMethodConverter();
-
     public static Factory getInstanceFactory()
     {
         return new Factory()
@@ -62,6 +59,7 @@ public class ProtocolOutputConverterImpl
         };
     }
 
+
     private final AMQProtocolSession _protocolSession;
 
     private ProtocolOutputConverterImpl(AMQProtocolSession session)
@@ -78,10 +76,11 @@ public class ProtocolOutputConverterImpl
     public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
-        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
     }
 
+
     private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
             throws AMQException
     {
@@ -93,65 +92,120 @@ public class ProtocolOutputConverterImpl
         {
             final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
             BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
-            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
-            chb.bodySize = message.getSize(); 
+            ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
             return chb;
         }
     }
 
 
-    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
     {
-        AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
-        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
     }
 
-    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
             throws AMQException
     {
 
 
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
-
+        int bodySize = (int) message.getSize();
 
-        final int bodySize = (int) message.getSize();
         if(bodySize == 0)
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
+
             writeFrame(compositeBlock);
         }
         else
         {
             int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
 
-            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-            ByteBuffer buf = ByteBuffer.allocate(capacity);
 
-            int writtenSize = 0;
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+            int writtenSize = capacity;
+
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
 
-            writtenSize += message.getContent(buf, writtenSize);
-            buf.flip();
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
             writeFrame(compositeBlock);
 
             while(writtenSize < bodySize)
             {
-                buf = java.nio.ByteBuffer.allocate(capacity);
-                writtenSize += message.getContent(buf, writtenSize);
-                buf.flip();
-                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
+
+                writeFrame(new AMQFrame(channelId, body));
             }
+        }
+    }
 
+    private class MessageContentSourceBody implements AMQBody
+    {
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
         }
+
+        public int getSize()
+        {
+            return _length;
+        }
+
+        public void writePayload(DataOutputStream buffer) throws IOException
+        {
+            byte[] data = new byte[_length];
+
+            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+            buffer.write(data);
+        }
+
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
     }
 
 
-    private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
+
+
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
             throws AMQException
     {
+
         final AMQShortString exchangeName;
         final AMQShortString routingKey;
 
@@ -172,21 +226,58 @@ public class ProtocolOutputConverterImpl
 
         final boolean isRedelivered = entry.isRedelivered();
 
+        final AMQBody returnBlock = new AMQBody()
+        {
+
+            public AMQBody _underlyingBody;
 
-        BasicDeliverBody deliverBody =
-                METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
-                                                      deliveryTag,
-                                                      isRedelivered,
-                                                      exchangeName,
-                                                      routingKey);
+            public AMQBody createAMQBody()
+            {
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
 
-        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
-        return deliverFrame;
+
+
+            }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(DataOutputStream buffer) throws IOException
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
+        };
+        return returnBlock;
     }
 
-    private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
             throws AMQException
     {
         final AMQShortString exchangeName;
@@ -215,9 +306,8 @@ public class ProtocolOutputConverterImpl
                                                     exchangeName,
                                                     routingKey,
                                                     queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
-        return getOkFrame;
+        return getOkBody;
     }
 
     public byte getProtocolMinorVersion()
@@ -230,31 +320,28 @@ public class ProtocolOutputConverterImpl
         return getProtocolSession().getProtocolMajorVersion();
     }
 
-    private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
     {
+
         BasicReturnBody basicReturnBody =
                 METHOD_REGISTRY.createBasicReturnBody(replyCode,
-                                                     replyText,
-                                                     messagePublishInfo.getExchange(),
-                                                     messagePublishInfo.getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+                        replyText,
+                        messagePublishInfo.getExchange(),
+                        messagePublishInfo.getRoutingKey());
 
-        return returnFrame;
+
+        return basicReturnBody;
     }
 
-    public void writeReturn(MessagePublishInfo messagePublishInfo,
-                            ContentHeaderBody header,
-                            MessageContentSource content,
-                            int channelId,
-                            int replyCode,
-                            AMQShortString replyText)
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
             throws AMQException
     {
 
-        AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
-
-        writeMessageDelivery(content, header, channelId, returnFrame);
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
 
+        writeMessageDelivery(message, header, channelId, returnFrame);
     }
 
 
@@ -266,8 +353,68 @@ public class ProtocolOutputConverterImpl
 
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
+
         BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
         writeFrame(basicCancelOkBody.generateFrame(channelId));
 
     }
+
+
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        public void writePayload(DataOutputStream buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        public void writePayload(DataOutputStream buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message