Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 78620 invoked from network); 3 Apr 2009 13:27:18 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 3 Apr 2009 13:27:18 -0000 Received: (qmail 37752 invoked by uid 500); 3 Apr 2009 13:27:17 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 37735 invoked by uid 500); 3 Apr 2009 13:27:17 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 37726 invoked by uid 99); 3 Apr 2009 13:27:17 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Apr 2009 13:27:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Apr 2009 13:27:16 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 877432388A36; Fri, 3 Apr 2009 13:26:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r761670 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/store/ main/java/org/apache/qpid/server/transactionlog/ main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/queue/ test/java/org/apac... Date: Fri, 03 Apr 2009 13:26:56 -0000 To: commits@qpid.apache.org From: ritchiem@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090403132656.877432388A36@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ritchiem Date: Fri Apr 3 13:26:55 2009 New Revision: 761670 URL: http://svn.apache.org/viewvc?rev=761670&view=rev Log: QPID-1764 : Update to BaseTransactionLog to create a TestableTransactionLog, which will replace TestableMessageStore. Update to BaseTransactionLog/Test to work correctly with transactions and to fully test that functionality. Updated StoreContext to know when it is in a transaction as relying on a payload being set is not sufficient as that is not set when running with the MessageMemoryStore and so transactional testing in the BTLT was not correct. Update to Virtualhost to correctly set the RoutingTable when the specified TransactionLog is wrapped in a BaseTransactionLog. Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?rev=761670&r1=761669&r2=761670&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Fri Apr 3 13:26:55 2009 @@ -44,6 +44,7 @@ private HashMap> _enqueueMap; private HashMap> _dequeueMap; private boolean _async; + private boolean _inTransaction; public StoreContext() { @@ -64,6 +65,9 @@ { _name = name; _async = asynchrouous; + _inTransaction = false; + _enqueueMap = new HashMap>(); + _dequeueMap = new HashMap>(); } public StoreContext(boolean asynchronous) @@ -82,7 +86,7 @@ { _logger.debug("public void setPayload(Object payload = " + payload + "): called"); } - _payload = payload; + _payload = payload; } /** @@ -137,7 +141,7 @@ } /** - * Record the dequeue for processing on commit + * Record the dequeue for processing after the commit * * @param queue * @param messageId @@ -146,39 +150,37 @@ */ public void dequeueMessage(AMQQueue queue, Long messageId) throws AMQException { - if (inTransaction()) - { - ArrayList dequeues = _dequeueMap.get(messageId); + ArrayList dequeues = _dequeueMap.get(messageId); - if (dequeues == null) - { - dequeues = new ArrayList(); - _dequeueMap.put(messageId, dequeues); - } - - dequeues.add(queue); + if (dequeues == null) + { + dequeues = new ArrayList(); + _dequeueMap.put(messageId, dequeues); } + + dequeues.add(queue); } public void beginTransaction() throws AMQException { - _enqueueMap = new HashMap>(); - _dequeueMap = new HashMap>(); + _inTransaction = true; } public void commitTransaction() throws AMQException { _dequeueMap.clear(); + _inTransaction = false; } public void abortTransaction() throws AMQException { _enqueueMap.clear(); + _inTransaction = false; } public boolean inTransaction() { - return _payload != null; + return _inTransaction; // _payload != null; } public boolean isAsync() Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=761670&r1=761669&r2=761670&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java Fri Apr 3 13:26:55 2009 @@ -39,7 +39,7 @@ private static final Logger _logger = Logger.getLogger(BaseTransactionLog.class); TransactionLog _delegate; - private Map> _idToQueues = new HashMap>(); + protected Map> _idToQueues = new HashMap>(); public BaseTransactionLog(TransactionLog delegate) { @@ -60,7 +60,7 @@ { context.enqueueMessage(queues, messageId); - if (queues.size() > 0) + if (queues.size() > 1) { _logger.info("Recording Enqueue of (" + messageId + ") on queue:" + queues); @@ -73,10 +73,10 @@ public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException { + context.dequeueMessage(queue, messageId); + if (context.inTransaction()) { - context.dequeueMessage(queue, messageId); - Map> messageMap = context.getDequeueMap(); //For each Message ID that is in the map check @@ -97,11 +97,7 @@ if (!context.inTransaction()) { - HashMap> dequeue = new HashMap>(); - ArrayList list = new ArrayList(); - list.add(queue); - dequeue.put(messageId, list); - processDequeues(dequeue); + processDequeues(context.getDequeueMap()); } } @@ -128,11 +124,7 @@ //Perform real commit of current data _delegate.commitTran(context); - // If we have dequeues to process then process them - if (context.getDequeueMap() != null) - { - processDequeues(context.getDequeueMap()); - } + processDequeues(context.getDequeueMap()); //Commit the recorded state for this transaction. context.commitTransaction(); @@ -141,10 +133,8 @@ public void abortTran(StoreContext context) throws AMQException { // If we have enqueues to rollback - if (context.getEnqueueMap() != null) - { - processDequeues(context.getEnqueueMap()); - } + processDequeues(context.getEnqueueMap()); + //Abort the recorded state for this transaction. context.abortTransaction(); @@ -154,6 +144,12 @@ private void processDequeues(Map> messageMap) throws AMQException { + // Check we have dequeues to process then process them + if (messageMap == null || messageMap.isEmpty()) + { + return; + } + // Process any enqueues to bring our model up to date. Set messageIDs = messageMap.keySet(); @@ -190,6 +186,8 @@ if (enqueuedList.isEmpty()) { _delegate.removeMessage(removeContext, messageID); + //Remove references list + _idToQueues.remove(messageID); } } } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=761670&r1=761669&r2=761670&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Apr 3 13:26:55 2009 @@ -295,14 +295,23 @@ } _transactionLog = (TransactionLog) o; + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config); + //Assign RoutingTable as old MessageStores converted to TransactionLog will require the _routingTable. if (_transactionLog instanceof RoutingTable) { _routingTable = (RoutingTable) _transactionLog; } + else if (_transactionLog instanceof BaseTransactionLog) + { + TransactionLog delegate = ((BaseTransactionLog) _transactionLog).getDelegate(); + if (delegate instanceof RoutingTable) + { + _routingTable = (RoutingTable) delegate; + } + } - // If a TransactionLog uses the BaseTransactionLog then it will return this object. - _transactionLog = (TransactionLog) _transactionLog.configure(this, "store", config); } //todo we need to move from store.class to transactionlog.class Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=761670&r1=761669&r2=761670&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Fri Apr 3 13:26:55 2009 @@ -390,6 +390,7 @@ { sendMessage(txnContext); + // This check may be too soon as a purging thread may be required to bring the queue back under quota. long usage = _queue.getMemoryUsageCurrent(); assertTrue("Queue has gone over quota:" + usage, usage <= _queue.getMemoryUsageMaximum()); Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java?rev=761670&r1=761669&r2=761670&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java Fri Apr 3 13:26:55 2009 @@ -21,11 +21,12 @@ package org.apache.qpid.server.store; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.transactionlog.TransactionLog; import java.util.Map; import java.util.List; -public interface TestTransactionLog +public interface TestTransactionLog extends TransactionLog { public List getMessageReferenceMap(Long messageID); } Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java?rev=761670&r1=761669&r2=761670&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java Fri Apr 3 13:26:55 2009 @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class BaseTransactionLogTest extends TestCase implements TransactionLog @@ -46,14 +47,14 @@ final private Map> _storeChunks = new HashMap>(); final private Map _storeMetaData = new HashMap(); - BaseTransactionLog _transactionLog; + TestableTransactionLog _transactionLog; private ArrayList _queues; private MockPersistentAMQMessage _message; public void setUp() throws Exception { super.setUp(); - _transactionLog = new BaseTransactionLog(this); + _transactionLog = new TestableTransactionLog(this); } public void testSingleEnqueueNoTransactional() throws AMQException @@ -87,11 +88,9 @@ // Enqueue a message to dequeue testSingleEnqueueNoTransactional(); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } public void testSingleEnqueueTransactional() throws AMQException @@ -137,16 +136,13 @@ _transactionLog.beginTran(context); - _transactionLog.dequeueMessage(context,_queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(context, _queues.get(0), _message.getMessageId()); _transactionLog.commitTran(context); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } - public void testMultipleEnqueueNoTransactional() throws AMQException { //Store Data @@ -185,34 +181,54 @@ // Enqueue a message to dequeue testMultipleEnqueueNoTransactional(); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(0), _message.getMessageId()); + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); ArrayList enqueued = _enqueues.get(_message.getMessageId()); - assertNotNull("Message not enqueued", enqueued); - assertFalse("Message still enqueued on the first queue,",enqueued.contains(_queues.get(0))); - assertEquals("Message should still be enqueued on 2 queues", 2, enqueued.size()); - assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); - assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + _queues.remove(0); + verifyEnqueuedOnQueues(_message.getMessageId(), _queues); + verifyMessageStored(_message.getMessageId()); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(1), _message.getMessageId()); + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); - enqueued = _enqueues.get(_message.getMessageId()); - assertNotNull("Message not enqueued", enqueued); - assertFalse("Message still enqueued on the second queue,",enqueued.contains(_queues.get(1))); - assertEquals("Message should still be enqueued on 2 queues", 1, enqueued.size()); - - assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); - assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); + assertFalse("Message still enqueued on the first queue,", enqueued.contains(_queues.get(0))); + _queues.remove(0); - _transactionLog.dequeueMessage(new StoreContext(),_queues.get(2), _message.getMessageId()); + ArrayList enqueues = _enqueues.get(_message.getMessageId()); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + assertNotNull("Message not enqueued", enqueues); + assertEquals("Message is not enqueued on the right number of queues", _queues.size(), enqueues.size()); + for (AMQQueue queue : _queues) + { + assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue)); + } + + //Use the reference map to ensure that we are enqueuing the right number of messages + List references = _transactionLog.getMessageReferenceMap(_message.getMessageId()); + + assertNotNull("Message not enqueued", references); + assertEquals("Message is not enqueued on the right number of queues", _queues.size(), references.size()); + for (AMQQueue queue : references) + { + assertTrue("Message not enqueued on:" + queue, references.contains(queue)); + } + + verifyMessageStored(_message.getMessageId()); + + _transactionLog.dequeueMessage(new StoreContext(), _queues.get(0), _message.getMessageId()); + + verifyMessageRemoved(_message.getMessageId()); } + private void verifyMessageRemoved(Long messageID) + { + assertNull("Message references exist", _transactionLog.getMessageReferenceMap(messageID)); + assertNull("Message enqueued", _enqueues.get(messageID)); + assertNull("Message chunks enqueued", _storeChunks.get(messageID)); + assertNull("Message meta data enqueued", _storeMetaData.get(messageID)); + } public void testMultipleEnqueueTransactional() throws AMQException { @@ -294,12 +310,10 @@ _transactionLog.commitTran(context); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } - public void testMultipleDequeueSingleTransaction() throws AMQException + public void testMultipleDequeueSingleTransaction() throws AMQException { // Enqueue a message to dequeue testMultipleEnqueueTransactional(); @@ -318,10 +332,8 @@ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); - _transactionLog.dequeueMessage(context, _queues.get(1), _message.getMessageId()); - enqueued = _enqueues.get(_message.getMessageId()); assertNotNull("Message not enqueued", enqueued); assertFalse("Message still enqueued on the second queue,", enqueued.contains(_queues.get(1))); @@ -330,14 +342,11 @@ assertNotNull("Message not enqueued", _storeChunks.get(_message.getMessageId())); assertNotNull("Message not enqueued", _storeMetaData.get(_message.getMessageId())); - _transactionLog.dequeueMessage(context, _queues.get(2), _message.getMessageId()); _transactionLog.commitTran(context); - assertNull("Message enqueued", _enqueues.get(_message.getMessageId())); - assertNull("Message enqueued", _storeChunks.get(_message.getMessageId())); - assertNull("Message enqueued", _storeMetaData.get(_message.getMessageId())); + verifyMessageRemoved(_message.getMessageId()); } private void verifyMessageStored(Long messageId) @@ -356,6 +365,23 @@ { assertTrue("Message not enqueued on:" + queue, enqueues.contains(queue)); } + + //Use the reference map to ensure that we are enqueuing the right number of messages + List references = _transactionLog.getMessageReferenceMap(messageId); + + if (queues.size() == 1) + { + assertNull("Message has an enqueued list", references); + } + else + { + assertNotNull("Message not enqueued", references); + assertEquals("Message is not enqueued on the right number of queues", queues.size(), references.size()); + for (AMQQueue queue : references) + { + assertTrue("Message not enqueued on:" + queue, references.contains(queue)); + } + } } /*************************** TransactionLog ******************************* @@ -419,19 +445,42 @@ if (queues == null) { - throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + - "queue(" + queue + ") but no enqueue data available"); - } + boolean found = false; + // If we are in a transaction we may have already done the dequeue. + if (context.inTransaction()) + { - synchronized (queues) - { - if (!queues.contains(queue)) + for (Object record : (ArrayList) context.getPayload()) + { + if (record instanceof RemoveRecord) + { + if (((RemoveRecord) record)._messageId.equals(messageId)) + { + found = true; + break; + } + } + } + } + + if (!found) { throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + - "queue(" + queue + ") but no message not enqueued on queue"); + "queue(" + queue + ") but no enqueue data available"); + } + } + else + { + synchronized (queues) + { + if (!queues.contains(queue)) + { + throw new RuntimeException("Attempt to dequeue message(" + messageId + ") from " + + "queue(" + queue + ") but no message not enqueued on queue"); + } + + queues.remove(queue); } - - queues.remove(queue); } } @@ -450,21 +499,29 @@ "no enqueue data available"); } - if (!queues.isEmpty()) + if (queues.size() > 1) { throw new RuntimeException("Removed a message(" + messageId + ") that still had references."); } + MessageMetaData mmd; synchronized (_storeMetaData) { - _storeMetaData.remove(messageId); + mmd = _storeMetaData.remove(messageId); } + ArrayList chunks; synchronized (_storeChunks) { - _storeChunks.remove(messageId); + chunks = _storeChunks.remove(messageId); } + //Record the remove for part of the transaction + if (context.inTransaction()) + { + ArrayList transactionData = (ArrayList) context.getPayload(); + transactionData.add(new RemoveRecord(messageId, queues, mmd, chunks)); + } } // @@ -474,7 +531,7 @@ public void beginTran(StoreContext context) throws AMQException { - context.setPayload(new Object()); + context.setPayload(new ArrayList()); } public void commitTran(StoreContext context) throws AMQException @@ -532,4 +589,20 @@ { return false; } + + class RemoveRecord + { + MessageMetaData _mmd; + ArrayList _queues; + ArrayList _chunks; + Long _messageId; + + RemoveRecord(Long messageId, ArrayList queues, MessageMetaData mmd, ArrayList chunks) + { + _messageId = messageId; + _queues = queues; + _mmd = mmd; + _chunks = chunks; + } + } } Added: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java?rev=761670&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java (added) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java Fri Apr 3 13:26:55 2009 @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transactionlog; + +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.store.TestTransactionLog; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.routing.RoutingTable; + +import java.util.List; +import java.util.LinkedList; + +public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog +{ + + List _singleEnqueues = new LinkedList(); + + public TestableTransactionLog() + { + super(null); + } + + public TestableTransactionLog(BaseTransactionLog delegate) + { + super(delegate.getDelegate()); + } + + public TestableTransactionLog(TransactionLog delegate) + { + super(delegate); + } + + + @Override + public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception + { + if (_delegate != null) + { + TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config); + + // Unwrap any BaseTransactionLog + if (configuredLog instanceof BaseTransactionLog) + { + _delegate = ((BaseTransactionLog)configuredLog).getDelegate(); + } + } + else + { + String delegateClass = config.getStoreConfiguration().getString("delegate"); + Class clazz = Class.forName(delegateClass); + Object o = clazz.newInstance(); + + if (!(o instanceof TransactionLog)) + { + throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz + + " does not."); + } + _delegate = (TransactionLog) o; + + // If a TransactionLog uses the BaseTransactionLog then it will return this object. + _delegate.configure(virtualHost, base, config); + } + return this; + } + + public List getMessageReferenceMap(Long messageID) + { + return _idToQueues.get(messageID); + } +} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org