qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r829675 [8/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/org/apac...
Date Sun, 25 Oct 2009 22:59:05 GMT
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Oct 25 22:58:57 2009
@@ -25,6 +25,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -33,6 +35,8 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.SubscriptionActor;
 import org.apache.qpid.server.logging.messages.SubscriptionMessages;
@@ -45,7 +49,6 @@
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.StoreContext;
 
 /**
  * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
@@ -65,11 +68,16 @@
 
 
     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
-    private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
+    private AMQQueue.Context _queueContext;
+
     private final ClientDeliveryMethod _deliveryMethod;
     private final RecordDeliveryMethod _recordMethod;
 
-    private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+    private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+
+    private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+
     private final Lock _stateChangeLock;
 
     private static final AtomicLong idGenerator = new AtomicLong(0);
@@ -78,6 +86,7 @@
     private LogSubject _logSubject;
     private LogActor _logActor;
 
+
     static final class BrowserSubscription extends SubscriptionImpl
     {
         public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
@@ -153,38 +162,28 @@
         @Override
         public void send(QueueEntry entry) throws AMQException
         {
+            // if we do not need to wait for client acknowledgements
+            // we can decrement the reference count immediately.
 
-            StoreContext storeContext = getChannel().getStoreContext();
-            try
-            { // if we do not need to wait for client acknowledgements
-                // we can decrement the reference count immediately.
-
-                // By doing this _before_ the send we ensure that it
-                // doesn't get sent if it can't be dequeued, preventing
-                // duplicate delivery on recovery.
+            // By doing this _before_ the send we ensure that it
+            // doesn't get sent if it can't be dequeued, preventing
+            // duplicate delivery on recovery.
+
+            // The send may of course still fail, in which case, as
+            // the message is unacked, it will be lost.
+            entry.dequeue();
 
-                // The send may of course still fail, in which case, as
-                // the message is unacked, it will be lost.
-                entry.dequeue(storeContext);
 
+            synchronized (getChannel())
+            {
+                long deliveryTag = getChannel().getNextDeliveryTag();
 
-                synchronized (getChannel())
-                {
-                    long deliveryTag = getChannel().getNextDeliveryTag();
-
-                    sendToClient(entry, deliveryTag);
+                sendToClient(entry, deliveryTag);
 
-                }
-                entry.dispose(storeContext);
             }
-            finally
-            {
-                //Only set delivered if it actually was writen successfully..
-                // using a try->finally would set it even if an error occured.
-                // Is this what we want?
+            entry.dispose();
+
 
-                entry.setDeliveredToSubscription();
-            }
         }
 
         @Override
@@ -225,39 +224,30 @@
         public void send(QueueEntry entry) throws AMQException
         {
 
-            try
-            { // if we do not need to wait for client acknowledgements
-                // we can decrement the reference count immediately.
-
-                // By doing this _before_ the send we ensure that it
-                // doesn't get sent if it can't be dequeued, preventing
-                // duplicate delivery on recovery.
+            // if we do not need to wait for client acknowledgements
+            // we can decrement the reference count immediately.
 
-                // The send may of course still fail, in which case, as
-                // the message is unacked, it will be lost.
+            // By doing this _before_ the send we ensure that it
+            // doesn't get sent if it can't be dequeued, preventing
+            // duplicate delivery on recovery.
 
-                synchronized (getChannel())
-                {
-                    long deliveryTag = getChannel().getNextDeliveryTag();
+            // The send may of course still fail, in which case, as
+            // the message is unacked, it will be lost.
 
+            synchronized (getChannel())
+            {
+                long deliveryTag = getChannel().getNextDeliveryTag();
 
-                    recordMessageDelivery(entry, deliveryTag);
-                    sendToClient(entry, deliveryTag);
 
+                recordMessageDelivery(entry, deliveryTag);
+                sendToClient(entry, deliveryTag);
 
-                }
-            }
-            finally
-            {
-                //Only set delivered if it actually was writen successfully..
-                // using a try->finally would set it even if an error occured.
-                // Is this what we want?
 
-                entry.setDeliveredToSubscription();
             }
         }
 
 
+
     }
 
 
@@ -268,7 +258,7 @@
     private final AMQShortString _consumerTag;
 
 
-    private final boolean _noLocal;
+    private boolean _noLocal;
 
     private final FlowCreditManager _creditManager;
 
@@ -423,43 +413,35 @@
 
     public boolean hasInterest(QueueEntry entry)
     {
+
+        
+
+
         //check that the message hasn't been rejected
         if (entry.isRejectedBy(this))
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
+                _logger.debug("Subscription:" + this + " rejected message:" + entry);
             }
 //            return false;
         }
 
         if (_noLocal)
         {
-            //todo - client id should be recoreded so we don't have to handle
+
+            AMQMessage message = (AMQMessage) entry.getMessage();
+
+            //todo - client id should be recorded so we don't have to handle
             // the case where this is null.
-            final Object publisherId = entry.getMessage().getPublisherClientInstance();
+            final Object publisher = message.getPublisherIdentifier();
 
             // We don't want local messages so check to see if message is one we sent
-            Object localInstance;
+            Object localInstance = getProtocolSession();
 
-            if (publisherId != null && (getProtocolSession().getClientProperties() != null) &&
-                (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+            if(publisher.equals(localInstance))
             {
-                if(publisherId.equals(localInstance))
-                {
-                    return false;
-                }
-            }
-            else
-            {
-
-                localInstance = getProtocolSession().getClientIdentifier();
-
-                //todo - client id should be recoreded so we don't have to do the null check
-                if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
-                {
-                    return false;
-                }
+                return false;
             }
 
 
@@ -468,7 +450,7 @@
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+            _logger.debug("(" + this + ") checking filters for message (" + entry);
         }
         return checkFilters(entry);
 
@@ -483,7 +465,7 @@
 
     private boolean checkFilters(QueueEntry msg)
     {
-        return (_filters == null) || _filters.allAllow(msg.getMessage());
+        return (_filters == null) || _filters.allAllow(msg);
     }
 
     public boolean isAutoClose()
@@ -550,11 +532,6 @@
         _stateChangeLock.unlock();
     }
 
-    public void resend(final QueueEntry entry) throws AMQException
-    {
-        _queue.resend(entry, this);
-    }
-
     public AMQChannel getChannel()
     {
         return _channel;
@@ -585,12 +562,18 @@
         return _queue;
     }
 
+    public void onDequeue(final QueueEntry queueEntry)
+    {
+        restoreCredit(queueEntry);
+    }
+
     public void restoreCredit(final QueueEntry queueEntry)
     {
-        _creditManager.addCredit(1, queueEntry.getSize());
+        _creditManager.restoreCredit(1, queueEntry.getSize());
     }
 
 
+
     public void creditStateChanged(boolean hasCredit)
     {
 
@@ -628,22 +611,14 @@
     }
 
 
-    public QueueEntry getLastSeenEntry()
+    public AMQQueue.Context getQueueContext()
     {
-        QueueEntry entry = _queueContext.get();
-
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug(_logActor + ": lastSeenEntry: " + (entry == null ? "null" : entry.debugIdentity()));
-        }        
-
-        return entry;
+        return _queueContext;
     }
 
-    public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
+    public void setQueueContext(AMQQueue.Context context)
     {
-        _logger.debug(debugIdentity() + " Setting Last Seen To:" + (newvalue == null ? "nullNV" : newvalue.debugIdentity()));
-        return _queueContext.compareAndSet(expected,newvalue);
+        _queueContext = context;
     }
 
 
@@ -670,4 +645,43 @@
         return _owningState;
     }
 
+    public QueueEntry.SubscriptionAssignedState getAssignedState()
+    {
+        return _assignedState;
+    }
+
+
+    public void confirmAutoClose()
+    {
+        ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+        converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
+    }
+
+    public boolean acquires()
+    {
+        return !isBrowser();
+    }
+
+    public boolean seesRequeues()
+    {
+        return !isBrowser();
+    }
+
+    public void set(String key, Object value)
+    {
+        _properties.put(key, value);
+    }
+
+    public Object get(String key)
+    {
+        return _properties.get(key);
+    }
+
+
+    public void setNoLocal(boolean noLocal)
+    {
+        _noLocal = noLocal;
+    }
+
+    abstract boolean isBrowser();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Sun Oct 25 22:58:57 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.virtualhost;
 
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,7 +30,7 @@
 
 public class VirtualHostRegistry
 {
-    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String,VirtualHost>();
+    private final Map<String, VirtualHost> _registry = new ConcurrentHashMap<String, VirtualHost>();
 
 
     private String _defaultVirtualHostName;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java Sun Oct 25 22:58:57 2009
@@ -14,14 +14,16 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 
 public class Copy extends Move
 {
@@ -49,7 +51,9 @@
 
     protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
     {
-        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), _storeContext);
+        ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+        fromQueue.copyMessagesToAnotherQueue(start, end, toQueue.getName().toString(), txn);
+        txn.commit();
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java Sun Oct 25 22:58:57 2009
@@ -21,16 +21,13 @@
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.commons.codec.binary.Hex;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
 
 import java.io.UnsupportedEncodingException;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -100,7 +97,7 @@
 
         for (QueueEntry entry : messages)
         {
-            AMQMessage msg = entry.getMessage();
+            ServerMessage msg = entry.getMessage();
             if (!includeMsg(msg, msgids))
             {
                 continue;
@@ -112,7 +109,7 @@
 
             // Show general message information
             hex.add(Show.Columns.ID.name());
-            ascii.add(msg.getMessageId().toString());
+            ascii.add(msg.getMessageNumber().toString());
 
             hex.add(Console.ROW_DIVIDER);
             ascii.add(Console.ROW_DIVIDER);
@@ -136,10 +133,10 @@
             hex.add(Console.ROW_DIVIDER);
             ascii.add(Console.ROW_DIVIDER);
 
-            Iterator bodies = msg.getContentBodyIterator();
-            if (bodies.hasNext())
-            {
 
+            final int messageSize = (int) msg.getSize();
+            if (messageSize != 0)
+            {
                 hex.add("Hex");
                 hex.add(Console.ROW_DIVIDER);
 
@@ -147,14 +144,19 @@
                 ascii.add("ASCII");
                 ascii.add(Console.ROW_DIVIDER);
 
-                while (bodies.hasNext())
+                java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(64 * 1024);
+
+                int position = 0;
+
+                while(position < messageSize)
                 {
-                    ContentChunk chunk = (ContentChunk) bodies.next();
 
+                    position += msg.getContent(buf, position);
+                    buf.flip();
                     //Duplicate so we don't destroy original data :)
-                    ByteBuffer hexBuffer = chunk.getData().duplicate();
+                    java.nio.ByteBuffer hexBuffer = buf;
 
-                    ByteBuffer charBuffer = hexBuffer.duplicate();
+                    java.nio.ByteBuffer charBuffer = hexBuffer.duplicate();
 
                     Hex hexencoder = new Hex();
 
@@ -232,6 +234,7 @@
 
                         ascii.add(asciiLine);
                     }
+                    buf.clear();
                 }
             }
             else
@@ -252,7 +255,7 @@
         return display;
     }
 
-    private void addShowInformation(List<String> column1, List<String> column2, AMQMessage msg,
+    private void addShowInformation(List<String> column1, List<String> column2, ServerMessage msg,
                                     String title, boolean routing, boolean headers, boolean messageHeaders)
     {
         List<QueueEntry> single = new LinkedList<QueueEntry>();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java Sun Oct 25 22:58:57 2009
@@ -14,9 +14,9 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.tools.messagestore.commands;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java Sun Oct 25 22:58:57 2009
@@ -14,17 +14,17 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.tools.messagestore.commands;
 
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.queue.QueueEntryImpl;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 
 import java.util.LinkedList;
@@ -33,12 +33,6 @@
 public class Move extends AbstractCommand
 {
 
-    /**
-     * Since the Coopy command is not associated with a real channel we can safely create our own store context
-     * for use in the few methods that require one.
-     */
-    protected StoreContext _storeContext = new StoreContext();
-
     public Move(MessageStoreTool tool)
     {
         super(tool);
@@ -172,7 +166,7 @@
             {
                 for (QueueEntry msg : messages)
                 {
-                    ids.add(msg.getMessage().getMessageId());
+                    ids.add(msg.getMessage().getMessageNumber());
                 }
             }
         }
@@ -201,6 +195,8 @@
 
     protected void doCommand(AMQQueue fromQueue, long start, long id, AMQQueue toQueue)
     {
-        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), _storeContext);
+        ServerTransaction txn = new LocalTransaction(fromQueue.getVirtualHost().getTransactionLog());
+        fromQueue.moveMessagesToAnotherQueue(start, id, toQueue.getName().toString(), txn);
+        txn.commit();
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java Sun Oct 25 22:58:57 2009
@@ -62,6 +62,6 @@
 
     protected void doCommand(AMQQueue fromQueue, long start, long end, AMQQueue toQueue)
     {
-        fromQueue.removeMessagesFromQueue(start, end, _storeContext);
+        fromQueue.removeMessagesFromQueue(start, end);
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Sun Oct 25 22:58:57 2009
@@ -25,10 +25,10 @@
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntryImpl;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.tools.messagestore.MessageStoreTool;
 import org.apache.qpid.tools.utils.Console;
 
@@ -171,7 +171,7 @@
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getEncoding();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getExpiration();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders();
-//        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageId();
+//        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getMessageNumber();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPriority();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getPropertyFlags();
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getReplyTo();
@@ -182,14 +182,14 @@
 //        //Print out all the property names
 //        ((BasicContentHeaderProperties)msg.getContentHeaderBody().properties).getHeaders().getPropertyNames();
 //
-//        msg.getMessageId();
+//        msg.getMessageNumber();
 //        msg.getSize();
 //        msg.getArrivalTime();
 
 //        msg.getDeliveredSubscription();
 //        msg.getDeliveredToConsumer();
 //        msg.getMessageHandle();
-//        msg.getMessageId();
+//        msg.getMessageNumber();
 //        msg.getMessagePublishInfo();
 //        msg.getPublisher();
 
@@ -337,30 +337,24 @@
         //Add create the table of data
         for (QueueEntry entry : messages)
         {
-            AMQMessage msg = entry.getMessage();
+            ServerMessage msg = entry.getMessage();
             if (!includeMsg(msg, msgids))
             {
                 continue;
             }
 
-            id.add(msg.getMessageId().toString());
+            id.add(msg.getMessageNumber().toString());
 
             size.add("" + msg.getSize());
 
             arrival.add("" + msg.getArrivalTime());
 
-            try
-            {
-                ispersitent.add(msg.isPersistent() ? "true" : "false");
-            }
-            catch (AMQException e)
-            {
-                ispersitent.add("n/a");
-            }
+            ispersitent.add(msg.isPersistent() ? "true" : "false");
+
 
-            isredelivered.add(msg.isRedelivered() ? "true" : "false");
+            isredelivered.add(entry.isRedelivered() ? "true" : "false");
 
-            isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
+            isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
 
 //        msg.getMessageHandle();
 
@@ -368,7 +362,10 @@
 
             try
             {
-                headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+                if(msg instanceof AMQMessage)
+                {
+                    headers = ((BasicContentHeaderProperties) ((AMQMessage)msg).getContentHeaderBody().properties);
+                }
             }
             catch (AMQException e)
             {
@@ -417,7 +414,11 @@
                 MessagePublishInfo info = null;
                 try
                 {
-                    info = msg.getMessagePublishInfo();
+                    if(msg instanceof AMQMessage)
+                    {
+                        info = ((AMQMessage)msg).getMessagePublishInfo();
+                    }
+
                 }
                 catch (AMQException e)
                 {
@@ -457,14 +458,14 @@
         return data;
     }
 
-    protected boolean includeMsg(AMQMessage msg, List<Long> msgids)
+    protected boolean includeMsg(ServerMessage msg, List<Long> msgids)
     {
         if (msgids == null)
         {
             return true;
         }
 
-        Long msgid = msg.getMessageId();
+        Long msgid = msg.getMessageNumber();
 
         boolean found = false;
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java Sun Oct 25 22:58:57 2009
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class AMQBrokerManagerMBeanTest extends TestCase
@@ -46,7 +47,7 @@
         assertTrue(_exchangeRegistry.getExchange(new AMQShortString(exchange3)) == null);
 
 
-        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
         mbean.createNewExchange(exchange1, "direct", false);
         mbean.createNewExchange(exchange2, "topic", false);
         mbean.createNewExchange(exchange3, "headers", false);
@@ -68,7 +69,7 @@
     {
         String queueName = "testQueue_" + System.currentTimeMillis();
 
-        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHost.VirtualHostMBean) _vHost.getManagedObject());
+        ManagedBroker mbean = new AMQBrokerManagerMBean((VirtualHostImpl.VirtualHostMBean) _vHost.getManagedObject());
 
         assertTrue(_queueRegistry.getQueue(new AMQShortString(queueName)) == null);
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java Sun Oct 25 22:58:57 2009
@@ -22,23 +22,22 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.SimpleQueueEntryList;
 import org.apache.qpid.server.queue.MockAMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.AMQException;
 
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
-import java.util.Iterator;
 
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
@@ -63,6 +62,7 @@
     UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;
     private static final int INITIAL_MSG_COUNT = 10;
     private AMQQueue _queue = new MockAMQQueue(getName());
+    private MessageStore _messageStore = new MemoryMessageStore();
     private LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();
 
     @Override
@@ -89,7 +89,7 @@
         while(queueEntries.advance())
         {
             QueueEntry entry = queueEntries.getNode();
-            _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry);
+            _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);
 
             // Store the entry for future inspection
             _referenceList.add(entry);
@@ -137,7 +137,7 @@
 
         // requeueIfUnabletoResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, new StoreContext()));
+                                                                    msgToResend, true, _messageStore));
 
         assertEquals("Message count for resend not correct.", INITIAL_MSG_COUNT, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -166,7 +166,7 @@
 
         // requeueIfUnabletoResend doesn't matter here.
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, new StoreContext()));
+                                                                    msgToResend, true, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -187,7 +187,7 @@
 
         // requeueIfUnabletoResend = true so all messages should go to msgToRequeue
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, true, new StoreContext()));
+                                                                    msgToResend, true, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", INITIAL_MSG_COUNT, msgToRequeue.size());
@@ -208,7 +208,7 @@
 
         // requeueIfUnabletoResend = false so all messages should be dropped all maps should be empty
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, new StoreContext()));
+                                                                    msgToResend, false, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());
@@ -240,7 +240,7 @@
 
         // requeueIfUnabletoResend : value doesn't matter here as queue has been deleted
         _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
-                                                                    msgToResend, false, new StoreContext()));
+                                                                    msgToResend, false, _messageStore));
 
         assertEquals("Message count for resend not correct.", 0, msgToResend.size());
         assertEquals("Message count for requeue not correct.", 0, msgToRequeue.size());

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Collections;
 
 import junit.framework.TestCase;
 
@@ -482,12 +483,17 @@
     {
         // Check default
         ServerConfiguration serverConfig = new ServerConfiguration(_config);
-        assertEquals(5672, serverConfig.getPort());
+        assertNotNull(serverConfig.getPorts());
+        assertEquals(1, serverConfig.getPorts().size());
+        assertEquals(5672, serverConfig.getPorts().get(0));
+
 
         // Check value we set
-        _config.setProperty("connector.port", 10);
+        _config.setProperty("connector.port", "10");
         serverConfig = new ServerConfiguration(_config);
-        assertEquals(10, serverConfig.getPort());
+        assertNotNull(serverConfig.getPorts());
+        assertEquals(1, serverConfig.getPorts().size());
+        assertEquals("10", serverConfig.getPorts().get(0));
     }
 
     public void testGetBind() throws ConfigurationException
@@ -723,7 +729,9 @@
         ServerConfiguration config = new ServerConfiguration(mainFile.getAbsoluteFile());
         assertEquals(4235, config.getSSLPort()); // From first file, not
                                                  // overriden by second
-        assertEquals(2342, config.getPort()); // From the first file, not
+        assertNotNull(config.getPorts());
+        assertEquals(1, config.getPorts().size());
+        assertEquals("2342", config.getPorts().get(0)); // From the first file, not
                                               // present in the second
         assertEquals(true, config.getQpidNIO()); // From the second file, not
                                                  // present in the first
@@ -967,7 +975,7 @@
         out.write("\t<rule access=\"deny\" network=\"127.0.0.1\"/>");
         out.write("</firewall>\n");
         out.close();
-        
+
         reg.getConfiguration().reparseConfigFile();
 
         assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java Sun Oct 25 22:58:57 2009
@@ -14,21 +14,20 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
- * 
+ *  under the License.
+ *
  */
 package org.apache.qpid.server.configuration;
 
 
 import junit.framework.TestCase;
-import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQPriorityQueue;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public class VirtualHostConfigurationTest extends TestCase
 {
@@ -55,50 +54,50 @@
 
         super.tearDown();
     }
-    
+
     public void testQueuePriority() throws Exception
     {
         // Set up queue with 5 priorities
-        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", 
+        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
                               "atest");
-        configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", 
+        configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange",
                               "amq.direct");
-        configXml.addProperty("virtualhost.test.queues.queue.atest.priorities", 
+        configXml.addProperty("virtualhost.test.queues.queue.atest.priorities",
                               "5");
 
         // Set up queue with JMS style priorities
-        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", 
+        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
                               "ptest");
-        configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange", 
+        configXml.addProperty("virtualhost.test.queues.queue.ptest(-1).exchange",
                               "amq.direct");
-        configXml.addProperty("virtualhost.test.queues.queue.ptest.priority", 
+        configXml.addProperty("virtualhost.test.queues.queue.ptest.priority",
                                "true");
-        
+
         // Set up queue with no priorities
-        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", 
+        configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)",
                               "ntest");
-        configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange", 
+        configXml.addProperty("virtualhost.test.queues.queue.ntest(-1).exchange",
                               "amq.direct");
-        configXml.addProperty("virtualhost.test.queues.queue.ntest.priority", 
+        configXml.addProperty("virtualhost.test.queues.queue.ntest.priority",
                               "false");
-        
-        VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-        
+
+        VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
         // Check that atest was a priority queue with 5 priorities
         AMQQueue atest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
         assertTrue(atest instanceof AMQPriorityQueue);
         assertEquals(5, ((AMQPriorityQueue) atest).getPriorities());
-        
+
         // Check that ptest was a priority queue with 10 priorities
         AMQQueue ptest = vhost.getQueueRegistry().getQueue(new AMQShortString("ptest"));
         assertTrue(ptest instanceof AMQPriorityQueue);
         assertEquals(10, ((AMQPriorityQueue) ptest).getPriorities());
-        
+
         // Check that ntest wasn't a priority queue
         AMQQueue ntest = vhost.getQueueRegistry().getQueue(new AMQShortString("ntest"));
         assertFalse(ntest instanceof AMQPriorityQueue);
     }
-    
+
     public void testQueueAlerts() throws Exception
     {
         // Set up queue with 5 priorities
@@ -106,7 +105,7 @@
         configXml.addProperty("virtualhost.test.queues.maximumQueueDepth", "1");
         configXml.addProperty("virtualhost.test.queues.maximumMessageSize", "2");
         configXml.addProperty("virtualhost.test.queues.maximumMessageAge", "3");
-        
+
         configXml.addProperty("virtualhost.test.queues(-1).queue(1).name(1)", "atest");
         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).exchange", "amq.direct");
         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumQueueDepth", "4");
@@ -114,21 +113,21 @@
         configXml.addProperty("virtualhost.test.queues.queue.atest(-1).maximumMessageAge", "6");
 
         configXml.addProperty("virtualhost.test.queues(-1).queue(-1).name(-1)", "btest");
-        
-        VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
-        
+
+        VirtualHost vhost = new VirtualHostImpl(new VirtualHostConfiguration("test", configXml.subset("virtualhost.test")));
+
         // Check specifically configured values
         AMQQueue aTest = vhost.getQueueRegistry().getQueue(new AMQShortString("atest"));
         assertEquals(4, aTest.getMaximumQueueDepth());
         assertEquals(5, aTest.getMaximumMessageSize());
         assertEquals(6, aTest.getMaximumMessageAge());
-        
-        // Check default values        
+
+        // Check default values
         AMQQueue bTest = vhost.getQueueRegistry().getQueue(new AMQShortString("btest"));
         assertEquals(1, bTest.getMaximumQueueDepth());
         assertEquals(2, bTest.getMaximumMessageSize());
         assertEquals(3, bTest.getMaximumMessageAge());
-        
+
     }
-    
+
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Sun Oct 25 22:58:57 2009
@@ -27,18 +27,18 @@
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
 import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.log4j.Logger;
 
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AbstractHeadersExchangeTestBase extends TestCase
 {
@@ -52,10 +52,6 @@
      */
     private MessageStore _store = new MemoryMessageStore();
 
-    private StoreContext _storeContext = new StoreContext();
-
-    private MessageHandleFactory _handleFactory = new MessageHandleFactory();
-
     private int count;
 
     public void testDoNothing()
@@ -91,14 +87,18 @@
     }
 
 
-    protected void route(Message m) throws AMQException
+    protected int route(Message m) throws AMQException
     {
+        m.getIncomingMessage().headersReceived();
         m.route(exchange);
-        m.getIncomingMessage().routingComplete(_store, _handleFactory);
         if(m.getIncomingMessage().allContentReceived())
         {
-            m.getIncomingMessage().deliverToQueues();
+            for(AMQQueue q : m.getIncomingMessage().getDestinationQueues())
+            {
+                q.enqueue(m);
+            }
         }
+        return m.getIncomingMessage().getDestinationQueues().size();
     }
 
     protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
@@ -118,10 +118,8 @@
 
     protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
     {
-        try
-        {
-            route(m);
-            assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+            int queueCount = route(m);
+
             for (TestQueue q : queues)
             {
                 if (expected.contains(q))
@@ -135,12 +133,11 @@
                     //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
                 }
             }
-        }
 
-        catch (NoRouteException ex)
-        {
-            assertTrue("Expected "+m+" not to be returned",expectReturn);
-        }
+            if(expectReturn)
+            {
+                assertEquals("Expected "+m+" to be returned due to manadatory flag, and lack of routing",0, queueCount);
+            }
 
     }
 
@@ -242,6 +239,11 @@
     {
         final List<HeadersExchangeTest.Message> messages = new ArrayList<HeadersExchangeTest.Message>();
 
+        public String toString()
+        {
+            return getName().toString();
+        }
+
         public TestQueue(AMQShortString name) throws AMQException
         {
             super(name, false, new AMQShortString("test"), true, ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"));
@@ -256,9 +258,9 @@
          * @throws AMQException
          */
         @Override
-        public QueueEntry enqueue(StoreContext context, AMQMessage msg) throws AMQException
+        public QueueEntry enqueue(ServerMessage msg) throws AMQException
         {
-            messages.add( new HeadersExchangeTest.Message(msg));
+            messages.add( new HeadersExchangeTest.Message((AMQMessage) msg));
             return new QueueEntry()
             {
 
@@ -317,6 +319,11 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public boolean isAcquiredBy(Subscription subscription)
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public void setDeliveredToSubscription()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
@@ -327,9 +334,9 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public String debugIdentity()
+                public boolean releaseButRetain()
                 {
-                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                    return false;
                 }
 
                 public boolean immediateAndNotDelivered()
@@ -337,11 +344,26 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void setRedelivered(boolean b)
+                public void setRedelivered()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
+                public AMQMessageHeader getMessageHeader()
+                {
+                    return null;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isPersistent()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public boolean isRedelivered()
+                {
+                    return false;  //To change body of implemented methods use File | Settings | File Templates.
+                }
+
                 public Subscription getDeliveredSubscription()
                 {
                     return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -362,17 +384,22 @@
                     return false;  //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void requeue(StoreContext storeContext) throws AMQException
+                public void requeue()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void requeue(Subscription subscription)
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+                public void dequeue()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void dispose(final StoreContext storeContext) throws MessageCleanupException
+                public void dispose()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
@@ -382,7 +409,12 @@
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
 
-                public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+                public void discard()
+                {
+                    //To change body of implemented methods use File | Settings | File Templates.
+                }
+
+                public void routeToAlternate()
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
@@ -421,15 +453,16 @@
      */
     static class Message extends AMQMessage
     {
+        private static AtomicLong _messageId = new AtomicLong();
+
         private class TestIncomingMessage extends IncomingMessage
         {
 
             public TestIncomingMessage(final long messageId,
                                        final MessagePublishInfo info,
-                                       final TransactionalContext txnContext,
                                        final AMQProtocolSession publisher)
             {
-                super(messageId, info, txnContext, publisher);
+                super(info);
             }
 
 
@@ -439,7 +472,7 @@
             }
 
 
-            public ContentHeaderBody getContentHeaderBody()
+            public ContentHeaderBody getContentHeader()
             {
                 try
                 {
@@ -454,15 +487,6 @@
 
         private IncomingMessage _incoming;
 
-        private static MessageStore _messageStore = new SkeletonMessageStore();
-
-        private static StoreContext _storeContext = new StoreContext();
-
-
-        private static TransactionalContext _txnContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                                                      null,
-                                                                         new LinkedList<RequiredDeliveryException>()
-        );
 
         Message(AMQProtocolSession protocolSession, String id, String... headers) throws AMQException
         {
@@ -471,7 +495,7 @@
 
         Message(AMQProtocolSession protocolSession, String id, FieldTable headers) throws AMQException
         {
-            this(protocolSession, _messageStore.getNewMessageId(),getPublishRequest(id), getContentHeader(headers), null);
+            this(protocolSession, _messageId.incrementAndGet(),getPublishRequest(id), getContentHeader(headers), Collections.EMPTY_LIST);
         }
 
         public IncomingMessage getIncomingMessage()
@@ -484,46 +508,34 @@
                         ContentHeaderBody header,
                         List<ContentBody> bodies) throws AMQException
         {
-            super(createMessageHandle(messageId, publish, header), _txnContext.getStoreContext(), publish);
+            super(new MockStoredMessage(messageId, publish, header));
+
+            StoredMessage<MessageMetaData> storedMessage = getStoredMessage();
 
+            int pos = 0;
+            for(ContentBody body : bodies)
+            {
+                storedMessage.addContent(pos, body.payload.duplicate().buf());
+                pos += body.payload.limit();
+            }
 
-            
-            _incoming = new TestIncomingMessage(getMessageId(),publish, _txnContext, protocolsession);
+            _incoming = new TestIncomingMessage(getMessageId(),publish, protocolsession);
             _incoming.setContentHeaderBody(header);
 
 
         }
 
-        private static AMQMessageHandle createMessageHandle(final long messageId,
-                                                            final MessagePublishInfo publish,
-                                                            final ContentHeaderBody header)
-        {
-
-            final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId,
-                                                                                                       _messageStore,
-                                                                                                       true);
-
-            try
-            {
-                amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(),publish,header);
-            }
-            catch (AMQException e)
-            {
-                
-            }
-            return amqMessageHandle;
-        }
 
         private Message(AMQMessage msg) throws AMQException
         {
-            super(msg);
+            super(msg.getStoredMessage());
         }
 
 
 
         void route(Exchange exchange) throws AMQException
         {
-            exchange.route(_incoming);
+            _incoming.enqueue(exchange.route(_incoming));
         }
 
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java Sun Oct 25 22:58:57 2009
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Sun Oct 25 22:58:57 2009
@@ -22,16 +22,95 @@
 
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Set;
 
 import junit.framework.TestCase;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.message.AMQMessageHeader;
 
 /**
  */
 public class HeadersBindingTest extends TestCase
 {
+
+    private class MockHeader implements AMQMessageHeader
+    {
+
+        private final Map<String, Object> _headers = new HashMap<String, Object>();
+
+        public String getCorrelationId()
+        {
+            return null;
+        }
+
+        public long getExpiration()
+        {
+            return 0;
+        }
+
+        public String getMessageId()
+        {
+            return null;
+        }
+
+        public String getMimeType()
+        {
+            return null;  //To change body of implemented methods use File | Settings | File Templates.
+        }
+
+        public String getEncoding()
+        {
+            return null;  //To change body of implemented methods use File | Settings | File Templates.
+        }
+
+        public byte getPriority()
+        {
+            return 0;
+        }
+
+        public long getTimestamp()
+        {
+            return 0;
+        }
+
+        public String getType()
+        {
+            return null;
+        }
+
+        public String getReplyTo()
+        {
+            return null;
+        }
+
+        public Object getHeader(String name)
+        {
+            return _headers.get(name);
+        }
+
+        public boolean containsHeaders(Set<String> names)
+        {
+            return _headers.keySet().containsAll(names);
+        }
+
+        public boolean containsHeader(String name)
+        {
+            return _headers.containsKey(name);
+        }
+
+        public void setString(String key, String value)
+        {
+            setObject(key,value);
+        }
+
+        public void setObject(String key, Object value)
+        {
+            _headers.put(key,value);
+        }
+    }
+
     private FieldTable bindHeaders = new FieldTable();
-    private FieldTable matchHeaders = new FieldTable();
+    private MockHeader matchHeaders = new MockHeader();
 
     public void testDefault_1()
     {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,7 +35,8 @@
         super.setUp();
         // AR will use the NullAR by default
         // Just use the first vhost.
-        VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
+        VirtualHost
+                virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
         _protocolSession = new InternalTestProtocolSession(virtualHost);
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java Sun Oct 25 22:58:57 2009
@@ -121,7 +121,7 @@
         // Verify that the message has the correct type
         assertTrue("Message contains the [con: prefix",
                    logs.get(0).toString().contains("[con:"));
-        
+
 
         // Verify that all the values were presented to the MessageFormatter
         // so we will not end up with '{n}' entries in the log.

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java Sun Oct 25 22:58:57 2009
@@ -75,7 +75,7 @@
         // Correctly Close the AR we created
         ApplicationRegistry.remove();
 
-        super.tearDown();        
+        super.tearDown();
     }
 
     private void setUpWithConfig(ServerConfiguration serverConfig) throws AMQException

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java Sun Oct 25 22:58:57 2009
@@ -41,12 +41,12 @@
     {
         String location = "/path/to/the/message/store.files";
 
-        _logMessage = MessageStoreMessages.MST_1002(location);
+        _logMessage = ConfigStoreMessages.CFG_1002(location);
         List<Object> log = performLog();
 
         String[] expected = {"Store location :", location};
 
-        validateLogMessage(log, "MST-1002", expected);
+        validateLogMessage(log, "CFG-1002", expected);
     }
 
     public void testMessage1003()
@@ -59,7 +59,7 @@
         validateLogMessage(log, "MST-1003", expected);
     }
 
-    public void testMessage1004()
+  /*  public void testMessage1004()
     {
         _logMessage = MessageStoreMessages.MST_1004(null,false);
         List<Object> log = performLog();
@@ -91,7 +91,7 @@
 
         // Here we use MessageFormat to ensure the messasgeCount of 2000 is
         // reformated for display as '2,000'
-        String[] expected = {"Recovered ", 
+        String[] expected = {"Recovered ",
                              MessageFormat.format("{0,number}", messasgeCount),
                              "messages for queue", queueName};
 
@@ -119,5 +119,5 @@
 
         validateLogMessage(log, "MST-1006", expected);
     }
-
+    */
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java Sun Oct 25 22:58:57 2009
@@ -23,19 +23,16 @@
 import junit.framework.TestCase;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.SkeletonMessageStore;
 
 import javax.management.JMException;
-import java.security.Principal;
 
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class AMQProtocolSessionMBeanTest extends TestCase

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java Sun Oct 25 22:58:57 2009
@@ -21,19 +21,19 @@
 package org.apache.qpid.server.protocol;
 
 import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.transport.TestNetworkDriver;
 
 public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
@@ -70,6 +70,16 @@
         return (byte) 8;
     }
 
+    public void writeReturn(MessagePublishInfo messagePublishInfo,
+                            ContentHeaderBody header,
+                            MessageContentSource msgContent,
+                            int channelId,
+                            int replyCode,
+                            AMQShortString replyText) throws AMQException
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public byte getProtocolMinorVersion()
     {
         return (byte) 0;
@@ -82,12 +92,12 @@
         synchronized (_channelDelivers)
         {
             List<DeliveryPair> all =_channelDelivers.get(channelId).get(consumerTag);
-            
+
             if (all == null)
             {
                 return new ArrayList<DeliveryPair>(0);
             }
-            
+
             List<DeliveryPair> msgs = all.subList(0, count);
 
             List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
@@ -108,7 +118,7 @@
     {
     }
 
-    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
     {
         _deliveryCount.incrementAndGet();
 
@@ -130,11 +140,11 @@
                 consumers.put(consumerTag, consumerDelivers);
             }
 
-            consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+            consumerDelivers.add(new DeliveryPair(deliveryTag, (AMQMessage)entry.getMessage()));
         }
     }
 
-    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    public void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Sun Oct 25 22:58:57 2009
@@ -22,17 +22,10 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-
-import java.security.Principal;
 
 /** Test class to test MBean operations for AMQMinaProtocolSession. */
 public class MaxChannelsTest extends TestCase
@@ -66,14 +59,14 @@
         }
         assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
     }
-    
+
     @Override
     public void setUp()
     {
         //Highlight that this test will cause a new AR to be created
         ApplicationRegistry.getInstance();
     }
-    
+
     @Override
     public void tearDown() throws Exception
     {
@@ -87,7 +80,7 @@
         {
             // Correctly Close the AR we created
             ApplicationRegistry.remove();
-        }        
+        }
     }
 
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Sun Oct 25 22:58:57 2009
@@ -1,6 +1,6 @@
 package org.apache.qpid.server.queue;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,21 +8,22 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
 import junit.framework.AssertionFailedError;
@@ -42,38 +43,38 @@
     {
 
         // Enqueue messages in order
-        _queue.enqueue(null, createMessage(1L, (byte) 10));
-        _queue.enqueue(null, createMessage(2L, (byte) 4));
-        _queue.enqueue(null, createMessage(3L, (byte) 0));
-        
+        _queue.enqueue(createMessage(1L, (byte) 10));
+        _queue.enqueue(createMessage(2L, (byte) 4));
+        _queue.enqueue(createMessage(3L, (byte) 0));
+
         // Enqueue messages in reverse order
-        _queue.enqueue(null, createMessage(4L, (byte) 0));
-        _queue.enqueue(null, createMessage(5L, (byte) 4));
-        _queue.enqueue(null, createMessage(6L, (byte) 10));
-        
+        _queue.enqueue(createMessage(4L, (byte) 0));
+        _queue.enqueue(createMessage(5L, (byte) 4));
+        _queue.enqueue(createMessage(6L, (byte) 10));
+
         // Enqueue messages out of order
-        _queue.enqueue(null, createMessage(7L, (byte) 4));
-        _queue.enqueue(null, createMessage(8L, (byte) 10));
-        _queue.enqueue(null, createMessage(9L, (byte) 0));
-        
+        _queue.enqueue(createMessage(7L, (byte) 4));
+        _queue.enqueue(createMessage(8L, (byte) 10));
+        _queue.enqueue(createMessage(9L, (byte) 0));
+
         // Register subscriber
         _queue.registerSubscription(_subscription, false);
         Thread.sleep(150);
-        
+
         ArrayList<QueueEntry> msgs = _subscription.getMessages();
         try
         {
-            assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageId());
-            assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageId());
-            assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageId());
-
-            assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageId());
-            assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageId());
-            assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageId());
-
-            assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageId());
-            assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageId());
-            assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageId());
+            assertEquals(new Long(1L), msgs.get(0).getMessage().getMessageNumber());
+            assertEquals(new Long(6L), msgs.get(1).getMessage().getMessageNumber());
+            assertEquals(new Long(8L), msgs.get(2).getMessage().getMessageNumber());
+
+            assertEquals(new Long(2L), msgs.get(3).getMessage().getMessageNumber());
+            assertEquals(new Long(5L), msgs.get(4).getMessage().getMessageNumber());
+            assertEquals(new Long(7L), msgs.get(5).getMessage().getMessageNumber());
+
+            assertEquals(new Long(3L), msgs.get(6).getMessage().getMessageNumber());
+            assertEquals(new Long(4L), msgs.get(7).getMessage().getMessageNumber());
+            assertEquals(new Long(9L), msgs.get(8).getMessage().getMessageNumber());
         }
         catch (AssertionFailedError afe)
         {
@@ -81,7 +82,7 @@
             int index = 1;
             for (QueueEntry qe : msgs)
             {
-                System.err.println(index + ":" + qe.getMessage().getMessageId());
+                System.err.println(index + ":" + qe.getMessage().getMessageNumber());
                 index++;
             }
 
@@ -98,10 +99,10 @@
         msg.getContentHeaderBody().properties = props;
         return msg;
     }
-    
+
     protected AMQMessage createMessage(Long id) throws AMQException
     {
         return createMessage(id, (byte) 0);
     }
-    
+
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Sun Oct 25 22:58:57 2009
@@ -20,21 +20,17 @@
  */
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-
-import javax.management.Notification;
-
 import junit.framework.TestCase;
-
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.InternalTestProtocolSession;
@@ -42,16 +38,16 @@
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import javax.management.Notification;
+import java.util.ArrayList;
+
 /** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
 public class AMQQueueAlertTest extends TestCase
-{                                                         
+{
     private final static long MAX_MESSAGE_COUNT = 50;
     private final static long MAX_MESSAGE_AGE = 250;   // 0.25 sec
     private final static long MAX_MESSAGE_SIZE = 2000;  // 2 KB
@@ -61,11 +57,6 @@
     private VirtualHost _virtualHost;
     private AMQProtocolEngine _protocolSession;
     private MessageStore _messageStore = new MemoryMessageStore();
-    private StoreContext _storeContext = new StoreContext();
-    private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
-                                                                                     null,
-                                                                                     new LinkedList<RequiredDeliveryException>()
-    );
     private static final SubscriptionFactoryImpl SUBSCRIPTION_FACTORY = SubscriptionFactoryImpl.INSTANCE;
 
     /**
@@ -75,6 +66,10 @@
      */
     public void testMessageCountAlert() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue1"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -82,7 +77,7 @@
 
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
 
-        sendMessages(MAX_MESSAGE_COUNT, 256l);
+        sendMessages(channel, MAX_MESSAGE_COUNT, 256l);
         assertTrue(_queueMBean.getMessageCount() == MAX_MESSAGE_COUNT);
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -99,6 +94,10 @@
      */
     public void testMessageSizeAlert() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue2"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -106,7 +105,7 @@
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumMessageSize(MAX_MESSAGE_SIZE);
 
-        sendMessages(1, MAX_MESSAGE_SIZE * 2);
+        sendMessages(channel, 1, MAX_MESSAGE_SIZE * 2);
         assertTrue(_queueMBean.getMessageCount() == 1);
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -125,6 +124,10 @@
      */
     public void testQueueDepthAlertNoSubscriber() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue3"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -134,7 +137,7 @@
 
         while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
         {
-            sendMessages(1, MAX_MESSAGE_SIZE);
+            sendMessages(channel, 1, MAX_MESSAGE_SIZE);
         }
 
         Notification lastNotification = _queueMBean.getLastNotification();
@@ -154,6 +157,10 @@
      */
     public void testMessageAgeAlert() throws Exception
     {
+        _protocolSession = new InternalTestProtocolSession(_virtualHost);
+        AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
+        _protocolSession.addChannel(channel);
+
         _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("testQueue4"), false, new AMQShortString("AMQueueAlertTest"),
                               false, _virtualHost,
                               null);
@@ -161,7 +168,7 @@
         _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
         _queueMBean.setMaximumMessageAge(MAX_MESSAGE_AGE);
 
-        sendMessages(1, MAX_MESSAGE_SIZE);
+        sendMessages(channel, 1, MAX_MESSAGE_SIZE);
 
         // Ensure message sits on queue long enough to age.
         Thread.sleep(MAX_MESSAGE_AGE * 2);
@@ -201,7 +208,7 @@
         // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
         int messageCount = Math.round(MAX_QUEUE_DEPTH / MAX_MESSAGE_SIZE) + 10;
         long totalSize = (messageCount * MAX_MESSAGE_SIZE);
-        sendMessages(messageCount, MAX_MESSAGE_SIZE);
+        sendMessages(channel, messageCount, MAX_MESSAGE_SIZE);
 
         // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
         // so there should be no Queue_Deoth alert raised
@@ -228,7 +235,7 @@
 
         _queue.registerSubscription(
                 subscription2, false);
-        
+
         while (_queue.getUndeliveredMessageCount()!= 0)
         {
             Thread.sleep(100);
@@ -247,7 +254,7 @@
         _queueMBean.clearQueue();
         assertEquals(new Long(0), new Long(_queueMBean.getQueueDepth()));
     }
-    
+
     protected IncomingMessage message(final boolean immediate, long size) throws AMQException
     {
         MessagePublishInfo publish = new MessagePublishInfo()
@@ -280,8 +287,10 @@
         };
 
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        contentHeaderBody.properties = props;
         contentHeaderBody.bodySize = size;   // in bytes
-        IncomingMessage message = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
+        IncomingMessage message = new IncomingMessage(publish);
         message.setContentHeaderBody(contentHeaderBody);
 
         return message;
@@ -305,16 +314,19 @@
     }
 
 
-    private void sendMessages(long messageCount, final long size) throws AMQException
+    private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException
     {
         IncomingMessage[] messages = new IncomingMessage[(int) messageCount];
+        MessageMetaData[] metaData = new MessageMetaData[(int) messageCount];
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false, size);
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
+            metaData[i] = messages[i].headersReceived();
+            messages[i].setStoredMessage(_messageStore.addMessage(metaData[i]));
+
             messages[i].enqueue(qs);
-            messages[i].routingComplete(_messageStore, new MessageHandleFactory());
 
         }
 
@@ -324,6 +336,10 @@
 
                 ByteBuffer _data = ByteBuffer.allocate((int)size);
 
+                {
+                    _data.limit((int)size);
+                }
+
                 public int getSize()
                 {
                     return (int) size;
@@ -336,10 +352,12 @@
 
                 public void reduceToFit()
                 {
-                    
+
                 }
             });
-            messages[i].deliverToQueues();
+
+            _queue.enqueue(new AMQMessage(messages[i].getStoredMessage()));
+
         }
     }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message