qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rupertlssm...@apache.org
Subject svn commit: r540165 [1/3] - in /incubator/qpid/trunk/qpid/java: broker/src/main/grammar/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main...
Date Mon, 21 May 2007 15:11:27 GMT
Author: rupertlssmith
Date: Mon May 21 08:11:23 2007
New Revision: 540165

URL: http://svn.apache.org/viewvc?view=rev&rev=540165
Log:
Refactored exceptions to have single constructors and made room for wrapped causes.

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingExclusiveSubscriptionException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ExistingSubscriptionPreventsExclusiveException.java
Removed:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/IllegalStateTransitionException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/IllegalStateTransitionException.java
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FailedDequeueException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQAuthenticationException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQNoConsumersException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQNoRouteException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQNoTransportForProtocolException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/AMQTransportConnectionException.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/vmbroker/AMQVMBrokerCreationException.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQConnectionWaitException.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedBodyTypeException.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/AMQUnexpectedFrameTypeException.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelClosedException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQInvalidRoutingKeyException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQTimeoutException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQUndeliveredException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/AMQUnresolvedAddressException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/PropertyUtils.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrameDecodingException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolClassException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolHeaderException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolInstanceException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQProtocolVersionException.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/grammar/SelectorParser.jj Mon May 21 08:11:23 2007
@@ -94,7 +94,7 @@
             return this.JmsSelector();
         }
         catch (Throwable e) {
-	        throw (AMQInvalidArgumentException)new AMQInvalidArgumentException(sql).initCause(e);
+	        throw new AMQInvalidArgumentException(sql, e);
         }
 
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon May 21 08:11:23 2007
@@ -212,7 +212,7 @@
     {
         if (_currentMessage == null)
         {
-            throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+            throw new AMQException(null, "Received content header without previously receiving a BasicPublish frame", null);
         }
         else
         {
@@ -239,7 +239,7 @@
     {
         if (_currentMessage == null)
         {
-            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+            throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null);
         }
 
         if (_log.isTraceEnabled())
@@ -883,7 +883,7 @@
     {
         if (!isTransactional())
         {
-            throw new AMQException("Fatal error: commit called on non-transactional channel");
+            throw new AMQException(null, "Fatal error: commit called on non-transactional channel", null);
         }
 
         _txnContext.commit();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java Mon May 21 08:11:23 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,16 @@
  */
 package org.apache.qpid.server;
 
-public class ConsumerTagNotUniqueException  extends Exception
-{
-}
+/**
+ * ConsumerTagNotUniqueException indicates that a client has attempted to connect with a consumer tag that is already
+ * used.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents error when a client connects with a non-unique tag.
+ * </table>
+ *
+ * @todo Consider replacing with an AMQNotAllowedException, as this is the status code returned when this happens.
+ */
+public class ConsumerTagNotUniqueException extends Exception
+{ }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Mon May 21 08:11:23 2007
@@ -41,9 +41,9 @@
 {
     private final AMQMessage _amqMessage;
 
-    public RequiredDeliveryException(String message, AMQMessage payload)
+    public RequiredDeliveryException(String message, AMQMessage payload, Throwable cause)
     {
-        super(message);
+        super(null, message, cause);
 
         // Increment the reference as this message is in the routing phase
         // and so will have the ref decremented as routing fails.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Mon May 21 08:11:23 2007
@@ -181,8 +181,8 @@
                 if (unacked.getKey() > deliveryTag)
                 {
                     //This should not occur now.
-                    throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
-                                           " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
+                    throw new AMQException(null, "UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
+                                           " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString(), null);
                 }
 
                 it.remove();

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Mon May 21 08:11:23 2007
@@ -55,7 +55,7 @@
         if (exchClass == null)
         {
 
-            throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
+            throw new AMQUnknownExchangeType("Unknown exchange type: " + type, null);
         }
         try
         {
@@ -65,11 +65,11 @@
         }
         catch (InstantiationException e)
         {
-            throw new AMQException("Unable to create exchange: " + e, e);
+            throw new AMQException(null, "Unable to create exchange: " + e, e);
         }
         catch (IllegalAccessException e)
         {
-            throw new AMQException("Unable to create exchange: " + e, e);
+            throw new AMQException(null, "Unable to create exchange: " + e, e);
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Mon May 21 08:11:23 2007
@@ -71,7 +71,7 @@
                 getMessageStore().createExchange(exchange);
             } catch (InternalErrorException e)
             {
-                throw new AMQException("problem registering excahgne " + exchange, e);
+                throw new AMQException(null, "problem registering excahgne " + exchange, e);
             }
         }
     }
@@ -99,14 +99,14 @@
                     getMessageStore().removeExchange(e);
                 } catch (InternalErrorException e1)
                 {
-                    throw new AMQException("Problem unregistering Exchange " + name, e1);
+                    throw new AMQException(null, "Problem unregistering Exchange " + name, e1);
                 }
             }
             e.close();
         }
         else
         {
-            throw new AMQException("Unknown exchange " + name);
+            throw new AMQException(null, "Unknown exchange " + name, null);
         }
     }
 
@@ -138,7 +138,7 @@
         // TODO: check where the exchange is validated
         if (exch == null)
         {
-            throw new AMQException("Exchange '" + exchange + "' does not exist");
+            throw new AMQException(null, "Exchange '" + exchange + "' does not exist", null);
         }
         exch.route(payload);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Mon May 21 08:11:23 2007
@@ -126,7 +126,7 @@
         catch (JMException ex)
         {
             _logger.error("Exception occured in creating the direct exchange mbean", ex);
-            throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+            throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
         }
     }
 
@@ -156,8 +156,8 @@
 
         if (!_index.remove(routingKey, queue))
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
-                                   " with routing key " + routingKey + ". No queue was registered with that routing key");
+            throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey + ". No queue was registered with that routing key", null);
         }
     }
 
@@ -171,7 +171,7 @@
             String msg = "Routing key " + routingKey + " is not known to " + this;
             if (info.isMandatory())
             {
-                throw new NoRouteException(msg, payload);
+                throw new NoRouteException(msg, payload, null);
             }
             else
             {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Mon May 21 08:11:23 2007
@@ -216,7 +216,7 @@
             if (info.isMandatory())
             {
                 String msg = "Topic " + routingKey + " is not known to " + this;
-                throw new NoRouteException(msg, payload);
+                throw new NoRouteException(msg, payload, null);
             }
             else
             {
@@ -276,15 +276,15 @@
         List<AMQQueue> queues = _routingKey2queues.get(routingKey);
         if (queues == null)
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
-                                   " with routing key " + routingKey + ". No queue was registered with that routing key");
+            throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey + ". No queue was registered with that routing key", null);
 
         }
         boolean removedQ = queues.remove(queue);
         if (!removedQ)
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
-                                   " with routing key " + routingKey);
+            throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey, null);
         }
         if (queues.isEmpty())
         {
@@ -301,7 +301,7 @@
         catch (JMException ex)
         {
             _logger.error("Exception occured in creating the topic exchenge mbean", ex);
-            throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
+            throw new AMQException(null, "Exception occured in creating the topic exchenge mbean", ex);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInUseException.java Mon May 21 08:11:23 2007
@@ -38,8 +38,8 @@
  */
 public class ExchangeInUseException extends AMQException
 {
-    public ExchangeInUseException(String exchangeName)
+    public ExchangeInUseException(String exchangeName, Throwable cause)
     {
-        super("Exchange " + exchangeName + " is currently in use");
+        super(null, "Exchange " + exchangeName + " is currently in use", cause);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Mon May 21 08:11:23 2007
@@ -98,7 +98,7 @@
         catch (JMException ex)
         {
             _logger.error("Exception occured in creating the direct exchange mbean", ex);
-            throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
+            throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
         }
     }
 
@@ -129,8 +129,8 @@
 
         if (!_queues.remove(queue))
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
-                                   ". ");
+            throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   ". ", null);
         }
     }
 
@@ -143,7 +143,7 @@
             String msg = "No queues bound to " + this;
             if (publishInfo.isMandatory())
             {
-                throw new NoRouteException(msg, payload);
+                throw new NoRouteException(msg, payload, null);
             }
             else
             {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Mon May 21 08:11:23 2007
@@ -231,7 +231,7 @@
 
             if (payload.getMessagePublishInfo().isMandatory())
             {
-                throw new NoRouteException(msg, payload);
+                throw new NoRouteException(msg, payload, null);
             }
             else
             {
@@ -284,7 +284,7 @@
         catch (JMException ex)
         {
             _logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
-            throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
+            throw new AMQException(null, "Exception occured in creating the HeadersExchangeMBean", ex);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java Mon May 21 08:11:23 2007
@@ -36,9 +36,9 @@
  */
 public class NoRouteException extends RequiredDeliveryException
 {
-    public NoRouteException(String msg, AMQMessage message)
+    public NoRouteException(String msg, AMQMessage message, Throwable cause)
     {
-        super(msg, message);
+        super(msg, message, cause);
     }
 
     public AMQConstant getReplyCode()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Mon May 21 08:11:23 2007
@@ -33,6 +33,8 @@
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.ExistingExclusiveSubscriptionException;
+import org.apache.qpid.server.queue.ExistingSubscriptionPreventsExclusiveException;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -146,14 +148,14 @@
                         AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
                         msg));	// replyText
                 }
-                catch (AMQQueue.ExistingExclusiveSubscription e)
+                catch (ExistingExclusiveSubscriptionException e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "
                                                    + queue.getName()
                                                    + " as it already has an existing exclusive consumer");
                 }
-                catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+                catch (ExistingSubscriptionPreventsExclusiveException e)
                 {
                     throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
                                                    "Cannot subscribe to queue "

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Mon May 21 08:11:23 2007
@@ -69,7 +69,7 @@
         SaslServer ss = session.getSaslServer();
         if (ss == null)
         {
-            throw new AMQException("No SASL context set up in session");
+            throw new AMQException(null, "No SASL context set up in session", null);
         }
 
         AuthenticationResult authResult = authMgr.authenticate(ss, body.response);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Mon May 21 08:11:23 2007
@@ -138,7 +138,7 @@
         catch (SaslException e)
         {
             disposeSaslServer(session);
-            throw new AMQException("SASL error: " + e, e);
+            throw new AMQException(null, "SASL error: " + e, e);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Mon May 21 08:11:23 2007
@@ -79,7 +79,7 @@
         AMQShortString routingKey = body.routingKey;
         if (exchangeName == null)
         {
-            throw new AMQException("Exchange exchange must not be null");
+            throw new AMQException(null, "Exchange exchange must not be null", null);
         }
         Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
         AMQFrame response;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Mon May 21 08:11:23 2007
@@ -96,7 +96,7 @@
             else if (!exchange.getType().equals(body.type))
             {
 
-                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());    
+                throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(), null);    
             }
 
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Mon May 21 08:11:23 2007
@@ -110,7 +110,7 @@
                             store.createQueue(queue);
                         } catch (Exception e)
                         {
-                           throw new AMQException("Problem when creating queue " + queue,  e);
+                           throw new AMQException(null, "Problem when creating queue " + queue,  e);
                         }
                     }
                     queueRegistry.registerQueue(queue);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Mon May 21 08:11:23 2007
@@ -114,7 +114,7 @@
                         store.destroyQueue(queue);
                     } catch (Exception e)
                     {
-                      throw new AMQException("problem when destroying queue " + queue, e);
+                      throw new AMQException(null, "problem when destroying queue " + queue, e);
                     }
                 }
                 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Mon May 21 08:11:23 2007
@@ -71,7 +71,7 @@
         }
         catch (JMException e)
         {
-            throw new AMQException("Error registering managed object " + this + ": " + e, e);
+            throw new AMQException(null, "Error registering managed object " + this + ": " + e, e);
         }
     }
 
@@ -88,7 +88,7 @@
         }
         catch (JMException e)
         {
-            throw new AMQException("Error unregistering managed object: " + this + ": " + e, e);
+            throw new AMQException(null, "Error unregistering managed object: " + this + ": " + e, e);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon May 21 08:11:23 2007
@@ -168,7 +168,7 @@
         catch (JMException ex)
         {
             _logger.error("AMQProtocolSession MBean creation has failed ", ex);
-            throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
+            throw new AMQException(null, "AMQProtocolSession MBean creation has failed ", ex);
         }
     }
 
@@ -199,7 +199,7 @@
         }
         else
         {
-            throw new UnknnownMessageTypeException(message);
+            throw new UnknnownMessageTypeException(message, null);
         }
     }
 
@@ -321,7 +321,7 @@
                 }
                 if (!wasAnyoneInterested)
                 {
-                    throw new AMQNoMethodHandlerException(evt);
+                    throw new AMQNoMethodHandlerException(evt, null);
                 }
             }
             catch (AMQChannelException e)
@@ -425,7 +425,7 @@
         AMQChannel channel = getChannel(channelId);
         if (channel == null)
         {
-            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
         }
         return channel;
     }
@@ -454,14 +454,14 @@
     {
         if (_closed)
         {
-            throw new AMQException("Session is closed");
+            throw new AMQException(null, "Session is closed", null);
         }
 
         final int channelId = channel.getChannelId();
 
         if (_closingChannelsList.contains(channelId))
         {
-            throw new AMQException("Session is marked awaiting channel close");
+            throw new AMQException(null, "Session is marked awaiting channel close", null);
         }
 
         if (_channelMap.size() == _maxNoOfChannels)
@@ -469,7 +469,7 @@
             String errorMessage = toString() + ": maximum number of channels has been reached (" +
                                   _maxNoOfChannels + "); can't create channel";
             _logger.error(errorMessage);
-            throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
+            throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQNoMethodHandlerException.java Mon May 21 08:11:23 2007
@@ -39,8 +39,8 @@
  */
 public class AMQNoMethodHandlerException extends AMQException
 {
-    public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt)
+    public AMQNoMethodHandlerException(AMQMethodEvent<AMQMethodBody> evt, Throwable cause)
     {
-        super("AMQMethodEvent " + evt + " was not processed by any listener on Broker.");
+        super(null, "AMQMethodEvent " + evt + " was not processed by any listener on Broker.", cause);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/UnknnownMessageTypeException.java Mon May 21 08:11:23 2007
@@ -39,8 +39,8 @@
  */
 public class UnknnownMessageTypeException extends AMQException
 {
-    public UnknnownMessageTypeException(AMQDataBlock message)
+    public UnknnownMessageTypeException(AMQDataBlock message, Throwable cause)
     {
-        super("Unknown message type: " + message.getClass().getName() + ": " + message);
+        super(null, "Unknown message type: " + message.getClass().getName() + ": " + message, cause);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=540165&r1=540164&r2=540165
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon May 21 08:11:23 2007
@@ -20,31 +20,33 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/** Combines the information that make up a deliverable message into a more manageable form. */
+
+import org.apache.log4j.Logger;
+
+import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.messageStore.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.messageStore.StorableMessage;
 import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-
-/** Combines the information that make up a deliverable message into a more manageable form. */
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.TransactionalContext;
 
 /**
  * Combines the information that make up a deliverable message into a more manageable form.
@@ -56,7 +58,7 @@
     // The ordered list of queues into which this message is enqueued.
     private List<StorableQueue> _queues = new LinkedList<StorableQueue>();
     // Indicates whether this message is staged
-     private boolean _isStaged = false;
+    private boolean _isStaged = false;
 
     /**
      * Used in clustering
@@ -89,18 +91,15 @@
      */
     private boolean _immediate;
 
-    //    private Subscription _takenBySubcription;
-    //    private AtomicBoolean _taken = new AtomicBoolean(false);
+    // private Subscription _takenBySubcription;
+    // private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
-
     private Set<Subscription> _rejectedBy = null;
 
-
     private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
     private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
-
     private final int hashcode = System.identityHashCode(this);
     private long _expiration;
 
@@ -111,8 +110,10 @@
 
     public void setExpiration()
     {
-        long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
-        long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+        long expiration =
+            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+        long timestamp =
+            ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
 
         if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
         {
@@ -125,10 +126,10 @@
             {
                 if (timestamp != 0L)
                 {
-                    //todo perhaps use arrival time
+                    // todo perhaps use arrival time
                     long diff = (System.currentTimeMillis() - timestamp);
 
-                    if (diff > 1000L || diff < 1000L)
+                    if ((diff > 1000L) || (diff < 1000L))
                     {
                         _expiration = expiration + diff;
                     }
@@ -159,11 +160,12 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+                return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
             }
             catch (AMQException e)
             {
                 _log.error("Unable to get body count: " + e, e);
+
                 return false;
             }
         }
@@ -173,7 +175,10 @@
             try
             {
 
-                AMQBody cb = getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(), _messageId, ++_index));
+                AMQBody cb =
+                    getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+                            _messageId, ++_index));
+
                 return new AMQFrame(_channel, cb);
             }
             catch (AMQException e)
@@ -209,11 +214,12 @@
         {
             try
             {
-                return _index < _messageHandle.getBodyCount(getStoreContext(), _messageId) - 1;
+                return _index < (_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
             }
             catch (AMQException e)
             {
                 _log.error("Error getting body count: " + e, e);
+
                 return false;
             }
         }
@@ -236,8 +242,7 @@
         }
     }
 
-    public AMQMessage(Long messageId, MessagePublishInfo info,
-                      TransactionalContext txnContext)
+    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext)
     {
         _messageId = messageId;
         _txnContext = txnContext;
@@ -257,8 +262,7 @@
      * @throws AMQException
      */
     public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory, TransactionalContext txnConext)
-            throws
-            AMQException
+        throws AMQException
     {
         _messageId = messageId;
         _messageHandle = factory.createMessageHandle(store, this, true);
@@ -274,10 +278,8 @@
      * @param txnContext
      * @param contentHeader
      */
-    public AMQMessage(Long messageId, MessagePublishInfo info,
-                      TransactionalContext txnContext, ContentHeaderBody contentHeader)
-            throws
-            AMQException
+    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+        ContentHeaderBody contentHeader) throws AMQException
     {
         this(messageId, info, txnContext);
         setContentHeaderBody(contentHeader);
@@ -294,13 +296,9 @@
      * @param contentBodies
      * @throws AMQException
      */
-    public AMQMessage(Long messageId, MessagePublishInfo info,
-                      TransactionalContext txnContext,
-                      ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
-                      List<ContentChunk> contentBodies, MessageStore messageStore, StoreContext storeContext,
-                      MessageHandleFactory messageHandleFactory)
-            throws
-            AMQException
+    public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
+        ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
+        MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
     {
         this(messageId, info, txnContext, contentHeader);
         _transientMessageData.setDestinationQueues(destinationQueues);
@@ -311,9 +309,7 @@
         }
     }
 
-    protected AMQMessage(AMQMessage msg)
-            throws
-            AMQException
+    protected AMQMessage(AMQMessage msg) throws AMQException
     {
         _messageId = msg._messageId;
         _messageHandle = msg._messageHandle;
@@ -322,9 +318,9 @@
         _transientMessageData = msg._transientMessageData;
     }
 
-    //========================================================================
+    // ========================================================================
     // Interface  StorableMessage
-    //========================================================================
+    // ========================================================================
 
     public long getMessageId()
     {
@@ -342,10 +338,12 @@
             result = new byte[headerBody.getSize()];
             bufferedResult = ByteBuffer.wrap(result);
             headerBody.writePayload(bufferedResult);
-        } catch (AMQException e)
+        }
+        catch (AMQException e)
         {
             _log.error("Error when getting message header", e);
         }
+
         return result;
     }
 
@@ -355,10 +353,12 @@
         try
         {
             result = _messageHandle.getContentHeaderBody(_txnContext.getStoreContext(), _messageId).getSize();
-        } catch (AMQException e)
+        }
+        catch (AMQException e)
         {
             _log.error("Error when getting message header size", e);
         }
+
         return result;
     }
 
@@ -372,7 +372,7 @@
         return _messageHandle.getMessagePayload().length;
     }
 
-       public boolean isEnqueued()
+    public boolean isEnqueued()
     {
         return _queues.size() > 0;
     }
@@ -401,6 +401,7 @@
         {
             _log.debug("The queue position is " + _queues.indexOf(queue));
         }
+
         return _queues.indexOf(queue);
     }
 
@@ -424,44 +425,40 @@
         return new BodyContentIterator();
     }
 
-    public ContentHeaderBody getContentHeaderBody()
-            throws
-            AMQException
+    public ContentHeaderBody getContentHeaderBody() throws AMQException
     {
         if (_transientMessageData != null)
         {
             return _transientMessageData.getContentHeaderBody();
-        } else
+        }
+        else
         {
             return _messageHandle.getContentHeaderBody(getStoreContext(), _messageId);
         }
     }
 
-    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
-            throws
-            AMQException
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
     {
         _transientMessageData.setContentHeaderBody(contentHeaderBody);
     }
 
     public void routingComplete(MessageStore store, StoreContext storeContext, MessageHandleFactory factory)
-            throws
-            AMQException
+        throws AMQException
     {
         final boolean persistent = isPersistent();
         _messageHandle = factory.createMessageHandle(store, this, persistent);
-        //if (persistent)
-       // {
-            _txnContext.beginTranIfNecessary();
-       // }
+        // if (persistent)
+        // {
+        _txnContext.beginTranIfNecessary();
+        // }
 
         // enqueuing the messages ensure that if required the destinations are recorded to a
         // persistent store
 
-       // for (AMQQueue q : _transientMessageData.getDestinationQueues())
-       // {
-       //     _messageHandle.enqueue(storeContext, _messageId, q);
-       // }
+        // for (AMQQueue q : _transientMessageData.getDestinationQueues())
+        // {
+        // _messageHandle.enqueue(storeContext, _messageId, q);
+        // }
 
         if (_transientMessageData.getContentHeaderBody().bodySize == 0)
         {
@@ -469,9 +466,7 @@
         }
     }
 
-    public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk)
-            throws
-            AMQException
+    public boolean addContentBodyFrame(StoreContext storeContext, ContentChunk contentChunk) throws AMQException
     {
         _transientMessageData.addBodyLength(contentChunk.getSize());
         final boolean allContentReceived = isAllContentReceived();
@@ -479,21 +474,20 @@
         if (allContentReceived)
         {
             deliver(storeContext);
+
             return true;
-        } else
+        }
+        else
         {
             return false;
         }
     }
 
-    public boolean isAllContentReceived()
-            throws
-            AMQException
+    public boolean isAllContentReceived() throws AMQException
     {
         return _transientMessageData.isAllContentReceived();
     }
 
-
     /**
      * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
      * operation.
@@ -501,6 +495,7 @@
     public AMQMessage takeReference()
     {
         _referenceCount.incrementAndGet();
+
         return this;
     }
 
@@ -510,10 +505,10 @@
     protected void incrementReference()
     {
         _referenceCount.incrementAndGet();
-//        if (_log.isDebugEnabled())
-//        {
-//            _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-//        }
+        // if (_log.isDebugEnabled())
+        // {
+        // _log.debug("Ref count on message " + debugIdentity() + " incremented " + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+        // }
     }
 
     /**
@@ -524,9 +519,7 @@
      * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
      *                                 failed
      */
-    public void decrementReference(StoreContext storeContext)
-            throws
-            MessageCleanupException
+    public void decrementReference(StoreContext storeContext) throws MessageCleanupException
     {
         int count = _referenceCount.decrementAndGet();
 
@@ -538,10 +531,10 @@
         {
             try
             {
-//                if (_log.isDebugEnabled())
-//                {
-//                    _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-//                }
+                // if (_log.isDebugEnabled())
+                // {
+                // _log.debug("Decremented ref count on message " + debugIdentity() + " is zero; removing message" + Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+                // }
 
                 // must check if the handle is null since there may be cases where we decide to throw away a message
                 // and the handle has not yet been constructed
@@ -552,15 +545,17 @@
             }
             catch (AMQException e)
             {
-                //to maintain consistency, we revert the count
+                // to maintain consistency, we revert the count
                 incrementReference();
-                throw new MessageCleanupException(_messageId, e);
+                throw new MessageCleanupException("Failed to cleanup message with id " + _messageId, e);
             }
-        } else
+        }
+        else
         {
             if (count < 0)
             {
-                throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.");
+                throw new MessageCleanupException("Reference count for message id " + debugIdentity() + " has gone below 0.",
+                    null);
             }
         }
     }
@@ -587,7 +582,7 @@
 
     public boolean isTaken(AMQQueue queue)
     {
-        //return _taken.get();
+        // return _taken.get();
 
         synchronized (this)
         {
@@ -604,15 +599,15 @@
 
     public boolean taken(AMQQueue queue, Subscription sub)
     {
-//        if (_taken.getAndSet(true))
-//        {
-//            return true;
-//        }
-//        else
-//        {
-//            _takenBySubcription = sub;
-//            return false;
-//        }
+        // if (_taken.getAndSet(true))
+        // {
+        // return true;
+        // }
+        // else
+        // {
+        // _takenBySubcription = sub;
+        // return false;
+        // }
 
         synchronized (this)
         {
@@ -625,10 +620,12 @@
             if (taken.getAndSet(true))
             {
                 return true;
-            } else
+            }
+            else
             {
                 _takenMap.put(queue, taken);
                 _takenBySubcriptionMap.put(queue, sub);
+
                 return false;
             }
         }
@@ -641,9 +638,8 @@
             _log.trace("Releasing Message:" + debugIdentity());
         }
 
-//        _taken.set(false);
-//        _takenBySubcription = null;
-
+        // _taken.set(false);
+        // _takenBySubcription = null;
 
         synchronized (this)
         {
@@ -651,7 +647,8 @@
             if (taken == null)
             {
                 taken = new AtomicBoolean(false);
-            } else
+            }
+            else
             {
                 taken.set(false);
             }
@@ -672,9 +669,11 @@
         if (_tokens.contains(token))
         {
             return true;
-        } else
+        }
+        else
         {
             _tokens.add(token);
+
             return false;
         }
     }
@@ -687,28 +686,23 @@
      * @param queue the queue
      * @throws org.apache.qpid.AMQException if there is an error enqueuing the message
      */
-    public void enqueue(AMQQueue queue)
-            throws
-            AMQException
+    public void enqueue(AMQQueue queue) throws AMQException
     {
         _transientMessageData.addDestinationQueue(queue);
     }
 
-    public void dequeue(StoreContext storeContext, AMQQueue queue)
-            throws
-            AMQException
+    public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
     {
         _messageHandle.dequeue(storeContext, _messageId, queue);
     }
 
-    public boolean isPersistent()
-            throws
-            AMQException
+    public boolean isPersistent() throws AMQException
     {
         if (_transientMessageData != null)
         {
             return _transientMessageData.isPersistent();
-        } else
+        }
+        else
         {
             return _messageHandle.isPersistent(getStoreContext(), _messageId);
         }
@@ -720,29 +714,27 @@
      * @throws NoConsumersException if the message is marked for immediate delivery but has not been marked as delivered
      *                              to a consumer
      */
-    public void checkDeliveredToConsumer()
-            throws
-            NoConsumersException
+    public void checkDeliveredToConsumer() throws NoConsumersException
     {
 
         if (_immediate && !_deliveredToConsumer)
         {
-            throw new NoConsumersException(this);
+            throw new NoConsumersException(this, null);
         }
     }
 
-    public MessagePublishInfo getMessagePublishInfo()
-            throws
-            AMQException
+    public MessagePublishInfo getMessagePublishInfo() throws AMQException
     {
         MessagePublishInfo pb;
         if (_transientMessageData != null)
         {
             pb = _transientMessageData.getMessagePublishInfo();
-        } else
+        }
+        else
         {
             pb = _messageHandle.getMessagePublishInfo(getStoreContext(), _messageId);
         }
+
         return pb;
     }
 
@@ -773,7 +765,7 @@
      */
     public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
     {
-        //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+        // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
 
         if (_expiration != 0L)
         {
@@ -782,6 +774,7 @@
             if (now > _expiration)
             {
                 dequeue(storecontext, queue);
+
                 return true;
             }
         }
@@ -795,9 +788,7 @@
         _deliveredToConsumer = true;
     }
 
-    private void deliver(StoreContext storeContext)
-            throws
-            AMQException
+    private void deliver(StoreContext storeContext) throws AMQException
     {
         // we get a reference to the destination queues now so that we can clear the
         // transient message data as quickly as possible
@@ -806,12 +797,13 @@
         {
             _log.debug("Delivering message " + debugIdentity() + " to " + destinationQueues);
         }
+
         try
         {
             // first we allow the handle to know that the message has been fully received. This is useful if it is
             // maintaining any calculated values based on content chunks
-            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId, _transientMessageData.getMessagePublishInfo(),
-                    _transientMessageData.getContentHeaderBody());
+            _messageHandle.setPublishAndContentHeaderBody(storeContext, _messageId,
+                _transientMessageData.getMessagePublishInfo(), _transientMessageData.getContentHeaderBody());
 
             // we then allow the transactional context to do something with the message content
             // now that it has all been received, before we attempt delivery
@@ -821,9 +813,9 @@
 
             for (AMQQueue q : destinationQueues)
             {
-                //Increment the references to this message for each queue delivery.
+                // Increment the references to this message for each queue delivery.
                 incrementReference();
-                //normal deliver so add this message at the end.
+                // normal deliver so add this message at the end.
                 _txnContext.deliver(this, q, false);
             }
         }
@@ -835,182 +827,181 @@
         }
     }
 
-/*
-    public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      getContentHeaderBody());
-
-        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-        if (bodyCount == 0)
+    /*
+        public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+                throws AMQException
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                       contentHeader);
+            ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
+            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                          getContentHeaderBody());
 
-            protocolSession.writeFrame(compositeBlock);
-        }
-        else
-        {
+            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+            if (bodyCount == 0)
+            {
+                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                                           contentHeader);
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+                protocolSession.writeFrame(compositeBlock);
+            }
+            else
+            {
+
+                //
+                // Optimise the case where we have a single content body. In that case we create a composite block
+                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+                //
+                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+                protocolSession.writeFrame(compositeBlock);
+
+                //
+                // Now start writing out the other content bodies
+                //
+                for (int i = 1; i < bodyCount; i++)
+                {
+                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                }
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-            protocolSession.writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for (int i = 1; i < bodyCount; i++)
-            {
-                cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
         }
 
+        public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
+        {
+            ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
+            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                          getContentHeaderBody());
 
-    }
+            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
+            if (bodyCount == 0)
+            {
+                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+                                                                                           contentHeader);
+                protocolSession.writeFrame(compositeBlock);
+            }
+            else
+            {
 
-    public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      getContentHeaderBody());
+                //
+                // Optimise the case where we have a single content body. In that case we create a composite block
+                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+                //
+                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
 
-        final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-        if (bodyCount == 0)
-        {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                       contentHeader);
-            protocolSession.writeFrame(compositeBlock);
-        }
-        else
-        {
+                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+                protocolSession.writeFrame(compositeBlock);
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+                //
+                // Now start writing out the other content bodies
+                //
+                for (int i = 1; i < bodyCount; i++)
+                {
+                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
+                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                }
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-            protocolSession.writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for (int i = 1; i < bodyCount; i++)
-            {
-                cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
         }
 
 
-    }
-
-
-    private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        MessagePublishInfo pb = getMessagePublishInfo();
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
-                                                                deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+        private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+                throws AMQException
+        {
+            MessagePublishInfo pb = getMessagePublishInfo();
+            AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
+                                                                    deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+                                                                    pb.getRoutingKey());
+            ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+            deliverFrame.writePayload(buf);
+            buf.flip();
+            return buf;
+        }
+
+        private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
+                throws AMQException
+        {
+            MessagePublishInfo pb = getMessagePublishInfo();
+            AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+                                                                protocolSession.getProtocolMajorVersion(),
+                                                                protocolSession.getProtocolMinorVersion(),
+                                                                deliveryTag, pb.getExchange(),
+                                                                queueSize,
+                                                                _messageHandle.isRedelivered(),
                                                                 pb.getRoutingKey());
-        ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
-        deliverFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-    }
-
-    private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        MessagePublishInfo pb = getMessagePublishInfo();
-        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-                                                            protocolSession.getProtocolMajorVersion(),
-                                                            protocolSession.getProtocolMinorVersion(),
-                                                            deliveryTag, pb.getExchange(),
-                                                            queueSize,
-                                                            _messageHandle.isRedelivered(),
-                                                            pb.getRoutingKey());
-        ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
-        getOkFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-    }
-
-    private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-    {
-        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-                                                              protocolSession.getProtocolMajorVersion(),
-                                                              protocolSession.getProtocolMinorVersion(),
-                                                              getMessagePublishInfo().getExchange(),
-                                                              replyCode, replyText,
-                                                              getMessagePublishInfo().getRoutingKey());
-        ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
-        returnFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-    }
-
-    public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-        ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      getContentHeaderBody());
-
-        Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
-            protocolSession.writeFrame(compositeBlock);
+            ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
+            getOkFrame.writePayload(buf);
+            buf.flip();
+            return buf;
         }
-        else
+
+        private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
         {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
-                                                                             new AMQDataBlock[]{contentHeader});
-            protocolSession.writeFrame(compositeBlock);
+            AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+                                                                  protocolSession.getProtocolMajorVersion(),
+                                                                  protocolSession.getProtocolMinorVersion(),
+                                                                  getMessagePublishInfo().getExchange(),
+                                                                  replyCode, replyText,
+                                                                  getMessagePublishInfo().getRoutingKey());
+            ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
+            returnFrame.writePayload(buf);
+            buf.flip();
+            return buf;
         }
 
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
+        public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
+                throws AMQException
         {
-            protocolSession.writeFrame(bodyFrameIterator.next());
+            ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
+
+            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                          getContentHeaderBody());
+
+            Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
+            //
+            // Optimise the case where we have a single content body. In that case we create a composite block
+            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+            //
+            if (bodyFrameIterator.hasNext())
+            {
+                AMQDataBlock firstContentBody = bodyFrameIterator.next();
+                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+                protocolSession.writeFrame(compositeBlock);
+            }
+            else
+            {
+                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
+                                                                                 new AMQDataBlock[]{contentHeader});
+                protocolSession.writeFrame(compositeBlock);
+            }
+
+            //
+            // Now start writing out the other content bodies
+            // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
+            //
+            while (bodyFrameIterator.hasNext())
+            {
+                protocolSession.writeFrame(bodyFrameIterator.next());
+            }
         }
-    }
-*/
+     */
 
     public AMQMessageHandle getMessageHandle()
     {
         return _messageHandle;
     }
 
-
     public long getSize()
     {
         try
@@ -1022,15 +1013,13 @@
         catch (AMQException e)
         {
             _log.error(e.toString(), e);
+
             return 0;
         }
 
     }
 
-
-    public void restoreTransientMessageData()
-            throws
-            AMQException
+    public void restoreTransientMessageData() throws AMQException
     {
         TransientMessageData transientMessageData = new TransientMessageData();
         transientMessageData.setMessagePublishInfo(getMessagePublishInfo());
@@ -1039,25 +1028,23 @@
         _transientMessageData = transientMessageData;
     }
 
-
     public void clearTransientMessageData()
     {
         _transientMessageData = null;
     }
 
-
     public String toString()
     {
-//        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
-//               _taken + " by :" + _takenBySubcription;
+        // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
+        // _taken + " by :" + _takenBySubcription;
 
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
-                _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
+            + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
     }
 
     public Subscription getDeliveredSubscription(AMQQueue queue)
     {
-//        return _takenBySubcription;
+        // return _takenBySubcription;
         synchronized (this)
         {
             return _takenBySubcriptionMap.get(queue);
@@ -1074,7 +1061,8 @@
             }
 
             _rejectedBy.add(subscription);
-        } else
+        }
+        else
         {
             _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
         }
@@ -1084,10 +1072,11 @@
     {
         boolean rejected = _rejectedBy != null;
 
-        if (rejected)  // We have subscriptions that rejected this message
+        if (rejected) // We have subscriptions that rejected this message
         {
             return _rejectedBy.contains(subscription);
-        } else // This messasge hasn't been rejected yet.
+        }
+        else // This messasge hasn't been rejected yet.
         {
             return rejected;
         }



Mime
View raw message