qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r761741 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/store/ test/java/org/apache/qpid/server/ack/ test/java/org/apache/qpid/server/queue/ test/java/org/apache/qpid/server/security/access/ test/java/org/apache/qpid...
Date Fri, 03 Apr 2009 17:54:44 GMT
Author: ritchiem
Date: Fri Apr  3 17:54:44 2009
New Revision: 761741

URL: http://svn.apache.org/viewvc?rev=761741&view=rev
Log:
QPID-1764 : Updated all tests to use the TestTransactionLog interface and split testing code into subclasses. TestableTransactionLog will now correctly wrap a TransactionLog for testing. To enable testing of the BaseTransactionLog a TestableBaseTransactionLog was needed to only return values that are actually stored in the BaseTL the TestableTransactionLog actually stores single enqueues so that they can be queried by the test.

Added:
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java
      - copied, changed from r761721, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Apr  3 17:54:44 2009
@@ -51,7 +51,7 @@
  */
 public class MemoryMessageStore implements TransactionLog, RoutingTable
 {
-    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+    protected static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
 
     private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
 
@@ -154,13 +154,7 @@
 
     public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        for (AMQQueue q : queues)
-        {
-            if (q.isDurable())
-            {
-                enqueueMessage(context,q,messageId);
-            }
-        }
+        // Not required to do anything
     }
 
     public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
@@ -232,25 +226,13 @@
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
-    {
-        checkNotClosed();
-        return _metaDataMap.get(messageId);
-    }
-
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
-    {
-        checkNotClosed();
-        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
-        return bodyList.get(index);
-    }
 
     public boolean isPersistent()
     {
         return false;
     }
 
-    private void checkNotClosed() throws MessageStoreClosedException
+    protected void checkNotClosed() throws MessageStoreClosedException
     {
         if (_closed.get())
         {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java Fri Apr  3 17:54:44 2009
@@ -103,7 +103,7 @@
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
 		private AMQQueue _queue;
-        private TransactionLog _transactionLog = new TestableMemoryMessageStore();
+        private TransactionLog _transactionLog = new TestableMemoryMessageStore().configure();
 
         private static final int MESSAGE_SIZE=100;
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Fri Apr  3 17:54:44 2009
@@ -32,6 +32,7 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactory;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -43,14 +44,13 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 
 /**
  * Test class to test AMQQueueMBean attribtues and operations
@@ -70,7 +70,7 @@
     public void testMessageCountTransient() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount, false);
+        List<AMQMessage> messages = sendMessages(messageCount, false);
         assertTrue(_queueMBean.getMessageCount() == messageCount);
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE);
@@ -85,13 +85,13 @@
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
 
         //Ensure that the data has been removed from the Store
-        verifyBrokerState();
+        verifyBrokerState(messages);
     }
 
     public void testMessageCountPersistent() throws Exception
     {
         int messageCount = 10;
-        sendMessages(messageCount, true);
+        List<AMQMessage> messages = sendMessages(messageCount, true);
         assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
         long queueDepth = (messageCount * MESSAGE_SIZE);
@@ -106,20 +106,38 @@
         assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
 
         //Ensure that the data has been removed from the Store
-        verifyBrokerState();
+        verifyBrokerState(messages);
     }
 
     // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
-    private void verifyBrokerState()
+    private void verifyBrokerState(List<AMQMessage> messages)
     {
 
-        TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog());
+        TestableTransactionLog store = new TestableTransactionLog(_virtualHost.getTransactionLog());
 
-        // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
-        assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());       
-        assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
-        assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
-        assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+        // We can only now check messageData and ConentBodyChunks by MessageID.
+        for (AMQMessage message : messages)
+        {
+            // Check we have no message metadata for the messages we sent 
+            try
+            {
+                assertNull(store.getMessageMetaData(new StoreContext(), message.getMessageId()));
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+
+            try
+            {
+                assertNull(store.getContentBodyChunk(new StoreContext(), message.getMessageId(),0));
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+
+        }
     }
 
     public void testConsumerCount() throws AMQException
@@ -297,8 +315,9 @@
         ApplicationRegistry.remove(1);
     }
 
-    private void sendMessages(int messageCount, boolean persistent) throws AMQException
+    private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
     {
+        List<AMQMessage> messages = new LinkedList<AMQMessage>();
         for (int i = 0; i < messageCount; i++)
         {
             IncomingMessage currentMessage = message(false, persistent);
@@ -316,9 +335,10 @@
                                                        .convertToContentChunk(
                                                        new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
                                                                        MESSAGE_SIZE)));
-            currentMessage.deliverToQueues();
+            messages.add(currentMessage.deliverToQueues());
 
 
         }
+        return messages;
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Fri Apr  3 17:54:44 2009
@@ -30,6 +30,7 @@
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,8 +39,6 @@
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -47,6 +46,7 @@
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Set;
+import java.util.List;
 
 /**
  * Tests that acknowledgements are handled correctly.
@@ -59,7 +59,7 @@
 
     private MockProtocolSession _protocolSession;
 
-    private TestableMemoryMessageStore _messageStore;
+    private TestableTransactionLog _transactionLog;
 
     private StoreContext _storeContext = new StoreContext();
 
@@ -75,9 +75,9 @@
         ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
 
         VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
-        _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog());
-        _protocolSession = new MockProtocolSession(_messageStore);
-        _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
+        _transactionLog = new TestableTransactionLog(vhost.getTransactionLog());
+        _protocolSession = new MockProtocolSession(_transactionLog);
+        _channel = new AMQChannel(_protocolSession,5, _transactionLog /*dont need exchange registry*/);
 
         _protocolSession.addChannel(_channel);
 
@@ -95,13 +95,13 @@
         publishMessages(count, false);
     }
 
-    private void publishMessages(int count, boolean persistent) throws AMQException
+    private List<AMQMessage> publishMessages(int count, boolean persistent) throws AMQException
     {
-        TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+        TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null,
                                                                       new LinkedList<RequiredDeliveryException>()
         );
         _queue.registerSubscription(_subscription,false);
-        MessageFactory factory = MessageFactory.getInstance();
+        List<AMQMessage> sentMessages = new LinkedList<AMQMessage>();
         for (int i = 1; i <= count; i++)
         {
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -109,7 +109,7 @@
             MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false,
                                                                         false, new AMQShortString("rk"));
 
-            IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore);
+            IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _transactionLog);
             //IncomingMessage msg2 = null;
             if (persistent)
             {
@@ -130,14 +130,16 @@
             ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
             qs.add(_queue);
             msg.enqueue(qs);
-            msg.routingComplete(_messageStore);
+            msg.routingComplete(_transactionLog);
             if(msg.allContentReceived())
             {
-                msg.deliverToQueues();
+               sentMessages.add(msg.deliverToQueues());
             }
             // we manually send the message to the subscription
             //_subscription.send(new QueueEntry(_queue,msg), _queue);
         }
+
+        return sentMessages;
     }
 
     /**
@@ -148,11 +150,16 @@
     {
         _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
         final int msgCount = 10;
-        publishMessages(msgCount, true);
+        List<AMQMessage> sentMessages = publishMessages(msgCount, true);
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+        for (AMQMessage message : sentMessages)
+        {
+            List<AMQQueue> enqueuedQueues = _transactionLog.getMessageReferenceMap(message.getMessageId());
+            assertNotNull("Expected message to be enqueued",enqueuedQueues);
+            assertEquals("Message is not enqueued on expected number of queues.",1, enqueuedQueues.size());
+        }
 
         Set<Long> deliveryTagSet = map.getDeliveryTags();
         int i = 1;
@@ -165,7 +172,6 @@
         }
 
         assertTrue(map.size() == msgCount);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
     }
 
     /**
@@ -180,8 +186,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize());
+//        assertTrue(_messageStore.getContentBodyMap().size() == 0);to be
 
     }
 
@@ -197,8 +203,8 @@
 
         UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
         assertTrue(map.size() == 0);
-        assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
-        assertTrue(_messageStore.getContentBodyMap().size() == 0);
+        assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize());
+//        assertTrue(_messageStore.getContentBodyMap().size() == 0);
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java Fri Apr  3 17:54:44 2009
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -31,10 +32,10 @@
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.commons.configuration.PropertiesConfiguration;
 
 import java.util.ArrayList;
 import java.util.LinkedList;
@@ -42,7 +43,7 @@
 
 public class PersistentMessageTest extends TransientMessageTest
 {
-    private TestableMemoryMessageStore _messageStore;
+    private TestableTransactionLog _transactionLog;
 
     protected SimpleAMQQueue _queue;
     protected AMQShortString _q1name = new AMQShortString("q1name");
@@ -54,22 +55,22 @@
 
     public void setUp() throws Exception
     {
-        _messageStore = new TestableMemoryMessageStore();
+        _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
 
         _storeContext = new StoreContext();
         VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration(PersistentMessageTest.class.getName(),
                                                                          new PropertiesConfiguration()),
-                                            _messageStore);
+                                            _transactionLog);
         _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
         // Create IncomingMessage and nondurable queue
-        _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+        _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, _returnMessages);
 
     }
 
     @Override
     protected AMQMessage newMessage()
     {
-        return MessageFactory.getInstance().createMessage(_messageStore, true);
+        return MessageFactory.getInstance().createMessage(_transactionLog, true);
     }
 
     @Override
@@ -82,7 +83,7 @@
     /**
      * Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
      * checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
-     * contents. TransactionLog = Empty, returnMessages 1 item. 
+     * contents. TransactionLog = Empty, returnMessages 1 item.
      *
      * @throws Exception
      */
@@ -98,17 +99,16 @@
         // equivalent to amqChannel.routeMessage()
         msg.enqueue(qs);
 
-        msg.routingComplete(_messageStore);
+        msg.routingComplete(_transactionLog);
 
         // equivalent to amqChannel.deliverCurrentMessageIfComplete
         msg.deliverToQueues();
 
         // Check that data has been stored to disk
         long messageId = msg.getMessageId();
-        checkMessageMetaDataExists(messageId);
 
         // Check that it was not enqueued
-        List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+        List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
         assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty());
         checkMessageMetaDataRemoved(messageId);
 
@@ -118,7 +118,7 @@
     protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
     {
         IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
-                                                  new MockProtocolSession(_messageStore), _messageStore);
+                                                  new MockProtocolSession(_transactionLog), _transactionLog);
 
         // equivalent to amqChannel.publishContenHeader
         ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
@@ -138,7 +138,8 @@
     {
         try
         {
-            _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+            assertNotNull("Message MetaData does not exist for message:" + messageId,
+                          _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
         }
         catch (AMQException amqe)
         {
@@ -151,8 +152,8 @@
         try
         {
             assertNull("Message MetaData still exists for message:" + messageId,
-                       _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
-            List ids = _messageStore.getMessageReferenceMap(messageId);
+                       _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+            List ids = _transactionLog.getMessageReferenceMap(messageId);
             assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty());
 
         }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Apr  3 17:54:44 2009
@@ -35,12 +35,13 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.store.TestTransactionLog;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,7 +51,7 @@
 
     protected SimpleAMQQueue _queue;
     protected VirtualHost _virtualHost;
-    protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
+    protected TestableTransactionLog _transactionLog;
     protected AMQShortString _qname = new AMQShortString("qname");
     protected AMQShortString _owner = new AMQShortString("owner");
     protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -68,6 +69,7 @@
         //Create Application Registry for test
         ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
 
+        _transactionLog = new TestableTransactionLog(new MemoryMessageStore().configure());
         PropertiesConfiguration env = new PropertiesConfiguration();
         _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
         applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -340,7 +342,9 @@
 
         // Check that it is enqueued
         List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
-        assertNotNull(data);
+        assertNotNull("Message has no enqueued information.", data);
+        assertTrue("Message is not enqueued on correct queue.", data.contains(_queue));
+        assertEquals("Message not enqueued on the right queues.", 1, data.size());
 
         // Dequeue message
         ContentHeaderBody header = new ContentHeaderBody();
@@ -355,7 +359,7 @@
 
         // Check that it is dequeued
         data = _transactionLog.getMessageReferenceMap(messageId);
-        assertTrue(data == null || data.isEmpty());
+        assertNull("Message still has enqueue data.", data);
     }
 
     public void testMessagesFlowToDisk() throws AMQException, InterruptedException
@@ -509,7 +513,9 @@
 
         //Check message was correctly enqueued
         List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
-        assertNotNull(data);
+        assertNotNull("Message has no enqueued information.", data);
+        assertTrue("Message is not enqueued on correct queue.", data.contains(_queue));
+        assertEquals("Message not enqueued on the right queues.", 1, data.size());             
     }
 
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java Fri Apr  3 17:54:44 2009
@@ -39,7 +39,7 @@
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MockAMQQueue;
 import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 
 public class ACLManagerTest extends TestCase
@@ -66,7 +66,7 @@
         _pluginManager = new MockPluginManager("");
         _authzManager = new ACLManager(_conf, _pluginManager);
         
-        _session = new MockProtocolSession(new TestableMemoryMessageStore());
+        _session = new MockProtocolSession(new MemoryMessageStore().configure());
     }
 
     public void tearDown() throws Exception

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java Fri Apr  3 17:54:44 2009
@@ -37,7 +37,7 @@
 import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
 import org.apache.qpid.server.protocol.TestIoSession;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
@@ -81,14 +81,12 @@
         }
     }
 
-    private TestableMemoryMessageStore _store;
     private VirtualHost _virtualHost;
     private AMQMinaProtocolSession _session;
 
     @Override
     public void setUp() throws Exception
     {
-        _store = new TestableMemoryMessageStore();
         PropertiesConfiguration env = new PropertiesConfiguration();
         _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
         TestIoSession iosession = new TestIoSession();

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Fri Apr  3 17:54:44 2009
@@ -26,28 +26,24 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.queue.MessageFactory;
 import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
+/** Tests that reference counting works correctly with AMQMessage and the message store */
 public class TestReferenceCounting extends TestCase
 {
-    private TestableMemoryMessageStore _store;
+    private TestableTransactionLog _store;
 
     private StoreContext _storeContext = new StoreContext();
 
-
     protected void setUp() throws Exception
     {
         super.setUp();
-        _store = new TestableMemoryMessageStore();
+        _store = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
     }
 
-    /**
-     * Check that when the reference count is decremented the message removes itself from the store
-     */
+    /** Check that when the reference count is decremented the message removes itself from the store */
     public void testMessageGetsRemoved() throws AMQException
     {
         ContentHeaderBody chb = createPersistentContentHeader();
@@ -57,14 +53,15 @@
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
 
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+        assertNotNull("Message Metadata did not exist for new message",
+                      _store.getMessageMetaData(new StoreContext(), message.getMessageId()));
     }
 
     private ContentHeaderBody createPersistentContentHeader()
     {
         ContentHeaderBody chb = new ContentHeaderBody();
         BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
-        bchp.setDeliveryMode((byte)2);
+        bchp.setDeliveryMode((byte) 2);
         chb.properties = bchp;
         return chb;
     }
@@ -77,8 +74,9 @@
         final ContentHeaderBody chb = createPersistentContentHeader();
         AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
         message.setPublishAndContentHeaderBody(_storeContext, info, chb);
-        
-        assertEquals(1, _store.getMessageMetaDataMap().size());
+
+        assertNotNull("Message Metadata did not exist for new message",
+                      _store.getMessageMetaData(new StoreContext(), message.getMessageId()));
     }
 
     public static junit.framework.Test suite()

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java Fri Apr  3 17:54:44 2009
@@ -21,12 +21,23 @@
 package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
 
 import java.util.Map;
 import java.util.List;
 
 public interface TestTransactionLog extends TransactionLog
 {
+    public void setBaseTransactionLog(BaseTransactionLog base);
+
     public List<AMQQueue> getMessageReferenceMap(Long messageID);
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+    public long getMessageMetaDataSize();
+    public TransactionLog getDelegate();
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Fri Apr  3 17:54:44 2009
@@ -20,192 +20,80 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.routing.RoutingTable;
 import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
 
 /** Adds some extra methods to the memory message store for testing purposes. */
-public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
+public class TestableMemoryMessageStore extends MemoryMessageStore implements TestTransactionLog
 {
-    private TransactionLog _transactionLog;
-    private RoutingTable _routingTable;
-    private MemoryMessageStore _mms;
+    private TestableTransactionLog _base;
 
-    public TestableMemoryMessageStore(TransactionLog log)
+    public void setBaseTransactionLog(BaseTransactionLog base)
     {
-        _transactionLog = log;
-        if (log instanceof BaseTransactionLog)
+        if (!(base instanceof TestableTransactionLog))
         {
-            TransactionLog delegate = ((BaseTransactionLog) log).getDelegate();
-            if (delegate instanceof RoutingTable)
-            {
-                _routingTable = (RoutingTable) delegate;
-            }
-            else
-            {
-                throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log);
-            }
-
-            if (delegate instanceof MemoryMessageStore)
-            {
-                _mms = (MemoryMessageStore) delegate;
-            }
-
-        }
-        else
-        {
-            throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log);
+            throw new RuntimeException("base must be a TestableTransactionLog for correct operation in a TestMemoryMessageStore");
         }
 
+        _base = (TestableTransactionLog) base;
     }
 
-    public TestableMemoryMessageStore(MemoryMessageStore mms)
-    {
-        _routingTable = mms;
-        _transactionLog = mms.configure();
-    }
-
-    public TestableMemoryMessageStore()
-    {
-        _mms = new MemoryMessageStore();
-        _transactionLog = _mms.configure();
-        _routingTable = _mms;        
-    }
-
-    public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
-    {
-        return ((MemoryMessageStore) _routingTable)._metaDataMap;
-    }
-
-    public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
-    {
-        return ((MemoryMessageStore) _routingTable)._contentBodyMap;
-    }
-
-    public List<AMQQueue> getMessageReferenceMap(Long messageId)
-    {
-//        return _mms._messageEnqueueMap.get(messageId);
-//        ((BaseTransactionLog)_transactionLog).
-        return new ArrayList<AMQQueue>();
-    }
-
-    public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+    @Override
+    public TransactionLog configure()
     {
-        _transactionLog  = (TransactionLog) _transactionLog.configure(virtualHost, base, config);
-        return _transactionLog;
-    }
+        BaseTransactionLog base = (BaseTransactionLog) super.configure();
 
-    public void close() throws Exception
-    {
-        _transactionLog.close();
-        _routingTable.close();
-    }
+        _base = new TestableTransactionLog(base.getDelegate());
 
-    public void createExchange(Exchange exchange) throws AMQException
-    {
-        _routingTable.createExchange(exchange);
+        return _base;
     }
 
-    public void removeExchange(Exchange exchange) throws AMQException
+    @Override
+    public TransactionLog configure(String base, VirtualHostConfiguration config)
     {
-        _routingTable.removeExchange(exchange);
-    }
-
-    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
-    {
-        _routingTable.bindQueue(exchange, routingKey, queue, args);
-    }
-
-    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
-    {
-        _routingTable.unbindQueue(exchange, routingKey, queue, args);
-    }
-
-    public void createQueue(AMQQueue queue) throws AMQException
-    {
-        _routingTable.createQueue(queue);
-    }
-
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
-    {
-        _routingTable.createQueue(queue, arguments);
-    }
-
-    public void removeQueue(AMQQueue queue) throws AMQException
-    {
-        _routingTable.removeQueue(queue);
-    }
-
-    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
-    {
-        _transactionLog.enqueueMessage(context, queues, messageId);
-    }
-
-    public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
-    {
-        _transactionLog.dequeueMessage(context, queue, messageId);
-    }
-
-    public void removeMessage(StoreContext context, Long messageId) throws AMQException
-    {
-        _transactionLog.removeMessage(context, messageId);
-    }
-
-    public void beginTran(StoreContext context) throws AMQException
-    {
-        _transactionLog.beginTran(context);
-    }
+        //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
+        if (base.equals("store"))
+        {
+            super.configure();
 
-    public void commitTran(StoreContext context) throws AMQException
-    {
-        _transactionLog.commitTran(context);
-    }
+            _base = new TestableTransactionLog(this);
 
-    public void abortTran(StoreContext context) throws AMQException
-    {
-        _transactionLog.abortTran(context);
-    }
+            return _base;
+        }
 
-    public boolean inTran(StoreContext context)
-    {
-        return _transactionLog.inTran(context);
+        return super.configure();
     }
 
-    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+    public List<AMQQueue> getMessageReferenceMap(Long messageId)
     {
-        _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+        return _base.getMessageReferenceMap(messageId);
     }
 
-    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
     {
-        _transactionLog.storeMessageMetaData(context, messageId, messageMetaData);
+        return _metaDataMap.get(messageId);
     }
 
-    public boolean isPersistent()
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index)
     {
-        return _transactionLog.isPersistent();
+        List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
+        return bodyList.get(index);
     }
 
-    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    public long getMessageMetaDataSize()
     {
-        return _mms.getMessageMetaData(context, messageId);
+        return _metaDataMap.size();
     }
 
-    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    public TransactionLog getDelegate()
     {
-        return _mms.getContentBodyChunk(context, messageId, index);
+        return _base;
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Fri Apr  3 17:54:44 2009
@@ -33,6 +33,7 @@
 import org.apache.qpid.server.queue.MockContentChunk;
 import org.apache.qpid.server.queue.MockPersistentAMQMessage;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
@@ -47,14 +48,14 @@
     final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
     final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
 
-    TestableTransactionLog _transactionLog;
+    TestTransactionLog _transactionLog;
     private ArrayList<AMQQueue> _queues;
     private MockPersistentAMQMessage _message;
 
     public void setUp() throws Exception
     {
         super.setUp();
-        _transactionLog = new TestableTransactionLog(this);
+        _transactionLog = new TestableBaseTransactionLog(this);
     }
 
     public void testSingleEnqueueNoTransactional() throws AMQException

Copied: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java (from r761721, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java?p2=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java&p1=qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java&r1=761721&r2=761741&rev=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java Fri Apr  3 17:54:44 2009
@@ -20,52 +20,51 @@
  */
 package org.apache.qpid.server.transactionlog;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
-import java.util.LinkedList;
 
-public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
+public class TestableBaseTransactionLog extends BaseTransactionLog implements TestTransactionLog
 {
 
-    List<Long> _singleEnqueues = new LinkedList<Long>();
-
-    public TestableTransactionLog()
+    public TestableBaseTransactionLog()
     {
         super(null);
     }
 
-    public TestableTransactionLog(BaseTransactionLog delegate)
-    {
-        super(delegate.getDelegate());
-    }
-
-    public TestableTransactionLog(TransactionLog delegate)
+    public TestableBaseTransactionLog(TransactionLog delegate)
     {
         super(delegate);
-    }
+        if (delegate instanceof BaseTransactionLog)
+        {
+            _delegate = ((BaseTransactionLog) delegate).getDelegate();
+        }
 
+    }
 
     @Override
     public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
         if (_delegate != null)
         {
-            TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+            TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config);
 
             // Unwrap any BaseTransactionLog
             if (configuredLog instanceof BaseTransactionLog)
             {
-                _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+                _delegate = ((BaseTransactionLog) configuredLog).getDelegate();
             }
         }
         else
         {
-                String delegateClass = config.getStoreConfiguration().getString("delegate");
+            String delegateClass = config.getStoreConfiguration().getString("delegate");
             Class clazz = Class.forName(delegateClass);
             Object o = clazz.newInstance();
 
@@ -77,13 +76,54 @@
             _delegate = (TransactionLog) o;
 
             // If a TransactionLog uses the BaseTransactionLog then it will return this object.
-             _delegate.configure(virtualHost, base, config);
+            _delegate.configure(virtualHost, base, config);
         }
         return this;
     }
 
+    public void setBaseTransactionLog(BaseTransactionLog base)
+    {
+        throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs");
+    }
+
     public List<AMQQueue> getMessageReferenceMap(Long messageID)
     {
         return _idToQueues.get(messageID);
     }
+
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public long getMessageMetaDataSize()
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaDataSize();
+        }
+        else
+        {
+            return 0;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java Fri Apr  3 17:54:44 2009
@@ -20,52 +20,95 @@
  */
 package org.apache.qpid.server.transactionlog;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.LinkedList;
+import java.util.Map;
 
 public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
 {
-
-    List<Long> _singleEnqueues = new LinkedList<Long>();
+    protected Map<Long, List<AMQQueue>> _singleEnqueuedIDstoQueue = new HashMap<Long, List<AMQQueue>>();
 
     public TestableTransactionLog()
     {
         super(null);
     }
 
-    public TestableTransactionLog(BaseTransactionLog delegate)
+    public TestableTransactionLog(TransactionLog delegate)
     {
-        super(delegate.getDelegate());
+        super(delegate);
+        if (delegate instanceof BaseTransactionLog)
+        {
+            _delegate = ((BaseTransactionLog) delegate).getDelegate();
+        }
+
     }
 
-    public TestableTransactionLog(TransactionLog delegate)
+    /**
+     * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting
+     *
+     * @param context   The transactional context for the operation.
+     * @param queues
+     * @param messageId The message to enqueue.  @throws AMQException If the operation fails for any reason.  @throws org.apache.qpid.AMQException
+     *
+     * @throws AMQException
+     */
+    @Override
+    public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
     {
-        super(delegate);
+        if (queues.size() == 1)
+        {
+            _singleEnqueuedIDstoQueue.put(messageId, queues);
+        }
+
+        super.enqueueMessage(context, queues, messageId);
     }
 
+    /**
+     * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting
+     *
+     * @param context   The transactional context for the operation.
+     * @param queue
+     * @param messageId The message to enqueue.  @throws AMQException If the operation fails for any reason.  @throws org.apache.qpid.AMQException
+     *
+     * @throws AMQException
+     */
+    @Override
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+    {
+        if (_singleEnqueuedIDstoQueue.containsKey(messageId))
+        {
+            _singleEnqueuedIDstoQueue.remove(messageId);
+        }
+
+        super.dequeueMessage(context, queue, messageId);
+    }
 
     @Override
     public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
     {
         if (_delegate != null)
         {
-            TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+            TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config);
 
             // Unwrap any BaseTransactionLog
             if (configuredLog instanceof BaseTransactionLog)
             {
-                _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+                _delegate = ((BaseTransactionLog) configuredLog).getDelegate();
             }
         }
         else
         {
-                String delegateClass = config.getStoreConfiguration().getString("delegate");
+            String delegateClass = config.getStoreConfiguration().getString("delegate");
             Class clazz = Class.forName(delegateClass);
             Object o = clazz.newInstance();
 
@@ -77,13 +120,61 @@
             _delegate = (TransactionLog) o;
 
             // If a TransactionLog uses the BaseTransactionLog then it will return this object.
-             _delegate.configure(virtualHost, base, config);
+            _delegate.configure(virtualHost, base, config);
         }
         return this;
     }
 
+    public void setBaseTransactionLog(BaseTransactionLog base)
+    {
+        throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs");
+    }
+
     public List<AMQQueue> getMessageReferenceMap(Long messageID)
     {
-        return _idToQueues.get(messageID);
+        List<AMQQueue> result = _idToQueues.get(messageID);
+
+        if (result == null)
+        {
+            result = _singleEnqueuedIDstoQueue.get(messageID);
+        }
+
+        return result;
+    }
+
+    public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index);
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public long getMessageMetaDataSize()
+    {
+        if (_delegate instanceof TestTransactionLog)
+        {
+            return ((TestTransactionLog) _delegate).getMessageMetaDataSize();
+        }
+        else
+        {
+            return 0;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Fri Apr  3 17:54:44 2009
@@ -34,8 +34,10 @@
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
@@ -62,7 +64,11 @@
     {
         super.setUp();
         PropertiesConfiguration configuration = new PropertiesConfiguration();
-        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+        // This configuration is not used as TestApplicationRegistry just creates a single vhost 'test' with
+        // TransactionLog TestableTransactionLog(TestMemoryMessageStore)
+        configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableTransactionLog.class.getName());
+        configuration.setProperty("virtualhosts.virtualhost.test.store.delegate", TestableMemoryMessageStore.class.getName());
+        
         _registry = new TestApplicationRegistry(new ServerConfiguration(configuration));
         ApplicationRegistry.initialise(_registry);
         _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");        
@@ -96,7 +102,7 @@
 
     protected void checkStoreContents(int messageCount)
     {
-        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size());
+        assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestTransactionLog) _transactionLog).getMessageMetaDataSize());
 
         //The above publish message is sufficiently small not to fit in the header so no Body is required.
         //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java?rev=761741&r1=761740&r2=761741&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java Fri Apr  3 17:54:44 2009
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.util;
 
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -38,9 +37,9 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.Arrays;
 
@@ -83,7 +82,7 @@
 
         _managedObjectRegistry = new NoopManagedObjectRegistry();
 
-        _transactionLog = new TestableMemoryMessageStore();
+        _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
 
         _virtualHostRegistry = new VirtualHostRegistry();
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message