qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1209052 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/management/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/transport/ broker/src/test/java/org/apache/qpid/s...
Date Thu, 01 Dec 2011 12:39:16 GMT
Author: robbie
Date: Thu Dec  1 12:39:14 2011
New Revision: 1209052

URL: http://svn.apache.org/viewvc?rev=1209052&view=rev
Log:
QPID-2243: 0-10 protocol connections do not have a matching JMX MBean to allow management.

Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
    qpid/trunk/qpid/java/test-profiles/Java010Excludes

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java?rev=1209052&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java Thu Dec  1 12:39:14 2011
@@ -0,0 +1,71 @@
+package org.apache.qpid.server.management;
+
+import javax.management.Notification;
+
+import javax.management.JMException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.monitor.MonitorNotification;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularType;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+
+public abstract class AbstractAMQManagedConnectionObject extends AMQManagedObject implements ManagedConnection
+{
+    protected final String _name;
+
+    protected static final OpenType[] _channelAttributeTypes = { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
+    protected static final CompositeType _channelType;
+    protected static final TabularType _channelsType;
+
+    protected static final String BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION_STR =
+                                    "Broker Management Console has closed the connection.";
+
+    static
+    {
+        try
+        {
+            _channelType = new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
+                            COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
+            _channelsType = new TabularType("Channels", "Channels", _channelType, (String[]) TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
+        }
+        catch (JMException ex)
+        {
+            // This is not expected to ever occur.
+            throw new RuntimeException("Got JMException in static initializer.", ex);
+        }
+    }
+
+    protected AbstractAMQManagedConnectionObject(final String remoteAddress) throws NotCompliantMBeanException
+    {
+        super(ManagedConnection.class, ManagedConnection.TYPE);
+        _name = "anonymous".equals(remoteAddress) ? (remoteAddress + hashCode()) : remoteAddress;
+    }
+
+    @Override
+    public String getObjectInstanceName()
+    {
+        return ObjectName.quote(_name);
+    }
+
+    public void notifyClients(String notificationMsg)
+    {
+        final Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
+                                                System.currentTimeMillis(), notificationMsg);
+        _broadcaster.sendNotification(n);
+    }
+
+    @Override
+    public MBeanNotificationInfo[] getNotificationInfo()
+    {
+        String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
+        String name = MonitorNotification.class.getName();
+        String description = "Channel count has reached threshold value";
+        MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
+
+        return new MBeanNotificationInfo[] { info1 };
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Thu Dec  1 12:39:14 2011
@@ -39,89 +39,44 @@ package org.apache.qpid.server.protocol;
 
 import java.util.Date;
 import java.util.List;
-
 import javax.management.JMException;
 import javax.management.MBeanException;
-import javax.management.MBeanNotificationInfo;
 import javax.management.NotCompliantMBeanException;
-import javax.management.Notification;
-import javax.management.ObjectName;
-import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
 import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
 import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.ManagementActor;
-import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.AbstractAMQManagedConnectionObject;
 import org.apache.qpid.server.management.ManagedObject;
 
 /**
  * This MBean class implements the management interface. In order to make more attributes, operations and notifications
  * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
  */
-@MBeanDescription("Management Bean for an AMQ Broker Connection")
-public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
+@MBeanDescription("Management Bean for an AMQ Broker 0-9-1/0-9/0-8 Connections")
+public class AMQProtocolSessionMBean extends AbstractAMQManagedConnectionObject
 {
     private AMQProtocolSession _protocolSession = null;
-    private String _name = null;
 
-    // openmbean data types for representing the channel attributes
-
-    private static final OpenType[] _channelAttributeTypes =
-        { SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER, SimpleType.BOOLEAN };
-    private static CompositeType _channelType = null; // represents the data type for channel data
-    private static TabularType _channelsType = null; // Data type for list of channels type
     private static final AMQShortString BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION =
-        new AMQShortString("Broker Management Console has closed the connection.");
+                                        new AMQShortString(BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION_STR);
 
-    @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
+    @MBeanConstructor("Creates an MBean exposing an AMQ Broker 0-9-1/0-9/0-8 Connection")
     public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
     {
-        super(ManagedConnection.class, ManagedConnection.TYPE);
+        super(amqProtocolSession.getRemoteAddress().toString());
         _protocolSession = amqProtocolSession;
-        String remote = getRemoteAddress();
-        _name = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
-        init();
-    }
-
-    static
-    {
-        try
-        {
-            init();
-        }
-        catch (JMException ex)
-        {
-            // This is not expected to ever occur.
-            throw new RuntimeException("Got JMException in static initializer.", ex);
-        }
-    }
-
-    /**
-     * initialises the openmbean data types
-     */
-    private static void init() throws OpenDataException
-    {
-        _channelType =
-            new CompositeType("Channel", "Channel Details", COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]),
-                    COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), _channelAttributeTypes);
-        _channelsType = new TabularType("Channels", "Channels", _channelType, TABULAR_UNIQUE_INDEX.toArray(new String[TABULAR_UNIQUE_INDEX.size()]));
     }
 
     public String getClientId()
@@ -169,16 +124,6 @@ public class AMQProtocolSessionMBean ext
         return _protocolSession.getMaximumNumberOfChannels();
     }
 
-    public void setMaximumNumberOfChannels(Long value)
-    {
-        _protocolSession.setMaximumNumberOfChannels(value);
-    }
-
-    public String getObjectInstanceName()
-    {
-        return ObjectName.quote(_name);
-    }
-
     /**
      * commits transactions for a transactional channel
      *
@@ -321,25 +266,6 @@ public class AMQProtocolSessionMBean ext
         }
     }
 
-    @Override
-    public MBeanNotificationInfo[] getNotificationInfo()
-    {
-        String[] notificationTypes = new String[] { MonitorNotification.THRESHOLD_VALUE_EXCEEDED };
-        String name = MonitorNotification.class.getName();
-        String description = "Channel count has reached threshold value";
-        MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
-
-        return new MBeanNotificationInfo[] { info1 };
-    }
-
-    public void notifyClients(String notificationMsg)
-    {
-        Notification n =
-            new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this, ++_notificationSequenceNumber,
-                System.currentTimeMillis(), notificationMsg);
-        _broadcaster.sendNotification(n);
-    }
-
     public void resetStatistics() throws Exception
     {
         _protocolSession.resetStatistics();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Thu Dec  1 12:39:14 2011
@@ -24,6 +24,14 @@ import static org.apache.qpid.server.log
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+
+import org.apache.qpid.server.management.ManagedObject;
+
+import org.apache.qpid.server.management.Managable;
+
 import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -55,7 +63,7 @@ import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
 import org.apache.qpid.transport.Session;
 
-public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements Managable, AMQConnectionModel, LogSubject, AuthorizationHolder
 {
     private ConnectionConfig _config;
     private Runnable _onOpenTask;
@@ -67,6 +75,10 @@ public class ServerConnection extends Co
     private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
     private final long _connectionId;
+
+    private ServerConnectionMBean _mBean;
+    private VirtualHost _virtualHost;
+    private AtomicLong _lastIoTime = new AtomicLong();
     
     public ServerConnection(final long connectionId)
     {
@@ -133,9 +145,6 @@ public class ServerConnection extends Co
         super.setConnectionDelegate(delegate);
     }
 
-    private VirtualHost _virtualHost;
-
-
     public VirtualHost getVirtualHost()
     {
         return _virtualHost;
@@ -144,8 +153,18 @@ public class ServerConnection extends Co
     public void setVirtualHost(VirtualHost virtualHost)
     {
         _virtualHost = virtualHost;
-        
+
         initialiseStatistics();
+
+        try
+        {
+            _mBean = new ServerConnectionMBean(this);
+            _mBean.register();
+        }
+        catch (JMException jme)
+        {
+            log.error("Unable to create mBean for ServerConnection",jme);
+        }
     }
 
     public void setConnectionConfig(final ConnectionConfig config)
@@ -190,6 +209,7 @@ public class ServerConnection extends Co
     @Override
     public void received(ProtocolEvent event)
     {
+        _lastIoTime.set(System.currentTimeMillis());
         if (event.isConnectionControl())
         {
             CurrentActor.set(_actor);
@@ -260,6 +280,11 @@ public class ServerConnection extends Co
     public void close(AMQConstant cause, String message) throws AMQException
     {
         closeSubscriptions();
+        if (_mBean != null)
+        {
+            _mBean.unregister();
+            _mBean = null;
+        }
         ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
         try
         {
@@ -405,6 +430,11 @@ public class ServerConnection extends Co
     public void closed()
     {
         closeSubscriptions();
+        if (_mBean != null)
+        {
+            _mBean.unregister();
+            _mBean = null;
+        }
         super.closed();
     }
 
@@ -416,4 +446,30 @@ public class ServerConnection extends Co
         }
     }
 
+    @Override
+    public ManagedObject getManagedObject()
+    {
+        return _mBean;
+    }
+
+    @Override
+    public void send(ProtocolEvent event)
+    {
+        _lastIoTime.set(System.currentTimeMillis());
+        super.send(event);
+    }
+
+    public AtomicLong getLastIoTime()
+    {
+        return _lastIoTime;
+    }
+
+    void checkForNotification()
+    {
+        int channelsCount = getSessionModels().size();
+        if (_mBean != null && channelsCount >= getConnectionDelegate().getChannelMax())
+        {
+            _mBean.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Thu Dec  1 12:39:14 2011
@@ -28,10 +28,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
-
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
-
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerConfig;
@@ -49,6 +47,7 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.ConnectionCloseCode;
 import org.apache.qpid.transport.ConnectionOpen;
 import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionStartOk;
 import org.apache.qpid.transport.ConnectionTuneOk;
 import org.apache.qpid.transport.ServerDelegate;
 import org.apache.qpid.transport.Session;
@@ -62,6 +61,8 @@ public class ServerConnectionDelegate ex
 {
     private final String _localFQDN;
     private final IApplicationRegistry _appRegistry;
+    private int _maxNoOfChannels;
+    private Map<String,Object> _clientProperties;
 
     public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
     {
@@ -77,6 +78,7 @@ public class ServerConnectionDelegate ex
         
         _appRegistry = appRegistry;
         _localFQDN = localFQDN;
+        _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
     }
 
     private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
@@ -154,7 +156,7 @@ public class ServerConnectionDelegate ex
     public void connectionOpen(Connection conn, ConnectionOpen open)
     {
         final ServerConnection sconn = (ServerConnection) conn;
-        
+
         VirtualHost vhost;
         String vhostName;
         if(open.hasVirtualHost())
@@ -222,7 +224,12 @@ public class ServerConnectionDelegate ex
     @Override
     protected int getChannelMax()
     {
-        return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+        return _maxNoOfChannels;
+    }
+
+    protected void setChannelMax(int channelMax)
+    {
+        _maxNoOfChannels = channelMax;
     }
 
     @Override public void sessionDetach(Connection conn, SessionDetach dtc)
@@ -253,6 +260,7 @@ public class ServerConnectionDelegate ex
         {
             ssn = sessionAttachImpl(conn, atc);
             conn.registerSession(ssn);
+            ((ServerConnection)conn).checkForNotification();
         }
         else
         {
@@ -279,4 +287,16 @@ public class ServerConnectionDelegate ex
         }
         return true;
     }
+
+    @Override
+    public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+    {
+        _clientProperties = ok.getClientProperties();
+        super.connectionStartOk(conn, ok);
+    }
+
+    public Map<String,Object> getClientProperties()
+    {
+        return _clientProperties;
+    }
 }

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java?rev=1209052&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionMBean.java Thu Dec  1 12:39:14 2011
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import javax.management.JMException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
+import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.ManagementActor;
+import org.apache.qpid.server.management.AbstractAMQManagedConnectionObject;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+
+/**
+ * This MBean class implements the management interface. In order to make more attributes, operations and notifications
+ * available over JMX simply augment the ManagedConnection interface and add the appropriate implementation here.
+ */
+@MBeanDescription("Management Bean for an AMQ Broker 0-10 Connection")
+public class ServerConnectionMBean extends AbstractAMQManagedConnectionObject
+{
+    private final ServerConnection _serverConnection;
+
+    @MBeanConstructor("Creates an MBean exposing an AMQ Broker 0-10 Connection")
+    protected ServerConnectionMBean(final ServerConnection serverConnection) throws NotCompliantMBeanException
+    {
+        super(serverConnection.getConfig().getAddress());
+        _serverConnection = serverConnection;
+    }
+
+    @Override
+    public ManagedObject getParentObject()
+    {
+        return _serverConnection.getVirtualHost().getManagedObject();
+    }
+
+    @Override
+    public String getClientId()
+    {
+        return _serverConnection.getClientId();
+    }
+
+    @Override
+    public String getAuthorizedId()
+    {
+        return _serverConnection.getAuthorizedPrincipal().getName();
+    }
+
+    @Override
+    public String getVersion()
+    {
+        return String.valueOf(_serverConnection.getConnectionDelegate().getClientProperties().get(ClientProperties.version.toString()));
+    }
+
+    @Override
+    public String getRemoteAddress()
+    {
+        return _serverConnection.getConfig().getAddress();
+    }
+
+    @Override
+    public Date getLastIoTime()
+    {
+        return new Date(_serverConnection.getLastIoTime().longValue());
+    }
+
+    @Override
+    public Long getMaximumNumberOfChannels()
+    {
+        return (long) _serverConnection.getConnectionDelegate().getChannelMax();
+    }
+
+    @Override
+    public TabularData channels() throws IOException, JMException
+    {
+        final TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
+        final List<AMQSessionModel> list = _serverConnection.getSessionModels();
+
+        for (final AMQSessionModel channel : list)
+        {
+            final ServerSession session = (ServerSession)channel;
+            Object[] itemValues =
+                {
+                    session.getChannel(),
+                    session.isTransactional(),
+                    null,
+                    session.getUnacknowledgedMessageCount(),
+                    session.getBlocking()
+                };
+
+            final CompositeData channelData = new CompositeDataSupport(_channelType,
+                    COMPOSITE_ITEM_NAMES_DESC.toArray(new String[COMPOSITE_ITEM_NAMES_DESC.size()]), itemValues);
+            channelsList.put(channelData);
+        }
+        return channelsList;
+    }
+
+    @Override
+    public void commitTransactions(int channelId) throws JMException
+    {
+        final ServerSession session = (ServerSession)_serverConnection.getSession(channelId);
+        if (session == null)
+        {
+            throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+        }
+        else if (session.isTransactional())
+        {
+            CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+            try
+            {
+                session.commit();
+            }
+            finally
+            {
+                CurrentActor.remove();
+            }
+        }
+    }
+
+    @Override
+    public void rollbackTransactions(int channelId) throws JMException
+    {
+        final ServerSession session = (ServerSession)_serverConnection.getSession(channelId);
+        if (session == null)
+        {
+            throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
+        }
+        else if (session.isTransactional())
+        {
+            CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
+            try
+            {
+                session.rollback();
+            }
+            finally
+            {
+                CurrentActor.remove();
+            }
+        }
+    }
+
+    @Override
+    public void closeConnection() throws Exception
+    {
+        _serverConnection.mgmtClose();
+    }
+
+    @Override
+    public void resetStatistics() throws Exception
+    {
+        _serverConnection.resetStatistics();
+    }
+
+    @Override
+    public double getPeakMessageDeliveryRate()
+    {
+        return _serverConnection.getMessageDeliveryStatistics().getPeak();
+    }
+
+    @Override
+    public double getPeakDataDeliveryRate()
+    {
+        return _serverConnection.getDataDeliveryStatistics().getPeak();
+    }
+
+    @Override
+    public double getMessageDeliveryRate()
+    {
+        return _serverConnection.getMessageDeliveryStatistics().getRate();
+    }
+
+    @Override
+    public double getDataDeliveryRate()
+    {
+        return _serverConnection.getDataDeliveryStatistics().getRate();
+    }
+
+    @Override
+    public long getTotalMessagesDelivered()
+    {
+        return _serverConnection.getMessageDeliveryStatistics().getTotal();
+    }
+
+    @Override
+    public long getTotalDataDelivered()
+    {
+        return _serverConnection.getDataDeliveryStatistics().getTotal();
+    }
+
+    @Override
+    public double getPeakMessageReceiptRate()
+    {
+        return _serverConnection.getMessageReceiptStatistics().getPeak();
+    }
+
+    @Override
+    public double getPeakDataReceiptRate()
+    {
+        return _serverConnection.getDataReceiptStatistics().getPeak();
+    }
+
+    @Override
+    public double getMessageReceiptRate()
+    {
+        return _serverConnection.getMessageReceiptStatistics().getRate();
+    }
+
+    @Override
+    public double getDataReceiptRate()
+    {
+        return _serverConnection.getDataReceiptStatistics().getRate();
+    }
+
+    @Override
+    public long getTotalMessagesReceived()
+    {
+        return _serverConnection.getMessageReceiptStatistics().getTotal();
+    }
+
+    @Override
+    public long getTotalDataReceived()
+    {
+        return _serverConnection.getDataReceiptStatistics().getTotal();
+    }
+
+    @Override
+    public boolean isStatisticsEnabled()
+    {
+        return _serverConnection.isStatisticsEnabled();
+    }
+
+    @Override
+    public void setStatisticsEnabled(boolean enabled)
+    {
+        _serverConnection.setStatisticsEnabled(enabled);
+    }
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Dec  1 12:39:14 2011
@@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.security.auth.Subject;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
@@ -698,4 +696,14 @@ public class ServerSession extends Sessi
             unregister(subscription_0_10);
         }
     }
+
+    public int getUnacknowledgedMessageCount()
+    {
+        return _messageDispositionListenerMap.size();
+    }
+
+    public boolean getBlocking()
+    {
+        return false; //TODO: Blocking not implemented on 0-10 yet.
+    }
 }

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java?rev=1209052&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java Thu Dec  1 12:39:14 2011
@@ -0,0 +1,151 @@
+package org.apache.qpid.server.configuration;
+
+import java.util.UUID;
+
+public class MockConnectionConfig implements ConnectionConfig
+{
+
+    public MockConnectionConfig(UUID _id, ConnectionConfigType _configType,
+                    ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent, boolean _durable,
+                    long _createTime, VirtualHostConfig _virtualHost, String _address, Boolean _incoming,
+                    Boolean _systemConnection, Boolean _federationLink, String _authId, String _remoteProcessName,
+                    Integer _remotePID, Integer _remoteParentPID, ConfigStore _configStore, Boolean _shadow)
+    {
+        super();
+        this._id = _id;
+        this._configType = _configType;
+        this._parent = _parent;
+        this._durable = _durable;
+        this._createTime = _createTime;
+        this._virtualHost = _virtualHost;
+        this._address = _address;
+        this._incoming = _incoming;
+        this._systemConnection = _systemConnection;
+        this._federationLink = _federationLink;
+        this._authId = _authId;
+        this._remoteProcessName = _remoteProcessName;
+        this._remotePID = _remotePID;
+        this._remoteParentPID = _remoteParentPID;
+        this._configStore = _configStore;
+        this._shadow = _shadow;
+    }
+
+    private UUID _id;
+    private ConnectionConfigType _configType;
+    private ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent;
+    private boolean _durable;
+    private long _createTime;
+    private VirtualHostConfig _virtualHost;
+    private String _address;
+    private Boolean _incoming;
+    private Boolean _systemConnection;
+    private Boolean _federationLink;
+    private String _authId;
+    private String _remoteProcessName;
+    private Integer _remotePID;
+    private Integer _remoteParentPID;
+    private ConfigStore _configStore;
+    private Boolean _shadow;
+
+    @Override
+    public UUID getId()
+    {
+        return _id;
+    }
+
+    @Override
+    public ConnectionConfigType getConfigType()
+    {
+        return _configType;
+    }
+
+    @Override
+    public ConfiguredObject<ConnectionConfigType, ConnectionConfig> getParent()
+    {
+        return _parent;
+    }
+
+    @Override
+    public boolean isDurable()
+    {
+        return _durable;
+    }
+
+    @Override
+    public long getCreateTime()
+    {
+        return _createTime;
+    }
+
+    @Override
+    public VirtualHostConfig getVirtualHost()
+    {
+        return _virtualHost;
+    }
+
+    @Override
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    @Override
+    public Boolean isIncoming()
+    {
+        return _incoming;
+    }
+
+    @Override
+    public Boolean isSystemConnection()
+    {
+        return _systemConnection;
+    }
+
+    @Override
+    public Boolean isFederationLink()
+    {
+        return _federationLink;
+    }
+
+    @Override
+    public String getAuthId()
+    {
+        return _authId;
+    }
+
+    @Override
+    public String getRemoteProcessName()
+    {
+        return _remoteProcessName;
+    }
+
+    @Override
+    public Integer getRemotePID()
+    {
+        return _remotePID;
+    }
+
+    @Override
+    public Integer getRemoteParentPID()
+    {
+        return _remoteParentPID;
+    }
+
+    @Override
+    public ConfigStore getConfigStore()
+    {
+        return _configStore;
+    }
+
+    @Override
+    public Boolean isShadow()
+    {
+        return _shadow;
+    }
+
+    @Override
+    public void mgmtClose()
+    {
+    }
+
+}

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Thu Dec  1 12:39:14 2011
@@ -20,23 +20,21 @@
  */
 package org.apache.qpid.server.protocol;
 
-import junit.framework.TestCase;
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.management.common.mbeans.ManagedConnection;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
-
-import javax.management.JMException;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
@@ -67,7 +65,7 @@ public class AMQProtocolSessionMBeanTest
         assertTrue(channelCount == 2);
 
         // general properties test
-        _mbean.setMaximumNumberOfChannels(1000L);
+        _protocolSession.setMaximumNumberOfChannels(1000L);
         assertTrue(_mbean.getMaximumNumberOfChannels() == 1000L);
 
         // check APIs

Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java?rev=1209052&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java (added)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transport/ServerConnectionMBeanTest.java Thu Dec  1 12:39:14 2011
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.JMException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.server.configuration.MockConnectionConfig;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.Session;
+
+public class ServerConnectionMBeanTest extends InternalBrokerBaseCase
+{
+    private ServerConnection _serverConnection;
+    private ServerSessionMock _serverSession;
+    private ServerConnectionMBean _mbean;
+    private List<Session> _sessions = new ArrayList<Session>();
+
+    @Override
+    public  void setUp() throws Exception
+    {
+        super.setUp();
+
+        final VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+        _serverConnection = new ServerConnection(1)
+        {
+            protected Collection<Session> getChannels()
+            {
+                return _sessions;
+            }
+            public Session getSession(int channelId)
+            {
+                for(Session session : _sessions)
+                {
+                    if (session.getChannel() == channelId)
+                    {
+                        return session;
+                    }
+                }
+                return null;
+            }
+            @Override
+            public AtomicLong getLastIoTime()
+            {
+                return new AtomicLong(1);
+            }
+        };
+        final MockConnectionConfig config = new MockConnectionConfig(UUID.randomUUID(), null, null,
+                                                                    false, 1, vhost, "address", Boolean.TRUE, Boolean.TRUE, Boolean.TRUE,
+                                                                    "authid", "remoteProcessName", new Integer(1967), new Integer(1970), vhost.getConfigStore(), Boolean.FALSE);
+        _serverConnection.setConnectionConfig(config);
+        _serverConnection.setVirtualHost(vhost);
+        _serverConnection.setConnectionDelegate(new ServerConnectionDelegate(getRegistry(), ""));
+        _serverSession = new ServerSessionMock(_serverConnection, 1);
+        _mbean = (ServerConnectionMBean) _serverConnection.getManagedObject();
+    }
+
+    public void testChannels() throws Exception
+    {
+        // check the channel count is correct
+        TabularData tabularData = _mbean.channels();
+
+        int channelCount = tabularData.size();
+        assertEquals("Unexpected number of channels",1,channelCount);
+        _sessions.add(new ServerSession(_serverConnection, new ServerSessionDelegate(),
+                        new Binary(getName().getBytes()), 2 , _serverConnection.getConfig()));
+
+        channelCount = _mbean.channels().size();
+        assertEquals("Unexpected number of channels",2,channelCount);
+
+        final CompositeData chanresult = tabularData.get(new Integer[]{1});
+        assertNotNull(chanresult);
+        assertEquals("Unexpected channel id", new Integer(1),(Integer)chanresult.get(ManagedConnection.CHAN_ID));
+        assertNull("Unexpected default queue", chanresult.get(ManagedConnection.DEFAULT_QUEUE));
+        assertFalse("Unexpected transactional flag", (Boolean)chanresult.get(ManagedConnection.TRANSACTIONAL));
+        assertFalse("Flow should have been blocked", (Boolean)chanresult.get(ManagedConnection.FLOW_BLOCKED));
+        assertEquals("Unexpected unack'd count", new Integer(1967), (Integer)chanresult.get(ManagedConnection.UNACKED_COUNT));
+    }
+
+    public void testMaxChannels() throws Exception
+    {
+        _serverConnection.getConnectionDelegate().setChannelMax(10001);
+        assertEquals("Max channels not got correctly", new Long(10001), _mbean.getMaximumNumberOfChannels());
+    }
+
+    public void testRollback() throws Exception
+    {
+        _mbean.rollbackTransactions(1);
+        assertFalse("Rollback performed despite not being transacted", _serverSession.isRolledback());
+
+        _serverSession.setTransactional(true);
+        _mbean.rollbackTransactions(1);
+        assertTrue("Rollback not performed", _serverSession.isRolledback());
+
+        try
+        {
+            _mbean.rollbackTransactions(2);
+            fail("Exception expected");
+        }
+        catch (JMException jme)
+        {
+            //pass
+        }
+    }
+
+    public void testCommit() throws Exception
+    {
+        _mbean.commitTransactions(1);
+        assertFalse("Commit performed despite not being transacted", _serverSession.isCommitted());
+
+        _serverSession.setTransactional(true);
+        _mbean.commitTransactions(1);
+        assertTrue("Commit not performed", _serverSession.isCommitted());
+
+        try
+        {
+            _mbean.commitTransactions(2);
+            fail("Exception expected");
+        }
+        catch (JMException jme)
+        {
+            //pass
+        }
+    }
+
+    public void testGetName()
+    {
+        assertEquals("Unexpected Object Instance Name", "\"address\"", _mbean.getObjectInstanceName());
+    }
+
+    public void testEnableStatistics()
+    {
+        assertFalse("Unexpected statistics enable flag", _mbean.isStatisticsEnabled());
+        _mbean.setStatisticsEnabled(true);
+        assertTrue("Unexpected statistics enable flag", _mbean.isStatisticsEnabled());
+    }
+
+    public void testLastIOTime()
+    {
+        assertEquals("Unexpected last IO time", new Date(1), _mbean.getLastIoTime());
+    }
+
+    private class ServerSessionMock extends ServerSession
+    {
+        private int _channelId = 0;
+        private boolean _committed = false;
+        private boolean _rolledback = false;
+        private boolean _transacted = false;
+
+        ServerSessionMock(Connection connection, int channelId)
+        {
+            super(connection, new ServerSessionDelegate(), new Binary(String.valueOf(channelId).getBytes()), 1 , _serverConnection.getConfig());
+            _channelId = channelId;
+            _sessions.add(this);
+        }
+
+        public int getChannel()
+        {
+            return _channelId;
+        }
+
+        @Override
+        public void commit()
+        {
+            _committed = true;
+        }
+
+        @Override
+        public void rollback()
+        {
+            _rolledback = true;
+        }
+
+        public boolean isCommitted()
+        {
+            return _committed;
+        }
+
+        public boolean isRolledback()
+        {
+            return _rolledback;
+        }
+
+        @Override
+        public int getUnacknowledgedMessageCount()
+        {
+            return 1967;
+        }
+
+        public boolean isTransactional()
+        {
+            return _transacted;
+        }
+
+        public void setTransactional(boolean transacted)
+        {
+            _transacted = transacted;
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (original)
+++ qpid/trunk/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java Thu Dec  1 12:39:14 2011
@@ -80,33 +80,12 @@ public interface ManagedConnection
     Date getLastIoTime();
 
     /**
-     * Tells the total number of bytes written till now.
-     * @return number of bytes written.
-     *
-    @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
-    Long getWrittenBytes();
-    */
-    /**
-     * Tells the total number of bytes read till now.
-     * @return number of bytes read.
-     *
-    @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
-    Long getReadBytes();
-    */
-
-    /**
      * Threshold high value for no of channels.  This is useful in setting notifications or
      * taking required action is there are more channels being created.
      * @return threshold limit for no of channels
      */
-    Long getMaximumNumberOfChannels();
-
-    /**
-     * Sets the threshold high value for number of channels for a connection
-     * @param value
-     */
     @MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection")
-    void setMaximumNumberOfChannels(Long value);
+    Long getMaximumNumberOfChannels();
 
     //********** Operations *****************//
 

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java?rev=1209052&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java Thu Dec  1 12:39:14 2011
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.management.jmx;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.JMException;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.TabularData;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
+{
+    /**
+     * JMX helper.
+     */
+    private JMXTestUtils _jmxUtils;
+    private Connection _connection;
+
+    public void setUp() throws Exception
+    {
+        _jmxUtils = new JMXTestUtils(this);
+        _jmxUtils.setUp();
+        super.setUp();
+        _jmxUtils.open();
+        _connection = getConnection();
+    }
+
+    public void tearDown() throws Exception
+    {
+        if (_jmxUtils != null)
+        {
+            _jmxUtils.close();
+        }
+        super.tearDown();
+    }
+
+    public void testChannels() throws Exception
+    {
+        final String queueName = getTestQueueName();
+
+        final Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        final Destination destination = session.createQueue(queueName);
+        final MessageConsumer consumer = session.createConsumer(destination);
+
+        final int numberOfMessages = 2;
+        sendMessage(session, destination, numberOfMessages);
+        _connection.start();
+
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            final Message m = consumer.receive(1000l);
+            assertNotNull("Message " + i + " is not received", m);
+        }
+
+        List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+        assertNotNull("Connection MBean is not found", connections);
+        assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+        final ManagedConnection mBean = connections.get(0);
+        assertNotNull("Connection MBean is null", mBean);
+
+        TabularData channelsData = mBean.channels();
+        assertNotNull("Channels data are null", channelsData);
+        assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
+
+        final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
+        final CompositeDataSupport row = rowItr.next();
+        Number unackCount = (Number) row.get(ManagedConnection.UNACKED_COUNT);
+        final Boolean transactional = (Boolean) row.get(ManagedConnection.TRANSACTIONAL);
+        final Boolean flowBlocked = (Boolean) row.get(ManagedConnection.FLOW_BLOCKED);
+        assertNotNull("Channel should have unacknowledged messages", unackCount);
+        assertEquals("Unexpected number of unacknowledged messages", 2, unackCount.intValue());
+        assertNotNull("Channel should have transaction flag", transactional);
+        assertTrue("Unexpected transaction flag", transactional);
+        assertNotNull("Channel should have flow blocked flag", flowBlocked);
+        assertFalse("Unexpected value of flow blocked flag", flowBlocked);
+
+        final Date initialLastIOTime = mBean.getLastIoTime();
+        session.commit();
+        assertTrue("Last IO time should have been updated", mBean.getLastIoTime().after(initialLastIOTime));
+
+        channelsData = mBean.channels();
+        assertNotNull("Channels data are null", channelsData);
+        assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
+
+        final Iterator<CompositeDataSupport> rowItr2 = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
+        final CompositeDataSupport row2 = rowItr2.next();
+        unackCount = (Number) row2.get(ManagedConnection.UNACKED_COUNT);
+        assertNotNull("Channel should have unacknowledged messages", unackCount);
+        assertEquals("Unexpected number of anacknowledged messages", 0, unackCount.intValue());
+
+        _connection.close();
+
+        connections = _jmxUtils.getManagedConnections("test");
+        assertNotNull("Connection MBean is not found", connections);
+        assertEquals("Unexpected number of connection mbeans", 0, connections.size());
+    }
+
+    public void testCommit() throws Exception
+    {
+        final String queueName = getTestQueueName();
+
+        final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        final Destination destination = producerSession.createQueue(queueName);
+        final MessageConsumer consumer = consumerSession.createConsumer(destination);
+        final MessageProducer producer = producerSession.createProducer(destination);
+
+        _connection.start();
+
+        List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+        assertNotNull("Connection MBean is not found", connections);
+        assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+        final ManagedConnection mBean = connections.get(0);
+        assertNotNull("Connection MBean is null", mBean);
+
+        final int numberOfMessages = 2;
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            producer.send(producerSession.createTextMessage("Test " + i));
+        }
+
+        Message m = consumer.receive(500l);
+        assertNull("Unexpected message received", m);
+
+        Number channelId = getFirstTransactedChannelId(mBean, 2);
+        mBean.commitTransactions(channelId.intValue());
+
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            m = consumer.receive(1000l);
+            assertNotNull("Message " + i + " is not received", m);
+            assertEquals("Unexpected message received at " + i, "Test " + i, ((TextMessage) m).getText());
+        }
+        producerSession.commit();
+        m = consumer.receive(500l);
+        assertNull("Unexpected message received", m);
+    }
+
+    protected Number getFirstTransactedChannelId(final ManagedConnection mBean, int channelNumber) throws IOException, JMException
+    {
+        TabularData channelsData = mBean.channels();
+        assertNotNull("Channels data are null", channelsData);
+        assertEquals("Unexpected number of rows in channel table", channelNumber, channelsData.size());
+        final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
+        while (rowItr.hasNext())
+        {
+            final CompositeDataSupport row = rowItr.next();
+            Boolean transacted = (Boolean) row.get(ManagedConnection.TRANSACTIONAL);
+            if (transacted.booleanValue())
+            {
+                return (Number) row.get(ManagedConnection.CHAN_ID);
+            }
+        }
+        return null;
+    }
+
+    public void testRollback() throws Exception
+    {
+        final String queueName = getTestQueueName();
+
+        final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        final Destination destination = producerSession.createQueue(queueName);
+        final MessageConsumer consumer = consumerSession.createConsumer(destination);
+        final MessageProducer producer = producerSession.createProducer(destination);
+
+        List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+        assertNotNull("Connection MBean is not found", connections);
+        assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+        final ManagedConnection mBean = connections.get(0);
+        assertNotNull("Connection MBean is null", mBean);
+
+        final int numberOfMessages = 2;
+        for (int i = 0; i < numberOfMessages; i++)
+        {
+            producer.send(producerSession.createTextMessage("Test " + i));
+        }
+
+        Number channelId = getFirstTransactedChannelId(mBean, 2);
+        mBean.rollbackTransactions(channelId.intValue());
+
+        Message m = consumer.receive(1000l);
+        assertNull("Unexpected message received", m);
+
+        producerSession.commit();
+
+        _connection.start();
+        m = consumer.receive(1000l);
+        assertNull("Unexpected message received", m);
+    }
+
+    public void testAuthorisedId() throws Exception
+    {
+        List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
+        assertNotNull("Connection MBean is not found", connections);
+        assertEquals("Unexpected number of connection mbeans", 1, connections.size());
+        final ManagedConnection mBean = connections.get(0);
+        assertNotNull("Connection MBean is null", mBean);
+        assertEquals("Unexpected authorized id", "guest", mBean.getAuthorizedId());
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1209052&r1=1209051&r2=1209052&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Thu Dec  1 12:39:14 2011
@@ -42,10 +42,6 @@ org.apache.qpid.server.logging.ChannelLo
 org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted
 org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend
 
-// 0-10 Broker does not have a JMX connection MBean
-org.apache.qpid.management.jmx.ManagementActorLoggingTest#testConnectionCloseViaManagement
-org.apache.qpid.management.jmx.MessageConnectionStatisticsTest#*
-
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message