qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r619823 [11/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Commo...
Date Fri, 08 Feb 2008 10:10:11 GMT
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Fri Feb  8 02:09:37 2008
@@ -36,11 +36,18 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
 import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +58,7 @@
     private AMQConnection _connection;
 
     /**
-     * If true, messages will not get a timestamp.
+     * If true, messages will not get a timestamp. 
      */
     protected boolean _disableTimestamps;
 
@@ -103,7 +110,7 @@
     private long _producerId;
 
     /**
-     * The session used to create this producer
+     * The session used to create this producer 
      */
     protected AMQSession _session;
 
@@ -118,8 +125,8 @@
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-        AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
-        boolean waitUntilSent)
+                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+                                   boolean waitUntilSent)
     {
         _connection = connection;
         _destination = destination;
@@ -146,7 +153,24 @@
         }
     }
 
-    public abstract void declareDestination(AMQDestination destination);
+    private void declareDestination(AMQDestination destination)
+    {
+        ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
+                                                                                              destination.getExchangeName(),
+                                                                                              destination.getExchangeClass(),
+                                                                                              false,
+                                                                                              false,
+                                                                                              false,
+                                                                                              false,
+                                                                                              true,
+                                                                                              null);
+        // Declare the exchange
+        // Note that the durable and internal arguments are ignored since passive is set to false
+
+        AMQFrame declare = body.generateFrame(_channelId);
+
+        _protocolHandler.writeFrame(declare);
+    }
 
     public void setDisableMessageID(boolean b) throws JMSException
     {
@@ -181,7 +205,7 @@
         if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
         {
             throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
-                + " is illegal");
+                                   + " is illegal");
         }
 
         _deliveryMode = i;
@@ -293,7 +317,7 @@
         {
             validateDestination(destination);
             sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
-                _immediate);
+                     _immediate);
         }
     }
 
@@ -310,7 +334,7 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory) throws JMSException
+                     boolean mandatory) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -322,7 +346,7 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate) throws JMSException
+                     boolean mandatory, boolean immediate) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -334,7 +358,7 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
+                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -342,7 +366,7 @@
         {
             validateDestination(destination);
             sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
-                waitUntilSent);
+                     waitUntilSent);
         }
     }
 
@@ -388,7 +412,7 @@
             else
             {
                 throw new JMSException("Unable to send message, due to class conversion error: "
-                    + message.getClass().getName());
+                                       + message.getClass().getName());
             }
         }
     }
@@ -398,14 +422,14 @@
         if (!(destination instanceof AMQDestination))
         {
             throw new JMSException("Unsupported destination class: "
-                + ((destination != null) ? destination.getClass() : null));
+                                   + ((destination != null) ? destination.getClass() : null));
         }
 
         declareDestination((AMQDestination) destination);
     }
 
     protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate) throws JMSException
+                            boolean mandatory, boolean immediate) throws JMSException
     {
         sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
     }
@@ -420,16 +444,27 @@
      * @param timeToLive
      * @param mandatory
      * @param immediate
+     *
      * @throws JMSException
      */
     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate, boolean wait) throws JMSException
+                            boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
+        if (_transacted)
+        {
+            if (_session.hasFailedOver() && _session.isDirty())
+            {
+                throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
+                                          new AMQSessionDirtyException("Failover has occurred and session is dirty " +
+                                                                       "so unable to send."));
+            }
+        }
+
         if (_disableMessageId)
         {
             message.setJMSMessageID(null);
@@ -458,8 +493,82 @@
 
       //  message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
-        sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive,
-                 mandatory, immediate, wait);
+
+        BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+                                                                                        destination.getExchangeName(),
+                                                                                        destination.getRoutingKey(),
+                                                                                        mandatory,
+                                                                                        immediate);
+
+        AMQFrame publishFrame = body.generateFrame(_channelId);
+
+        message.prepareForSending();
+        ByteBuffer payload = message.getData();
+        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+
+        if (!_disableTimestamps)
+        {
+            final long currentTime = System.currentTimeMillis();
+            contentHeaderProperties.setTimestamp(currentTime);
+
+            if (timeToLive > 0)
+            {
+                contentHeaderProperties.setExpiration(currentTime + timeToLive);
+            }
+            else
+            {
+                contentHeaderProperties.setExpiration(0);
+            }
+        }
+
+        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
+        contentHeaderProperties.setPriority((byte) priority);
+
+        final int size = (payload != null) ? payload.limit() : 0;
+        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+        if (payload != null)
+        {
+            createContentBodies(payload, frames, 2, _channelId);
+        }
+
+        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+        {
+            _logger.debug("Sending content body frames to " + destination);
+        }
+
+
+        // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
+        int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
+
+        AMQFrame contentHeaderFrame =
+            ContentHeaderBody.createAMQFrame(_channelId,
+                                             classIfForBasic, 0, contentHeaderProperties, size);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending content header frame to " + destination);
+        }
+
+        frames[0] = publishFrame;
+        frames[1] = contentHeaderFrame;
+        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+        _protocolHandler.writeFrame(compositeFrame, wait);
+
+        if (message != origMessage)
+        {
+            _logger.debug("Updating original message");
+            origMessage.setJMSPriority(message.getJMSPriority());
+            origMessage.setJMSTimestamp(message.getJMSTimestamp());
+            _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            origMessage.setJMSExpiration(message.getJMSExpiration());
+            origMessage.setJMSMessageID(message.getJMSMessageID());
+        }
+
+        if (_transacted)
+        {
+            _session.markDirty();
+        }
     }
 
     public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode,
@@ -485,6 +594,60 @@
         }
     }
 
+    /**
+     * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
+     * maximum frame size.
+     *
+     * @param payload
+     * @param frames
+     * @param offset
+     * @param channelId @return the array of content bodies
+     */
+    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+    {
+
+        if (frames.length == (offset + 1))
+        {
+            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+        }
+        else
+        {
+
+            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+            long remaining = payload.remaining();
+            for (int i = offset; i < frames.length; i++)
+            {
+                payload.position((int) framePayloadMax * (i - offset));
+                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+                payload.limit(payload.position() + length);
+                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+
+                remaining -= length;
+            }
+        }
+
+    }
+
+    private int calculateContentBodyFrameCount(ByteBuffer payload)
+    {
+        // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
+        // (0xCE byte).
+        int frameCount;
+        if ((payload == null) || (payload.remaining() == 0))
+        {
+            frameCount = 0;
+        }
+        else
+        {
+            int dataLength = payload.remaining();
+            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+            frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
+        }
+
+        return frameCount;
+    }
+
     public void setMimeType(String mimeType) throws JMSException
     {
         checkNotClosed();
@@ -520,7 +683,7 @@
         if ((_destination != null) && (suppliedDestination != null))
         {
             throw new UnsupportedOperationException(
-                "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+                    "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
         }
 
         if (suppliedDestination == null)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Fri Feb  8 02:09:37 2008
@@ -47,20 +47,20 @@
 
     public void declareDestination(AMQDestination destination)
     {
+        ExchangeDeclareBody body = getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
+                destination.getExchangeName(),
+                destination.getExchangeClass(),
+                false,
+                false,
+                false,
+                false,
+                true,
+                null);
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame declare =
-            ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                destination.getExchangeName(), // exchange
-                false, // internal
-                true, // nowait
-                false, // passive
-                _session.getTicket(), // ticket
-                destination.getExchangeClass()); // type
+
+        AMQFrame declare = body.generateFrame(_channelId);
+
         _protocolHandler.writeFrame(declare);
     }
 
@@ -70,13 +70,13 @@
 //      AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        AMQFrame publishFrame =
-            BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
-                immediate, // immediate
-                mandatory, // mandatory
-                destination.getRoutingKey(), // routingKey
-                _session.getTicket()); // ticket
+        BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+                destination.getExchangeName(),
+                destination.getRoutingKey(),
+                mandatory,
+                immediate);
+
+        AMQFrame publishFrame = body.generateFrame(_channelId);
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
@@ -114,13 +114,17 @@
             _logger.debug("Sending content body frames to " + destination);
         }
 
+        // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
+        int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
+
         // weight argument of zero indicates no child content headers, just bodies
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+
         AMQFrame contentHeaderFrame =
             ContentHeaderBody.createAMQFrame(_channelId,
-                BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
-                    _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+                                             classIfForBasic, 0, contentHeaderProperties, size);
+
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending content header frame to " + destination);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Fri Feb  8 02:09:37 2008
@@ -29,9 +29,10 @@
 public enum CustomJMSXProperty
 {
     JMS_AMQP_NULL,
-    JMS_QPID_DESTTYPE,    
+    JMS_QPID_DESTTYPE,
     JMSXGroupID,
-    JMSXGroupSeq;
+    JMSXGroupSeq,
+    JMSXUserID;
 
 
     private final AMQShortString _nameAsShortString;
@@ -47,7 +48,7 @@
     }
 
     private static Enumeration _names;
-    
+
     public static synchronized Enumeration asEnumeration()
     {
         if(_names == null)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Fri Feb  8 02:09:37 2008
@@ -21,18 +21,18 @@
 package org.apache.qpid.client;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.jms.TopicSubscriber;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
 
 /**
  * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
  *
  */
-class TopicSubscriberAdaptor implements  org.apache.qpid.jms.TopicSubscriber
+class TopicSubscriberAdaptor implements TopicSubscriber
 {
     private final Topic _topic;
     private final BasicMessageConsumer _consumer;

Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,36 @@
+package org.apache.qpid.client.handler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AccessRequestOkMethodHandler.class);
+
+    private static AccessRequestOkMethodHandler _handler = new AccessRequestOkMethodHandler();
+
+    public static AccessRequestOkMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+        throws AMQException
+    {
+        _logger.debug("AccessRequestOk method received");
+        final AMQProtocolSession session = stateManager.getProtocolSession();
+        session.setTicket(method.getTicket(), channelId);
+
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -25,12 +25,13 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BasicCancelOkMethodHandler implements StateAwareMethodListener
+public class BasicCancelOkMethodHandler implements StateAwareMethodListener<BasicCancelOkBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicCancelOkMethodHandler.class);
 
@@ -44,16 +45,18 @@
     private BasicCancelOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
         throws AMQException
     {
-        BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+        AMQProtocolSession session = stateManager.getProtocolSession();
+
+
 
         if (_logger.isInfoEnabled())
         {
-            _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag);
+            _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());
         }
 
-        protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+        session.confirmConsumerCancelled(channelId, body.getConsumerTag());
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -30,7 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BasicDeliverMethodHandler implements StateAwareMethodListener
+public class BasicDeliverMethodHandler implements StateAwareMethodListener<BasicDeliverBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicDeliverMethodHandler.class);
 
@@ -41,18 +41,12 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
         throws AMQException
     {
-        BasicDeliverBody deliveryBody = (BasicDeliverBody) evt.getMethod();
-        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
-                           evt.getChannelId(),
-                           deliveryBody.deliveryTag,
-                           deliveryBody.consumerTag.asString(),
-                           deliveryBody.getExchange(),
-                           deliveryBody.getRoutingKey(),
-                           deliveryBody.getRedelivered());
+        final AMQProtocolSession session = stateManager.getProtocolSession();
+        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
         _logger.debug("New JmsDeliver method received");
-        protocolSession.unprocessedMessageReceived(msg);
+        session.unprocessedMessageReceived(msg);
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -21,16 +21,15 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BasicReturnMethodHandler implements StateAwareMethodListener
+public class BasicReturnMethodHandler implements StateAwareMethodListener<BasicReturnBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicReturnMethodHandler.class);
 
@@ -41,18 +40,16 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
         throws AMQException
     {
-        BasicReturnBody returnBody = (BasicReturnBody)evt.getMethod();
         _logger.debug("New JmsBounce method received");
-        final ReturnMessage msg = new ReturnMessage(evt.getChannelId(),
-                                                    returnBody.getExchange(),
-                                                    returnBody.getRoutingKey(),
-                                                    returnBody.getReplyText(),
-                                                    returnBody.getReplyCode()
-                                                    );
+        final AMQProtocolSession session = stateManager.getProtocolSession();
+        /** FIXME: TGM AS SRSLY 4RL */
+        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
 
-        protocolSession.unprocessedMessageReceived(msg);
+        session.unprocessedMessageReceived(msg);
     }
+
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -38,7 +38,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ChannelCloseMethodHandler implements StateAwareMethodListener
+public class ChannelCloseMethodHandler implements StateAwareMethodListener<ChannelCloseBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandler.class);
 
@@ -49,22 +49,26 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
         throws AMQException
     {
         _logger.debug("ChannelClose method received");
-        ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
 
-        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
-        AMQShortString reason = method.replyText;
+
+        AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+        AMQShortString reason = method.getReplyText();
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
         }
 
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
-        protocolSession.writeFrame(frame);
+
+
+        ChannelCloseOkBody body = session.getMethodRegistry().createChannelCloseOkBody();
+        AMQFrame frame = body.generateFrame(channelId);
+        session.writeFrame(frame);
+
         if (errorCode != AMQConstant.REPLY_SUCCESS)
         {
             if (_logger.isDebugEnabled())
@@ -100,6 +104,6 @@
         }
         // fixme why is this only done when the close is expected...
         // should the above forced closes not also cause a close?
-        protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+        session.channelClosed(channelId, errorCode, String.valueOf(reason));
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -29,7 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ChannelCloseOkMethodHandler implements StateAwareMethodListener
+public class ChannelCloseOkMethodHandler implements StateAwareMethodListener<ChannelCloseOkBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseOkMethodHandler.class);
 
@@ -40,11 +41,12 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager,  ChannelCloseOkBody method, int channelId)
         throws AMQException
     {
-        _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
+        _logger.info("Received channel-close-ok for channel-id " + channelId);
 
+        final AMQProtocolSession session = stateManager.getProtocolSession();
         // todo this should do the local closure
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -30,7 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ChannelFlowOkMethodHandler implements StateAwareMethodListener
+public class ChannelFlowOkMethodHandler implements StateAwareMethodListener<ChannelFlowOkBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ChannelFlowOkMethodHandler.class);
     private static final ChannelFlowOkMethodHandler _instance = new ChannelFlowOkMethodHandler();
@@ -43,10 +43,12 @@
     private ChannelFlowOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+            throws AMQException
     {
-        ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
-        _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+
+        _logger.debug("Received Channel.Flow-Ok message, active = " + body.getActive());
     }
+
+
 }

Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,528 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl implements MethodDispatcher
+{
+
+
+    private static final BasicCancelOkMethodHandler      _basicCancelOkMethodHandler      = BasicCancelOkMethodHandler.getInstance();
+    private static final BasicDeliverMethodHandler       _basicDeliverMethodHandler       = BasicDeliverMethodHandler.getInstance();
+    private static final BasicReturnMethodHandler        _basicReturnMethodHandler        = BasicReturnMethodHandler.getInstance();
+    private static final ChannelCloseMethodHandler       _channelCloseMethodHandler       = ChannelCloseMethodHandler.getInstance();
+    private static final ChannelFlowOkMethodHandler      _channelFlowOkMethodHandler      = ChannelFlowOkMethodHandler.getInstance();
+    private static final ConnectionCloseMethodHandler    _connectionCloseMethodHandler    = ConnectionCloseMethodHandler.getInstance();
+    private static final ConnectionOpenOkMethodHandler   _connectionOpenOkMethodHandler   = ConnectionOpenOkMethodHandler.getInstance();
+    private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
+    private static final ConnectionSecureMethodHandler   _connectionSecureMethodHandler   = ConnectionSecureMethodHandler.getInstance();
+    private static final ConnectionStartMethodHandler    _connectionStartMethodHandler    = ConnectionStartMethodHandler.getInstance();
+    private static final ConnectionTuneMethodHandler     _connectionTuneMethodHandler     = ConnectionTuneMethodHandler.getInstance();
+    private static final ExchangeBoundOkMethodHandler    _exchangeBoundOkMethodHandler    = ExchangeBoundOkMethodHandler.getInstance();
+    private static final QueueDeleteOkMethodHandler      _queueDeleteOkMethodHandler      = QueueDeleteOkMethodHandler.getInstance();
+
+
+
+    private static interface DispatcherFactory
+    {
+        public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+    }
+
+    private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
+            new HashMap<ProtocolVersion, DispatcherFactory>();
+
+    static
+    {
+        _dispatcherFactories.put(ProtocolVersion.v8_0,
+                                 new DispatcherFactory()
+                                 {
+                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+                                     {
+                                         return new ClientMethodDispatcherImpl_8_0(stateManager);
+                                     }
+                                 });
+
+        _dispatcherFactories.put(ProtocolVersion.v0_9,
+                                 new DispatcherFactory()
+                                 {
+                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+                                     {
+                                         return new ClientMethodDispatcherImpl_0_9(stateManager);
+                                     }
+                                 });
+
+    }
+
+
+    public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+    {
+        DispatcherFactory factory = _dispatcherFactories.get(version);
+        return factory.createMethodDispatcher(stateManager);
+    }
+    
+
+
+
+    private AMQStateManager _stateManager;
+
+    public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
+    }
+
+
+    public AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+
+    public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
+    {
+        _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
+    {
+        _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
+    {
+        _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
+    {
+        _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
+    {
+        _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
+    {
+        _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
+    {
+        _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
+    {
+        _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
+    {
+        _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
+    {
+        _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
+    {
+        _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
+    {
+        _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamCancelOk(StreamCancelOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamConsumeOk(StreamConsumeOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchDtxSelect(DtxSelectBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchDtxStart(DtxStartBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileAck(FileAckBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileCancel(FileCancelBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileConsume(FileConsumeBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFilePublish(FilePublishBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileQos(FileQosBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileReject(FileRejectBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchStreamCancel(StreamCancelBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchStreamConsume(StreamConsumeBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchStreamPublish(StreamPublishBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchStreamQos(StreamQosBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchTunnelRequest(TunnelRequestBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchTxCommit(TxCommitBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchTxRollback(TxRollbackBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchTxSelect(TxSelectBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchDtxSelectOk(DtxSelectOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchDtxStartOk(DtxStartOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
+    {
+        _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        return true;
+    }
+
+    public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchFileCancelOk(FileCancelOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileConsumeOk(FileConsumeOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileDeliver(FileDeliverBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileOpen(FileOpenBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileOpenOk(FileOpenOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileQosOk(FileQosOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileReturn(FileReturnBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchFileStage(FileStageBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchStreamDeliver(StreamDeliverBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchStreamQosOk(StreamQosOkBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchStreamReturn(StreamReturnBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
+    {
+        return false;  
+    }
+
+}

Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQMethodNotImplementedException;
+
+
+public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
+{
+    public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+    {
+        super(stateManager);
+    }
+
+
+    public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchBasicRecoverSync(BasicRecoverSyncBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchChannelOk(ChannelOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelPing(ChannelPingBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelPong(ChannelPongBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelResume(ChannelResumeBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchMessageAppend(MessageAppendBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageCancel(MessageCancelBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchMessageCheckpoint(MessageCheckpointBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageClose(MessageCloseBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageConsume(MessageConsumeBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchMessageEmpty(MessageEmptyBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageGet(MessageGetBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchMessageOffset(MessageOffsetBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOk(MessageOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageOpen(MessageOpenBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageQos(MessageQosBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchMessageRecover(MessageRecoverBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchMessageReject(MessageRejectBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageResume(MessageResumeBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchMessageTransfer(MessageTransferBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
+    {
+        throw new AMQMethodNotImplementedException(body);
+    }
+
+    public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
+    {
+        return false;  
+    }
+
+
+}

Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,65 @@
+package org.apache.qpid.client.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.AMQStateManager;
+
+public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
+{
+    public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+    {
+        super(stateManager);
+    }
+
+    public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchChannelAlert(ChannelAlertBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestContent(TestContentBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestContentOk(TestContentOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestInteger(TestIntegerBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestIntegerOk(TestIntegerOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestString(TestStringBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestStringOk(TestStringOkBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestTable(TestTableBody body, int channelId) throws AMQException
+    {
+        return false;
+    }
+
+    public boolean dispatchTestTableOk(TestTableOkBody body, int channelId) throws AMQException
+    {
+        return false;  
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -36,7 +36,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConnectionCloseMethodHandler implements StateAwareMethodListener
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ConnectionCloseMethodHandler.class);
 
@@ -50,24 +50,26 @@
     private ConnectionCloseMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
+                throws AMQException
     {
         _logger.info("ConnectionClose frame received");
-        ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
+        
 
         // does it matter
         // stateManager.changeState(AMQState.CONNECTION_CLOSING);
 
-        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
-        AMQShortString reason = method.replyText;
+        AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+        AMQShortString reason = method.getReplyText();
 
         try
         {
+
+            ConnectionCloseOkBody closeOkBody = session.getMethodRegistry().createConnectionCloseOkBody();
             // TODO: check whether channel id of zero is appropriate
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(),
-                    method.getMinor()));
+            session.writeFrame(closeOkBody.generateFrame(0));
 
             if (errorCode != AMQConstant.REPLY_SUCCESS)
             {
@@ -75,7 +77,7 @@
                 {
                     _logger.info("Authentication Error:" + Thread.currentThread().getName());
 
-                    protocolSession.closeProtocolSession();
+                    session.closeProtocolSession();
 
                     // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
                     stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
@@ -94,9 +96,11 @@
         {
             // this actually closes the connection in the case where it is not an error.
 
-            protocolSession.closeProtocolSession();
+            session.closeProtocolSession();
 
             stateManager.changeState(AMQState.CONNECTION_CLOSED);
         }
     }
+
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -21,13 +21,14 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
-public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener
+public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
 {
     private static final ConnectionOpenOkMethodHandler _instance = new ConnectionOpenOkMethodHandler();
 
@@ -40,9 +41,11 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+                throws AMQException            
     {
         stateManager.changeState(AMQState.CONNECTION_OPEN);
     }
+
 
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -30,7 +30,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
+public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ConnectionRedirectMethodHandler.class);
 
@@ -46,13 +46,13 @@
     private ConnectionRedirectMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+            throws AMQException
     {
         _logger.info("ConnectionRedirect frame received");
-        ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
+        final AMQProtocolSession session = stateManager.getProtocolSession();         
 
-        String host = method.host.toString();
+        String host = method.getHost().toString();
         // the host is in the form hostname:port with the port being optional
         int portIndex = host.indexOf(':');
 
@@ -68,6 +68,7 @@
 
         }
 
-        protocolSession.failover(host, port);
+        session.failover(host, port);
     }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -32,7 +32,7 @@
 import org.apache.qpid.framing.ConnectionSecureOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
-public class ConnectionSecureMethodHandler implements StateAwareMethodListener
+public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
 {
     private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
 
@@ -41,27 +41,26 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+                throws AMQException
     {
-        SaslClient client = protocolSession.getSaslClient();
+        final AMQProtocolSession session = stateManager.getProtocolSession(); 
+        SaslClient client = session.getSaslClient();
         if (client == null)
         {
             throw new AMQException(null, "No SASL client set up - cannot proceed with authentication", null);
         }
 
-        ConnectionSecureBody body = (ConnectionSecureBody) evt.getMethod();
+
 
         try
         {
             // Evaluate server challenge
-            byte[] response = client.evaluateChallenge(body.challenge);
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
-                body.getMajor(), body.getMinor(),
-                response);	// response
-            protocolSession.writeFrame(responseFrame);
+            byte[] response = client.evaluateChallenge(body.getChallenge());
+
+            ConnectionSecureOkBody secureOkBody = session.getMethodRegistry().createConnectionSecureOkBody(response);
+
+            session.writeFrame(secureOkBody.generateFrame(channelId));
         }
         catch (SaslException e)
         {
@@ -70,4 +69,6 @@
 
 
     }
+
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -48,7 +48,7 @@
 import java.util.HashSet;
 import java.util.StringTokenizer;
 
-public class ConnectionStartMethodHandler implements StateAwareMethodListener
+public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
 {
     private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
 
@@ -62,15 +62,16 @@
     private ConnectionStartMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+            throws AMQException
     {
         _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
             + "AMQMethodEvent evt): called");
 
-        ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
+
 
-        ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor, (byte) body.versionMinor);
+        ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
 
         // For the purposes of interop, we can make the client accept the broker's version string.
         // If it does, it then internally records the version as being the latest one that it understands.
@@ -83,26 +84,26 @@
 
         if (pv.isSupported())
         {
-            protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion());
+            session.setProtocolVersion(pv);
 
             try
             {
                 // Used to hold the SASL mechanism to authenticate with.
                 String mechanism;
 
-                if (body.mechanisms == null)
+                if (body.getMechanisms()== null)
                 {
                     throw new AMQException(null, "mechanism not specified in ConnectionStart method frame", null);
                 }
                 else
                 {
-                    mechanism = chooseMechanism(body.mechanisms);
+                    mechanism = chooseMechanism(body.getMechanisms());
                     _log.debug("mechanism = " + mechanism);
                 }
 
                 if (mechanism == null)
                 {
-                    throw new AMQException(null, "No supported security mechanism found, passed: " + new String(body.mechanisms), null);
+                    throw new AMQException(null, "No supported security mechanism found, passed: " + new String(body.getMechanisms()), null);
                 }
 
                 byte[] saslResponse;
@@ -110,7 +111,7 @@
                 {
                     SaslClient sc =
                         Sasl.createSaslClient(new String[] { mechanism }, null, "AMQP", "localhost", null,
-                            createCallbackHandler(mechanism, protocolSession));
+                            createCallbackHandler(mechanism, session));
                     if (sc == null)
                     {
                         throw new AMQException(null, "Client SASL configuration error: no SaslClient could be created for mechanism " + mechanism
@@ -118,21 +119,21 @@
                             + " details of how to register non-standard SASL client providers.", null);
                     }
 
-                    protocolSession.setSaslClient(sc);
+                    session.setSaslClient(sc);
                     saslResponse = (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null);
                 }
                 catch (SaslException e)
                 {
-                    protocolSession.setSaslClient(null);
+                    session.setSaslClient(null);
                     throw new AMQException(null, "Unable to create SASL client: " + e, e);
                 }
 
-                if (body.locales == null)
+                if (body.getLocales() == null)
                 {
                     throw new AMQException(null, "Locales is not defined in Connection Start method", null);
                 }
 
-                final String locales = new String(body.locales, "utf8");
+                final String locales = new String(body.getLocales(), "utf8");
                 final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
                 String selectedLocale = null;
                 if (tokenizer.hasMoreTokens())
@@ -148,23 +149,20 @@
                 FieldTable clientProperties = FieldTableFactory.newFieldTable();
 
                 clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
-                    protocolSession.getClientID());
+                    session.getClientID());
                 clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
                     QpidProperties.getProductName());
                 clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
                     QpidProperties.getReleaseVersion());
                 clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
 
+
+                ConnectionStartOkBody connectionStartOkBody = session.getMethodRegistry().createConnectionStartOkBody(clientProperties,new AMQShortString(mechanism),saslResponse,new AMQShortString(locales));
                 // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
                 // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
                 // Be aware of possible changes to parameter order as versions change.
-                protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
-                        protocolSession.getProtocolMajorVersion(), protocolSession.getProtocolMinorVersion(),
-                        clientProperties, // clientProperties
-                        new AMQShortString(selectedLocale), // locale
-                        new AMQShortString(mechanism), // mechanism
-                        saslResponse)); // response
-
+                session.writeFrame(connectionStartOkBody.generateFrame(channelId));
+                        
             }
             catch (UnsupportedEncodingException e)
             {
@@ -173,10 +171,10 @@
         }
         else
         {
-            _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor
+            _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
                 + "] which is not supported by this version of the client library");
 
-            protocolSession.closeProtocolSession();
+            session.closeProtocolSession();
         }
     }
 
@@ -235,4 +233,5 @@
             throw new AMQException(null, "Unable to create callback handler: " + e, e);
         }
     }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -26,17 +26,13 @@
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConnectionTuneMethodHandler implements StateAwareMethodListener
+public class ConnectionTuneMethodHandler implements StateAwareMethodListener<ConnectionTuneBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ConnectionTuneMethodHandler.class);
 
@@ -50,48 +46,41 @@
     protected ConnectionTuneMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+                throws AMQException
     {
         _logger.debug("ConnectionTune frame received");
-        ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
+        final MethodRegistry methodRegistry = session.getMethodRegistry();
 
-        ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
+
+        ConnectionTuneParameters params = session.getConnectionTuneParameters();
         if (params == null)
         {
             params = new ConnectionTuneParameters();
         }
 
-        params.setFrameMax(frame.frameMax);
-        params.setChannelMax(frame.channelMax);
-        params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
-        protocolSession.setConnectionTuneParameters(params);
+        params.setFrameMax(frame.getFrameMax());
+        params.setChannelMax(frame.getChannelMax());
+        params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
+        session.setConnectionTuneParameters(params);
 
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
-        protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params, frame.getMajor(), frame.getMinor()));
 
-        String host = protocolSession.getAMQConnection().getVirtualHost();
+        ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
+                                                                                    params.getFrameMax(),
+                                                                                    params.getHeartbeat());
+        // Be aware of possible changes to parameter order as versions change.
+        session.writeFrame(tuneOkBody.generateFrame(channelId));
+
+        String host = session.getAMQConnection().getVirtualHost();
         AMQShortString virtualHost = new AMQShortString("/" + host);
 
-        protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true, frame.getMajor(),
-                frame.getMinor()));
-    }
+        ConnectionOpenBody openBody = methodRegistry.createConnectionOpenBody(virtualHost,null,true);
 
-    protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities,
-        boolean insist, byte major, byte minor)
-    {
         // Be aware of possible changes to parameter order as versions change.
-        return ConnectionOpenBody.createAMQFrame(channel, major, minor, // AMQP version (major, minor)
-                capabilities, // capabilities
-                insist, // insist
-                path); // virtualHost
+        session.writeFrame(openBody.generateFrame(channelId));
     }
 
-    protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
-    {
-        // Be aware of possible changes to parameter order as versions change.
-        return ConnectionTuneOkBody.createAMQFrame(channel, major, minor, params.getChannelMax(), // channelMax
-                params.getFrameMax(), // frameMax
-                params.getHeartbeat()); // heartbeat
-    }
+
 }



Mime
View raw message