qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r619823 [10/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Commo...
Date Fri, 08 Feb 2008 10:10:11 GMT
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Feb  8 02:09:37 2008
@@ -20,42 +20,7 @@
  */
 package org.apache.qpid.client;
 
-
-import java.io.Serializable;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
@@ -75,18 +40,55 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -227,6 +229,12 @@
     protected Map<AMQShortString, BasicMessageConsumer> _consumers =
             new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
+    /**
+     * Contains a list of consumers which have been removed but which might still have
+     * messages to acknowledge, e.g. in client ack or transacted modes
+     */
+    private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
             new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -277,6 +285,11 @@
     private final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
+    /** Session state: used to detect whether a commit is a) required and b) allowed, i.e. whether the tx spans failover. */
+    private boolean _dirty;
+    /** Has failover occurred on this session */
+    private boolean _failedOver;
+
     /**
      * Creates a new session on a connection.
      *
@@ -350,7 +363,22 @@
         }
     }
 
-
+    /**
+     * Creates a new session on a connection with the default message factory registry.
+     *
+     * @param con                 The connection on which to create the session.
+     * @param channelId           The unique identifier for the session.
+     * @param transacted          Indicates whether or not the session is transactional.
+     * @param acknowledgeMode     The acknowledgement mode for the session.
+     * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
+     * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+               int defaultPrefetchLow)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+             defaultPrefetchLow);
+    }
 
     // ===== JMS Session methods.
 
@@ -364,12 +392,18 @@
         close(-1);
     }
 
+    public BytesMessage createBytesMessage() throws JMSException
+    {
+        checkNotClosed();
+        return new JMSBytesMessage();
+    }
+
     /**
      * Acknowledges all unacknowledged messages on the session, for all message consumers on the session.
      *
      * @throws IllegalStateException If the session is closed.
      */
-    public void acknowledge() throws IllegalStateException
+    public void acknowledge() throws JMSException
     {
         if (isClosed())
         {
@@ -391,7 +425,26 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    {
+
+        BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
+
+        final AMQFrame ackFrame = body.generateFrame(_channelId);
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+        }
+
+        getProtocolHandler().writeFrame(ackFrame);
+    }
+
+    public MethodRegistry getMethodRegistry()
+    {
+        MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+        return methodRegistry;
+    }
 
     /**
      * Binds the named queue, with the specified routing key, to the named exchange.
@@ -417,7 +470,12 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendQueueBind(queueName,routingKey,arguments,exchangeName);
+                    QueueBindBody body = getMethodRegistry().createQueueBindBody(getTicket(),queueName,exchangeName,routingKey,false,arguments);
+
+                    AMQFrame queueBind = body.generateFrame(_channelId);
+
+                    getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+
                 return null;
             }
         }, _connection).execute();
@@ -459,16 +517,16 @@
     {
         if (_logger.isInfoEnabled())
         {
+            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
             _logger.info("Closing session: " + this + ":"
-                         + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                         + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
         }
 
-        synchronized (_messageDeliveryLock)
+        synchronized (_connection.getFailoverMutex())
         {
-
             // We must close down all producers and consumers in an orderly fashion. This is the only method
             // that can be called from a different thread of control from the one controlling the session.
-            synchronized (_connection.getFailoverMutex())
+            synchronized (_messageDeliveryLock)
             {
                 // Ensure we only try and close an open session.
                 if (!_closed.getAndSet(true))
@@ -478,7 +536,17 @@
 
                     try
                     {
-                        sendClose(timeout);
+
+                    ChannelCloseBody body = getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+                            new AMQShortString("JMS client closing channel"), 0, 0);
+
+                    final AMQFrame frame = body.generateFrame(getChannelId());
+
+                    getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+                     
+
+                        // When control resumes at this point, a reply will have been received that
+                        // indicates the broker has closed the channel successfully.
                     }
                     catch (AMQException e)
                     {
@@ -511,27 +579,35 @@
      */
     public void closed(Throwable e) throws JMSException
     {
-        synchronized (_messageDeliveryLock)
-        {
-            synchronized (_connection.getFailoverMutex())
-            {
-                // An AMQException has an error code and message already and will be passed in when closure occurs as a
-                // result of a channel close request
-                _closed.set(true);
-                AMQException amqe;
-                if (e instanceof AMQException)
-                {
-                    amqe = (AMQException) e;
-                }
-                else
-                {
-                    amqe = new AMQException(null, "Closing session forcibly", e);
-                }
-
-                _connection.deregisterSession(_channelId);
-                closeProducersAndConsumers(amqe);
-            }
-        }
+    	synchronized (_connection.getFailoverMutex())
+    	{
+    		if (e instanceof AMQDisconnectedException)
+    		{
+    			if (_dispatcher != null)
+    			{
+    				// Failover failed and ain't coming back. Knife the dispatcher.
+    				_dispatcher.interrupt();
+    			}
+    		}
+    		synchronized (_messageDeliveryLock)
+    		{
+    			// An AMQException has an error code and message already and will be passed in when closure occurs as a
+    			// result of a channel close request
+    			_closed.set(true);
+    			AMQException amqe;
+    			if (e instanceof AMQException)
+    			{
+    				amqe = (AMQException) e;
+    			}
+    			else
+    			{
+    				amqe = new AMQException(null, "Closing session forcibly", e);
+    			}
+
+    			_connection.deregisterSession(_channelId);
+    			closeProducersAndConsumers(amqe);
+    		}
+    	}
     }
 
     /**
@@ -549,29 +625,72 @@
      */
     public void commit() throws JMSException
     {
-        checkTransacted();
-
-        try
-        {
-            // Acknowledge up to message last delivered (if any) for each consumer.
-            // need to send ack for messages delivered to consumers so far
-            for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-            {
-                // Sends acknowledgement to server
-                i.next().acknowledgeLastDelivered();
-            }
+    	checkTransacted();
 
-            // Commits outstanding messages sent and outstanding acknowledgements.
-            sendCommit();
-        }
-        catch (AMQException e)
-        {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
-        }
-        catch (FailoverException e)
-        {
-            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
-        }
+    	new FailoverNoopSupport<Object, JMSException>(new FailoverProtectedOperation<Object, JMSException>()
+    	{
+    	    public Object execute() throws JMSException, FailoverException
+    	    {
+    	        // Check that we are clean to commit.
+    	        if (_failedOver && _dirty)
+    	        {
+    	            rollback();
+    	            
+    	            throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+    	                                                     "Forced rollback");
+    	        }
+
+    	        try
+    	        {
+    	            // Acknowledge up to message last delivered (if any)
+    	            // on this session.
+    				// We only need to find the highest value and ack that as
+                    // commit is session level.
+    				Long lastTag = -1L;
+
+    				for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+    				{
+    					i.next().acknowledgeDelivered();
+    				}
+
+    				if (_transacted)
+    				{
+    					// Do the above, but for consumers which have been
+                        // de-registered since the
+    					// last commit
+    					for (int i = 0; i < _removedConsumers.size(); i++)
+    					{
+    						// Sends acknowledgement to server
+    						_removedConsumers.get(i).acknowledgeDelivered();
+    						_removedConsumers.remove(i);
+    					}
+    				}
+
+    				// Commits outstanding messages sent and outstanding
+                    // acknowledgements.
+    				final AMQProtocolHandler handler = getProtocolHandler();
+
+    				TxCommitBody body = getMethodRegistry().createTxCommitBody();
+
+    				handler.syncWrite(body.generateFrame(_channelId), TxCommitOkBody.class);
+
+    				markClean();
+    				// Commits outstanding messages sent and outstanding
+                    // acknowledgements.
+                    sendCommit();
+
+    			}
+    			catch (AMQException e)
+    			{
+    				throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+    			}
+    			catch (FailoverException e)
+    			{
+    				throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+    			}
+                return null;
+    		}
+    	}, _connection).execute();
     }
 
     public abstract void sendCommit() throws AMQException, FailoverException;
@@ -600,7 +719,6 @@
                 else
                 {
                     _logger.info("Dispatcher is null so created stopped dispatcher");
-
                     startDistpatcherIfNecessary(true);
                 }
 
@@ -655,16 +773,6 @@
                                   messageSelector, null, true, true);
     }
 
-    public BytesMessage createBytesMessage() throws JMSException
-    {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return new JMSBytesMessage();
-        }
-    }
-
     public MessageConsumer createConsumer(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
@@ -695,7 +803,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, null, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -711,7 +819,7 @@
     {
         checkValidDestination(destination);
 
-        return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, rawSelector, false, false);
+        return createConsumerImpl(destination, prefetch, prefetch / 2, noLocal, exclusive, selector, rawSelector, false, false);
     }
 
     public MessageConsumer createConsumer(Destination destination, int prefetchHigh, int prefetchLow, boolean noLocal,
@@ -805,12 +913,8 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return new JMSMapMessage();
-        }
+        checkNotClosed();
+        return new JMSMapMessage();
     }
 
     public javax.jms.Message createMessage() throws JMSException
@@ -820,12 +924,8 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
-        {
-            checkNotClosed();
-
-            return (ObjectMessage) new JMSObjectMessage();
-        }
+        checkNotClosed();
+        return (ObjectMessage) new JMSObjectMessage();
     }
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -909,7 +1009,12 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendCreateQueue(name, autoDelete,durable,exclusive);
+                    QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+
+                    AMQFrame queueDeclare = body.generateFrame(_channelId);
+
+                    getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
                 return null;
             }
         }, _connection).execute();
@@ -1216,16 +1321,6 @@
         }
     }
 
-    public void declareAndBind(AMQDestination amqd)
-            throws
-            AMQException
-    {
-        AMQProtocolHandler protocolHandler = getProtocolHandler();
-        declareExchange(amqd, protocolHandler, false);
-        AMQShortString queueName = declareQueue(amqd, protocolHandler);
-        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
-    }
-
     /**
      * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
      *
@@ -1280,7 +1375,33 @@
                 _dispatcher.rollback();
             }
 
-            sendRecover();
+            if (isStrictAMQP())
+            {
+                // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+
+                BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+                _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
+                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+            }
+            else
+            {
+                // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
+                // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
+                if(getProtocolVersion().equals(ProtocolVersion.v8_0))
+                {
+                    BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+                    _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+                }
+                else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
+                {
+                    BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+                    _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+                }
+                else
+                {
+                    throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
+                }
+            }
 
             if (!isSuspended)
             {
@@ -1297,7 +1418,10 @@
         }
     }
 
-    public abstract void sendRecover() throws AMQException, FailoverException;
+    private ProtocolVersion getProtocolVersion()
+    {
+        return getProtocolHandler().getProtocolVersion();
+    }
 
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
@@ -1321,7 +1445,21 @@
 
     }
 
-    public abstract void rejectMessage(long deliveryTag, boolean requeue);
+    public void rejectMessage(long deliveryTag, boolean requeue)
+    {
+        if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
+            }
+
+            BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
+            AMQFrame frame = body.generateFrame(_channelId);
+
+            _connection.getProtocolHandler().writeFrame(frame);
+        }
+    }
 
     /**
      * Commits all messages done in this transaction and releases any locks currently held.
@@ -1356,7 +1494,11 @@
                     _dispatcher.rollback();
                 }
 
-                sendRollback();
+                TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
+                AMQFrame frame = body.generateFrame(getChannelId());
+                getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
+
+                markClean();
 
                 if (!isSuspended)
                 {
@@ -1491,6 +1633,7 @@
 
                         AMQDestination amqd = (AMQDestination) destination;
 
+                        final AMQProtocolHandler protocolHandler = getProtocolHandler();
                         // TODO: Define selectors in AMQP
                         // TODO: construct the rawSelector from the selector string if rawSelector == null
                         final FieldTable ft = FieldTableFactory.newFieldTable();
@@ -1578,6 +1721,13 @@
                     _destinationConsumerCount.remove(dest);
                 }
             }
+
+            // Consumers that are closed in a transaction must be stored
+            // so that messages they have received can be acknowledged on commit
+            if (_transacted)
+            {
+                _removedConsumers.add(consumer);
+            }
         }
     }
 
@@ -1611,8 +1761,35 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
-            throws JMSException;
+    boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+            throws JMSException
+    {
+        try
+        {
+            AMQMethodEvent response =
+                new FailoverRetrySupport<AMQMethodEvent, AMQException>(
+                    new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+                    {
+                        public AMQMethodEvent execute() throws AMQException, FailoverException
+                        {
+                            ExchangeBoundBody body = getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName);
+                            AMQFrame boundFrame = body.generateFrame(_channelId);
+
+                                    return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
+
+                                }
+                            }, _connection).execute();
+
+            // Extract and return the response code from the query.
+            ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+            return (responseBody.getReplyCode() == 0);
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
+        }
+    }
 
     /**
      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
@@ -1633,6 +1810,7 @@
      */
     void resubscribe() throws AMQException
     {
+        _failedOver = true;
         resubscribeProducers();
         resubscribeConsumers();
     }
@@ -1671,9 +1849,16 @@
         }
     }
 
-    synchronized void startDistpatcherIfNecessary()
+    void startDistpatcherIfNecessary()
     {
+        //If we are the dispatcher then we don't need to check we are started
+        if (Thread.currentThread() == _dispatcher)
+        {
+            return;
+        }
+
         // If IMMEDIATE_PREFETCH is not set then we need to start fetching
+        // This is final per session so will be multi-thread safe.
         if (!_immediatePrefetch)
         {
             // We do this now if this is the first call on a started connection
@@ -1849,7 +2034,7 @@
             }
             else
             {
-                con.close();
+                con.close(false);
             }
         }
         // at this point the _consumers map will be empty
@@ -1919,15 +2104,51 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
-        //need to generate a consumer tag on the client so we can exploit the nowait flag
+        // need to generate a consumer tag on the client so we can exploit the nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
+
+        FieldTable arguments = FieldTableFactory.newFieldTable();
+        if ((messageSelector != null) && !messageSelector.equals(""))
+        {
+            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+        }
+
+        if (consumer.isAutoClose())
+        {
+            arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+        }
+
+        if (consumer.isNoConsume())
+        {
+            arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+        }
+
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
 
         try
         {
-            sendConsume(consumer,queueName,protocolHandler,nowait,messageSelector,tag);
+            BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
+                                                                               queueName,
+                                                                               tag,
+                                                                               consumer.isNoLocal(),
+                                                                               consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+                                                                               consumer.isExclusive(),
+                                                                               nowait,
+                                                                               arguments);
+
+
+            AMQFrame jmsConsume = body.generateFrame(_channelId);
+
+            if (nowait)
+            {
+                protocolHandler.writeFrame(jmsConsume);
+            }
+            else
+            {
+                protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+            }
         }
         catch (AMQException e)
         {
@@ -1973,6 +2194,70 @@
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
     }
 
+
+    /**
+     * Returns the number of messages currently queued for the given destination.
+     *
+     * <p/>Note that this operation automatically retries in the event of fail-over.
+     *
+     * @param amqd            The destination to be checked
+     *
+     * @return the number of queued messages.
+     *
+     * @throws AMQException If the queue cannot be declared for any reason.
+     */
+    public long getQueueDepth(final AMQDestination amqd)
+            throws AMQException
+    {
+
+        class QueueDeclareOkHandler extends SpecificMethodFrameListener
+        {
+
+            private long _messageCount;
+            private long _consumerCount;
+
+            public QueueDeclareOkHandler()
+            {
+                super(getChannelId(), QueueDeclareOkBody.class);
+            }
+
+            public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+            {
+                boolean matches = super.processMethod(channelId, frame);
+                QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+                _messageCount = declareOk.getMessageCount();
+                _consumerCount = declareOk.getConsumerCount();
+                return matches;
+            }
+
+        }
+
+        return new FailoverNoopSupport<Long, AMQException>(
+                new FailoverProtectedOperation<Long, AMQException>()
+                {
+                    public Long execute() throws AMQException, FailoverException
+                    {
+
+                    	AMQFrame queueDeclare =
+                    		getMethodRegistry().createQueueDeclareBody(getTicket(),
+                    												   amqd.getAMQQueueName(),
+                    												   true,
+                    												   amqd.isDurable(),
+                    												   amqd.isExclusive(),
+                    												   amqd.isAutoDelete(),
+                    												   false,
+                    												   null).generateFrame(_channelId);
+                        QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+                        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+
+                        return okHandler._messageCount;
+                    }
+                }, _connection).execute();
+
+    }
+
+
+
     /**
      * Declares the named exchange and type of exchange.
      *
@@ -1988,13 +2273,17 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
-            final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+                                 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
     {
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                sendExchangeDeclare(name, type, protocolHandler, nowait);
+                ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null);
+                AMQFrame exchangeDeclare = body.generateFrame(_channelId);
+
+                protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+
                 return null;
             }
         }, _connection).execute();
@@ -2025,7 +2314,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
+    private AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)
             throws AMQException
     {
         /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
@@ -2040,7 +2329,11 @@
                             amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        sendQueueDeclare(amqd,protocolHandler);
+                        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+
+                        AMQFrame queueDeclare = body.generateFrame(_channelId);
+
+                        protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
 
                         return amqd.getAMQQueueName();
                     }
@@ -2068,7 +2361,15 @@
             {
                 public Object execute() throws AMQException, FailoverException
                 {
-                    sendQueueDelete(queueName);
+                        QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+                                                           queueName,
+                                                           false,
+                                                           false,
+                                                           true);
+                        AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
+
+                    getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+
                     return null;
                 }
             }, _connection).execute();
@@ -2148,6 +2449,16 @@
         }
     }
 
+    public void declareAndBind(AMQDestination amqd)
+            throws
+            AMQException
+    {
+        AMQProtocolHandler protocolHandler = getProtocolHandler();
+        declareExchange(amqd, protocolHandler, false);
+        AMQShortString queueName = declareQueue(amqd, protocolHandler);
+        bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName());
+    }
+
     /**
      * Callers must hold the failover mutex before calling this method.
      *
@@ -2254,7 +2565,7 @@
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Removing message(" + System.identityHashCode(message) + ") from _queue DT:"
-                                  + message.getDeliveryTag());
+                        + message.getDeliveryTag());
                 }
 
                 messages.remove();
@@ -2277,6 +2588,7 @@
         for (Iterator it = consumers.iterator(); it.hasNext();)
         {
             BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+            consumer.failedOver();
             registerConsumer(consumer, true);
         }
     }
@@ -2303,11 +2615,10 @@
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
                             _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                                                                  msg.getExchange(), msg.getContentHeader(), msg.getBodies());
-
-                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
-                    AMQShortString reason = msg.getReplyText();
-                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+                            		msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+                        AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+                        AMQShortString reason = msg.getReplyText();
+                        _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
 
                     // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
                     if (errorCode == AMQConstant.NO_CONSUMERS)
@@ -2345,7 +2656,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    protected  void suspendChannel(boolean suspend) throws AMQException // , FailoverException
+    protected void suspendChannel(boolean suspend) throws AMQException // , FailoverException
     {
         synchronized (_suspensionLock)
         {
@@ -2357,7 +2668,12 @@
                 }
 
                 _suspended = suspend;
-                sendSuspendChannel(suspend);
+
+                ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
+
+                AMQFrame channelFlowFrame = body.generateFrame(_channelId);
+
+                _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
             }
             catch (FailoverException e)
             {
@@ -2366,8 +2682,51 @@
         }
     }
 
+    Object getMessageDeliveryLock()
+    {
+        return _messageDeliveryLock;
+    }
+    
     public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
 
+    /** Signifies that the session has pending sends to commit. */
+    public void markDirty()
+    {
+        _dirty = true;
+    }
+
+    /** Signifies that the session has no pending sends to commit. */
+    public void markClean()
+    {
+        _dirty = false;
+        _failedOver = false;
+    }
+
+    /**
+     * Check to see if failover has occurred since the last call to markClean (commit or rollback).
+     *
+     * @return boolean true if failover has occurred.
+     */
+    public boolean hasFailedOver()
+    {
+        return _failedOver;
+    }
+
+    /**
+     * Check to see if any messages have been sent in this transaction and have not been committed.
+     *
+     * @return boolean true if a message has been sent but not committed
+     */
+    public boolean isDirty()
+    {
+        return _dirty;
+    }
+
+    public void setTicket(int ticket)
+    {
+        _ticket = ticket;
+    }
+
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     private class Dispatcher extends Thread
     {
@@ -2377,6 +2736,7 @@
 
         private final Object _lock = new Object();
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
+        private String dispatcherID = "" + System.identityHashCode(this);
 
         public Dispatcher()
         {
@@ -2408,10 +2768,11 @@
                 }
 
                 // Reject messages on pre-receive queue
-                consumer.rollback();
+                consumer.rollbackPendingMessages();
 
                 // Reject messages on pre-dispatch queue
                 rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+                //Let the dispatcher deal with this when it gets to them.
 
                 // closeConsumer
                 consumer.markClosed();
@@ -2452,6 +2813,13 @@
 
                 }
 
+                // Roll back every removed consumer, draining from the head: removing index i while i advances skips entries.
+                while (!_removedConsumers.isEmpty())
+                {
+                    _removedConsumers.get(0).rollback();
+                    _removedConsumers.remove(0);
+                }
+
                 setConnectionStopped(isStopped);
             }
 
@@ -2504,6 +2872,18 @@
                             {
                                 dispatchMessage(message);
                             }
+
+                            if (message.getDeliveryTag() <= _rollbackMark.get())
+                            {
+                                rejectMessage(message, true);
+                            }
+                            else
+                            {
+                                synchronized (_messageDeliveryLock)
+                                {
+                                    dispatchMessage(message);
+                                }
+                            }
                         }
 
                     }
@@ -2552,7 +2932,7 @@
             //if (message.getDeliverBody() != null)
             //{
                 final BasicMessageConsumer consumer =
-                        (BasicMessageConsumer) _consumers.get(new AMQShortString(message.getConsumerTag()));
+                    (BasicMessageConsumer) _consumers.get(message.getConsumerTag());
 
                 if ((consumer == null) || consumer.isClosed())
                 {
@@ -2560,15 +2940,15 @@
                     {
                         if (consumer == null)
                         {
-                            _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                                   + message.getDeliveryTag() + "] from queue "
-                                                   + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
+                            _dispatcherLogger.info("Dispatcher(" + dispatcherID + ")Received a message(" + System.identityHashCode(message) + ")" + "["
+                                + message.getDeliveryTag() + "] from queue "
+                                + message.getConsumerTag() + " )without a handler - rejecting(requeue)...");
                         }
                         else
                         {
                             _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" + "["
-                                                   + message.getDeliveryTag() + "] from queue " + " consumer("
-                                                   + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
+                                + message.getDeliveryTag() + "] from queue " + " consumer("
+                                + consumer.debugIdentity() + ") is closed rejecting(requeue)...");
                         }
                     }
                     // Don't reject if we're already closing

Added: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQSessionDirtyException represents all failures to send data on a transacted session that is
+ * no longer in a state that the client expects, i.e. failover has occurred so previously sent messages
+ * will not be part of the transaction.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent attempt to perform additional sends on a dirty session.
+ * </table>
+ */
+public class AMQSessionDirtyException extends AMQException
+{
+    public AMQSessionDirtyException(String msg)
+    {
+        super(AMQConstant.RESOURCE_ERROR, msg, null);
+    }
+}

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Feb  8 02:09:37 2008
@@ -33,6 +33,7 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -83,8 +84,7 @@
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         final AMQFrame ackFrame =
-            BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                                        multiple);
+            getProtocolHandler().getMethodRegistry().createBasicAckBody(deliveryTag, multiple).generateFrame(_channelId);
 
         if (_logger.isDebugEnabled())
         {
@@ -97,28 +97,17 @@
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
             final AMQShortString exchangeName) throws AMQException, FailoverException
     {
-        AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
-                exchangeName, // exchange
-                false, // nowait
-                queueName, // queue
-                routingKey, // routingKey
-                getTicket()); // ticket
-
-        getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+        getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+                                        (getTicket(),queueName,exchangeName,routingKey,false,arguments).
+                                        generateFrame(_channelId), QueueBindOkBody.class);
     }
 
     public void sendClose(long timeout) throws AMQException, FailoverException
     {
         getProtocolHandler().closeSession(this);
-
-        final AMQFrame frame = ChannelCloseBody.createAMQFrame
-            (getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
-             0, // classId
-             0, // methodId
-             AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-             new AMQShortString("JMS client closing channel")); // replyText
-
-        getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
+        getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+                new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId), 
+                                       ChannelCloseOkBody.class, timeout);
         // When control resumes at this point, a reply will have been received that
         // indicates the broker has closed the channel successfully.
     }
@@ -127,21 +116,13 @@
     {
         final AMQProtocolHandler handler = getProtocolHandler();
 
-        handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxCommitOkBody.class);
+        handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class); // send Tx.Commit, wait for Tx.Commit-Ok
     }
 
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
             FailoverException
     {
-        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
-                autoDelete, // autoDelete
-                durable, // durable
-                exclusive, // exclusive
-                false, // nowait
-                false, // passive
-                name, // queue
-                getTicket()); // ticket
-
+        AMQFrame queueDeclare = getProtocolHandler().getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null).generateFrame(_channelId); 
         getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
 
@@ -150,16 +131,29 @@
         if (isStrictAMQP())
         {
             // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
-            _connection.getProtocolHandler().writeFrame(
-                    BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
+
+            BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+            _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
             _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
         }
         else
         {
-
-            _connection.getProtocolHandler().syncWrite(
-                    BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
-                    , BasicRecoverOkBody.class);
+            // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
+            // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
+            if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
+            {
+                BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
+                _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+            }
+            else if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v0_9))
+            {
+                BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
+                _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+            }
+            else
+            {
+                throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolHandler().getProtocolVersion());
+            }
         }
     }
 
@@ -172,8 +166,7 @@
                 _logger.debug("Rejecting delivery tag:" + deliveryTag);
             }
 
-            AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                    requeue);
+            AMQFrame basicRejectBody = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue).generateFrame(_channelId);
 
             _connection.getProtocolHandler().writeFrame(basicRejectBody);
         }
@@ -189,10 +182,8 @@
                     {
                         public AMQMethodEvent execute() throws AMQException, FailoverException
                         {
-                            AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                    exchangeName, // exchange
-                                    queueName, // queue
-                                    routingKey); // routingKey
+                            AMQFrame boundFrame = getMethodRegistry().createExchangeBoundBody
+                                                    (exchangeName, routingKey, queueName).generateFrame(_channelId);
 
                             return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
 
@@ -202,7 +193,7 @@
             // Extract and return the response code from the query.
             ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
 
-            return (responseBody.replyCode == 0);
+            return (responseBody.getReplyCode() == 0);
         }
         catch (AMQException e)
         {
@@ -234,14 +225,14 @@
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
         // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
-                tag, // consumerTag
-                consumer.isExclusive(), // exclusive
-                consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
-                consumer.isNoLocal(), // noLocal
-                nowait, // nowait
-                queueName, // queue
-                getTicket()); // ticket
+        AMQFrame jmsConsume = getMethodRegistry().createBasicConsumeBody(getTicket(),
+                queueName,
+                tag,
+                consumer.isNoLocal(),
+                consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+                consumer.isExclusive(),
+                nowait,
+                arguments).generateFrame(_channelId);
 
         if (nowait)
         {
@@ -256,49 +247,36 @@
     public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
             final boolean nowait) throws AMQException, FailoverException
     {
-        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                name, // exchange
-                false, // internal
-                nowait, // nowait
-                false, // passive
-                getTicket(), // ticket
-                type); // type
+        AMQFrame exchangeDeclare = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,false,false,false,false,nowait,null).
+                                            generateFrame(_channelId);
 
         protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
     {
-        AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
-                amqd.isAutoDelete(), // autoDelete
-                amqd.isDurable(), // durable
-                amqd.isExclusive(), // exclusive
-                false, // nowait
-                false, // passive
-                amqd.getAMQQueueName(), // queue
-                getTicket()); // ticket
+        AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null).generateFrame(_channelId);
 
         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
 
     public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
     {
-        AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false, // ifEmpty
-                false, // ifUnused
-                true, // nowait
-                queueName, // queue
-                getTicket()); // ticket
+        QueueDeleteBody body = getMethodRegistry().createQueueDeleteBody(getTicket(),
+                queueName,
+                false,
+                false,
+                true);
+        AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
 
         getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
     }
 
     public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
     {
-        AMQFrame channelFlowFrame = ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), !suspend);
-
-        _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+        _connection.getProtocolHandler().syncWrite(_connection.getProtocolHandler().getMethodRegistry().
+                                                   createChannelFlowBody(!suspend).generateFrame(_channelId),
+                                                   ChannelFlowOkBody.class);
     }
 
     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
@@ -323,8 +301,8 @@
 
     public void sendRollback() throws AMQException, FailoverException
     {
-        _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
-            getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+        _connection.getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createTxRollbackBody().generateFrame(_channelId), 
+                                                    TxRollbackOkBody.class);
     }
 
      public TemporaryQueue createTemporaryQueue() throws JMSException

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Fri Feb  8 02:09:37 2008
@@ -24,6 +24,7 @@
 
 import javax.jms.JMSException;
 import javax.jms.TemporaryTopic;
+import java.util.UUID;
 
 /**
  * AMQ implementation of TemporaryTopic.
@@ -38,7 +39,7 @@
      */
     public AMQTemporaryTopic(AMQSession session)
     {
-        super(session.getTemporaryTopicExchangeName(),new AMQShortString("TempQueue" + Long.toString(System.currentTimeMillis())));
+        super(session.getTemporaryTopicExchangeName(),new AMQShortString("tmp_" + UUID.randomUUID()));
         _session = session;
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Fri Feb  8 02:09:37 2008
@@ -96,8 +96,7 @@
 
     public boolean isNameRequired()
     {
-        // Topics always rely on a server generated queue name.
-        return false;
+        return !isDurable();
     }
 
     /**
@@ -111,5 +110,18 @@
      */
     public void setQueueName(String queueName)
     {
+    }
+
+    public boolean equals(Object o)
+    {
+        return (o instanceof AMQTopic)
+               && ((AMQTopic)o).getExchangeName().equals(getExchangeName())
+               && ((AMQTopic)o).getRoutingKey().equals(getRoutingKey());
+
+    }
+
+    public int hashCode()
+    {
+        return getExchangeName().hashCode() + getRoutingKey().hashCode();
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Feb  8 02:09:37 2008
@@ -20,38 +20,40 @@
  */
 package org.apache.qpid.client;
 
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.AMQException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
-    /**
-     * The connection being used by this consumer
-     */
+    /** The connection being used by this consumer */
     protected AMQConnection _connection;
 
     private String _messageSelector;
@@ -69,14 +71,10 @@
      */
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
-    /**
-     * The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker
-     */
+    /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
     protected AMQShortString _consumerTag;
 
-    /**
-     * We need to know the channel id when constructing frames
-     */
+    /** We need to know the channel id when constructing frames */
     protected int _channelId;
 
     /**
@@ -87,7 +85,7 @@
 
     protected MessageFactoryRegistry _messageFactory;
 
-    protected final AMQSession _session;
+    private final AMQSession _session;
 
     protected AMQProtocolHandler _protocolHandler;
 
@@ -261,7 +259,7 @@
 
             if (messageListener != null)
             {
-                // handle case where connection has already been started, and the dispatcher has alreaded started
+                //todo: handle case where connection has already been started, and the dispatcher has already started
                 // putting values on the _synchronousQueue
 
                 synchronized (_session)
@@ -277,16 +275,56 @@
     protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
     {
 
-        if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+        switch (_session.getAcknowledgeMode())
         {
-            _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                _logger.info("Recording tag for acking on close:" + jmsMsg.getDeliveryTag());
+                _receivedDeliveryTags.add(jmsMsg.getDeliveryTag());
+                break;
+
+            case Session.CLIENT_ACKNOWLEDGE:
+                _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
+                break;
+
+            case Session.SESSION_TRANSACTED:
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(jmsMsg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    _logger.info("Recording tag for commit:" + jmsMsg.getDeliveryTag());
+                    _receivedDeliveryTags.add(jmsMsg.getDeliveryTag());
+                }
+
+                break;
         }
 
         _session.setInRecovery(false);
     }
 
-    private void acquireReceiving() throws JMSException
+    /**
+     * @param immediate if true then return immediately if the connection is failing over
+     *
+     * @return true if the acquisition was successful, false if immediate was set and the connection is failing over
+     *
+     * @throws JMSException
+     * @throws InterruptedException
+     */
+    private boolean acquireReceiving(boolean immediate) throws JMSException, InterruptedException
     {
+        if (_connection.isFailingOver())
+        {
+            if (immediate)
+            {
+                return false;
+            }
+            else
+            {
+                _connection.blockUntilNotFailingOver();
+            }
+        }
+
         if (!_receiving.compareAndSet(false, true))
         {
             throw new javax.jms.IllegalStateException("Another thread is already receiving.");
@@ -298,6 +336,7 @@
         }
 
         _receivingThread = Thread.currentThread();
+        return true;
     }
 
     private void releaseReceiving()
@@ -351,7 +390,18 @@
 
         checkPreConditions();
 
-        acquireReceiving();
+        try
+        {
+            acquireReceiving(false);
+        }
+        catch (InterruptedException e)
+        {
+            _logger.warn("Interrupted: " + e);
+            if (isClosed())
+            {
+                return null;
+            }
+        }
 
         _session.startDistpatcherIfNecessary();
 
@@ -370,7 +420,6 @@
                 preApplicationProcessing(m);
                 postDeliver(m);
             }
-
             return m;
         }
         catch (InterruptedException e)
@@ -405,7 +454,25 @@
     {
         checkPreConditions();
 
-        acquireReceiving();
+        try
+        {
+            if (!acquireReceiving(true))
+            {
+                //If we couldn't acquire the receiving thread then return null.
+                // This will occur if failing over.
+                return null;
+            }
+        }
+        catch (InterruptedException e)
+        {
+            /*
+             *  This seems slightly shoddy but should never actually be executed
+             *  since we told acquireReceiving to return immediately and it shouldn't
+             *  block on anything.
+             */
+
+            return null;
+        }
 
         _session.startDistpatcherIfNecessary();
 
@@ -486,22 +553,41 @@
             {
                 if (_logger.isTraceEnabled())
                 {
+                    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                     if (_closedStack != null)
                     {
-                        _logger.trace(_consumerTag + " close():" + Arrays.asList(Thread.currentThread().getStackTrace())
-                                .subList(3, 6));
                         _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
                     }
                     else
                     {
-                        _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
+                        _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
                     }
                 }
 
                 if (sendClose)
                 {
-                    // TODO: Be aware of possible changes to parameter order as versions change.
-                    sendCancel();
+                    BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+
+                    final AMQFrame cancelFrame = body.generateFrame(_channelId);
+
+                    try
+                    {
+                        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("CancelOk'd for consumer:" + debugIdentity());
+                        }
+
+                    }
+                    catch (AMQException e)
+                    {
+                        throw new JMSAMQException("Error closing consumer: " + e, e);
+                    }
+                    catch (FailoverException e)
+                    {
+                        throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+                    }
                 }
                 else
                 {
@@ -573,7 +659,12 @@
 
         try
         {
-            AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
+            AbstractJMSMessage jmsMessage =
+                _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
+                                              messageFrame.getExchange(), messageFrame.getRoutingKey(),
+                                              (ContentHeaderBody) messageFrame.getContentHeader(), 
+                                              messageFrame.getBodies());
+
             if (debug)
             {
                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
@@ -688,12 +779,13 @@
                 break;
 
             case Session.DUPS_OK_ACKNOWLEDGE:
-                if (++_outstanding >= _prefetchHigh)
+            /*(    if (++_outstanding >= _prefetchHigh)
                 {
                     _dups_ok_acknowledge_send = true;
                 }
 
-                if (_outstanding <= _prefetchLow)
+                //Can't use <= as _prefetchHigh may equal _prefetchLow so no acking would occur.
+                if (_outstanding < _prefetchLow)
                 {
                     _dups_ok_acknowledge_send = false;
                 }
@@ -703,11 +795,12 @@
                     if (!_session.isInRecovery())
                     {
                         _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+                        _outstanding = 0;
                     }
                 }
 
                 break;
-
+             */
             case Session.AUTO_ACKNOWLEDGE:
                 // we do not auto ack a message if the application code called recover()
                 if (!_session.isInRecovery())
@@ -716,7 +809,6 @@
                 }
 
                 break;
-
             case Session.SESSION_TRANSACTED:
                 if (isNoConsume())
                 {
@@ -731,14 +823,17 @@
         }
     }
 
+
     /**
      * Acknowledge up to last message delivered (if any). Used when commiting.
+     *
+     * @return the lastDeliveryTag to acknowledge, or null if no delivery tags have been received
      */
-    void acknowledgeLastDelivered()
+    Long getLastDelivered()
     {
         if (!_receivedDeliveryTags.isEmpty())
         {
-            long lastDeliveryTag = _receivedDeliveryTags.poll();
+            Long lastDeliveryTag = _receivedDeliveryTags.poll();
 
             while (!_receivedDeliveryTags.isEmpty())
             {
@@ -747,10 +842,24 @@
 
             assert _receivedDeliveryTags.isEmpty();
 
-            _session.acknowledgeMessage(lastDeliveryTag, true);
+            return lastDeliveryTag;
         }
+
+        return null;
     }
 
+    /**
+     * Acknowledge every received delivery tag, one call per tag (if any). Used when committing.
+     */
+    void acknowledgeDelivered()
+    {
+    	while (!_receivedDeliveryTags.isEmpty())
+        {
+    		_session.acknowledgeMessage(_receivedDeliveryTags.poll(), false);
+        }
+    }
+
+
     void notifyError(Throwable cause)
     {
         // synchronized (_closed)
@@ -758,15 +867,16 @@
             _closed.set(true);
             if (_logger.isTraceEnabled())
             {
+                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " notifyError():" + Arrays
-                            .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+                    _logger.trace(_consumerTag + " notifyError():"
+                                  + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
                     _logger.trace(_consumerTag + " previously" + _closedStack.toString());
                 }
                 else
                 {
-                    _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+                    _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
                 }
             }
         }
@@ -821,11 +931,18 @@
         }
     }
 
-    public void acknowledge() // throws JMSException
+    public void acknowledge() throws JMSException
     {
-        if (!isClosed())
+        if (isClosed())
+        {
+            throw new IllegalStateException("Consumer is closed");
+        }
+        else if (_session.hasFailedOver())
+        {
+            throw new JMSException("has failed over");
+        }
+        else
         {
-
             Iterator<Long> tags = _unacknowledgedDeliveryTags.iterator();
             while (tags.hasNext())
             {
@@ -833,10 +950,6 @@
                 tags.remove();
             }
         }
-        else
-        {
-            throw new IllegalStateException("Consumer is closed");
-        }
     }
 
     /**
@@ -863,6 +976,7 @@
 
         if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
         {
+            _closed.set(true);
             _receivingThread.interrupt();
         }
 
@@ -879,7 +993,11 @@
 
         rollbackReceivedMessages();
 
-        // rollback pending messages
+        rollbackPendingMessages();
+    }
+
+    public void rollbackPendingMessages()
+    {
         if (_synchronousQueue.size() > 0)
         {
             if (_logger.isDebugEnabled())
@@ -890,6 +1008,9 @@
 
             Iterator iterator = _synchronousQueue.iterator();
 
+            int initialSize = _synchronousQueue.size();
+
+            boolean removed = false;
             while (iterator.hasNext())
             {
 
@@ -904,16 +1025,24 @@
                     }
 
                     iterator.remove();
+                    removed = true;
 
                 }
                 else
                 {
-                    _logger.error("Queue contained a :" + o
-                            .getClass() + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+                    _logger.error("Queue contained a :" + o.getClass()
+                                  + " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
                     iterator.remove();
+                    removed = true;
+                }
                 }
+
+            if (removed && (initialSize == _synchronousQueue.size()))
+            {
+                _logger.error("Queue had content removed but didn't change in size." + initialSize);
             }
 
+
             if (_synchronousQueue.size() != 0)
             {
                 _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
@@ -959,7 +1088,7 @@
 
     public String debugIdentity()
     {
-        return String.valueOf(_consumerTag);
+        return String.valueOf(_consumerTag) + "[" + System.identityHashCode(this) + "]";
     }
 
     public void clearReceiveQueue()
@@ -967,7 +1096,6 @@
         _synchronousQueue.clear();
     }
 
-
     public void start()
     {
         // do nothing as this is a 0_10 feature
@@ -992,5 +1120,12 @@
     public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException 
     {
         _session.addBindingKey(this,amqd,routingKey);
+    }
+
+    /** to be called when a failover has occurred */
+    public void failedOver()
+    {
+        clearReceiveQueue();
+        clearUnackedMessages();
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Fri Feb  8 02:09:37 2008
@@ -53,10 +53,10 @@
 
     public void sendCancel() throws JMSAMQException
     {
-        final AMQFrame cancelFrame =
-            BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
-                false); // nowait
+        final AMQFrame cancelFrame = _connection.getProtocolHandler().getMethodRegistry().
+                                        createBasicCancelBody(_consumerTag, // consumerTag
+                                                              false). // nowait
+                                        generateFrame(_channelId);
 
         try
         {



Mime
View raw message