qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1584600 [1/4] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/...
Date Fri, 04 Apr 2014 10:21:38 GMT
Author: kwall
Date: Fri Apr  4 10:21:37 2014
New Revision: 1584600

URL: http://svn.apache.org/r1584600
Log:
QPID-5653: Replace DurableConfigurationStore/MessageStore recoverers with visitors.

* MS/DCS impls now have stateless visitXXX methods to retrieve message/configuration data (replaces the recoverXXXX methods)
* VH implementations now uses Handlers to perform the recovery operation.
* DCS's handler (ConfiguredObjectRecordRecoveverAndUpgrader) currently implemented in terms of the old DefaultUpgradeProvider/DurableConfigurationRecoverer.
  This will be refactored by a future commit.

Added:
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordRecoveverAndUpgrader.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Xid.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/ConfiguredObjectRecordHandler.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/DistributedTransactionHandler.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageHandler.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/handler/MessageInstanceHandler.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestRecord.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
      - copied, changed from r1584379, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
Removed:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStoreFactory.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
    qpid/trunk/qpid/java/broker-core/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageStoreFactory
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ReferenceCountingTest.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/memory-store/pom.xml
    qpid/trunk/qpid/java/broker-plugins/memory-store/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes
    qpid/trunk/qpid/java/test-profiles/JavaBDBExcludes
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java Fri Apr  4 10:21:37 2014
@@ -29,15 +29,15 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.server.virtualhost.DefaultUpgraderProvider;
+import org.apache.qpid.server.virtualhost.MessageStoreRecoverer;
 import org.apache.qpid.server.virtualhost.State;
-import org.apache.qpid.server.virtualhost.VirtualHostConfigRecoveryHandler;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import com.sleepycat.je.rep.StateChangeEvent;
@@ -98,17 +98,12 @@ public class BDBHAVirtualHost extends Ab
         {
             _messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
 
-            DefaultUpgraderProvider upgraderProvider = new DefaultUpgraderProvider(this);
-
-            DurableConfigurationRecoverer configRecoverer =
-                    new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
-                            upgraderProvider, getEventLogger());
-            _messageStore.recoverConfigurationStore(configRecoverer);
+            ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+            _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
 
             initialiseModel();
 
-            VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(BDBHAVirtualHost.this, getMessageStoreLogSubject());
-            _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+            new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
 
             attainActivation();
         }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Fri Apr  4 10:21:37 2014
@@ -27,8 +27,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
@@ -38,29 +36,23 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.store.EventManager;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMemoryMessage;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
-import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
 import org.apache.qpid.server.store.berkeleydb.entry.HierarchyKey;
 import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
 import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.ConfiguredObjectBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.HierarchyKeyBinding;
@@ -70,6 +62,10 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.util.MapValueConverter;
 import org.apache.qpid.util.FileUtils;
 
@@ -129,7 +125,6 @@ public class BDBMessageStore implements 
     private long _persistentSizeHighThreshold;
 
     private final EventManager _eventManager = new EventManager();
-    private final String _type;
 
     private final EnvironmentFacadeFactory _environmentFacadeFactory;
 
@@ -143,7 +138,6 @@ public class BDBMessageStore implements 
 
     public BDBMessageStore(EnvironmentFacadeFactory environmentFacadeFactory)
     {
-        _type = environmentFacadeFactory.getType();
         _environmentFacadeFactory = environmentFacadeFactory;
     }
 
@@ -160,18 +154,19 @@ public class BDBMessageStore implements 
         {
             if (_environmentFacade == null)
             {
-                String[] databaseNames = null;
+                EnvironmentFacadeTask[] initialisationTasks = null;
                 if (MapValueConverter.getBooleanAttribute(IS_MESSAGE_STORE_TOO, storeSettings, false))
                 {
-                    databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
+                    String[] databaseNames = new String[CONFIGURATION_STORE_DATABASE_NAMES.length + MESSAGE_STORE_DATABASE_NAMES.length];
                     System.arraycopy(CONFIGURATION_STORE_DATABASE_NAMES, 0, databaseNames, 0, CONFIGURATION_STORE_DATABASE_NAMES.length);
                     System.arraycopy(MESSAGE_STORE_DATABASE_NAMES, 0, databaseNames, CONFIGURATION_STORE_DATABASE_NAMES.length, MESSAGE_STORE_DATABASE_NAMES.length);
+                    initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(databaseNames), new DiskSpaceTask(), new MaxMessageIdTask() };
                 }
                 else
                 {
-                    databaseNames = CONFIGURATION_STORE_DATABASE_NAMES;
+                    initialisationTasks = new EnvironmentFacadeTask[]{new UpgradeTask(parent), new OpenDatabasesTask(CONFIGURATION_STORE_DATABASE_NAMES)};
                 }
-                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, new UpgradeTask(parent), new OpenDatabasesTask(databaseNames));
+                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(storeSettings, initialisationTasks);
             }
             else
             {
@@ -181,12 +176,88 @@ public class BDBMessageStore implements 
     }
 
     @Override
-    public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
     {
         checkConfigurationStoreOpen();
 
+        try
+        {
+            int configVersion = getConfigVersion();
+
+            handler.begin(configVersion);
+            doVisitAllConfiguredObjectRecords(handler);
+
+            int newConfigVersion = handler.end();
+            if(newConfigVersion != configVersion)
+            {
+                updateConfigVersion(newConfigVersion);
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit configured object records", e);
+        }
+
+    }
+
+    private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+    {
+        Map<UUID, BDBConfiguredObjectRecord> configuredObjects = new HashMap<UUID, BDBConfiguredObjectRecord>();
+        Cursor objectsCursor = null;
+        Cursor hierarchyCursor = null;
+        try
+        {
+            objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+
+
+            while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
+
+                BDBConfiguredObjectRecord configuredObject =
+                        (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
+                configuredObjects.put(configuredObject.getId(), configuredObject);
+            }
+
+            // set parents
+            hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
+            while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
+                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
+                BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
+                if(child != null)
+                {
+                    ConfiguredObjectRecord parent = configuredObjects.get(parentId);
+                    if(parent != null)
+                    {
+                        child.addParent(hk.getParentType(), parent);
+                    }
+                    else if(hk.getParentType().equals("Exchange"))
+                    {
+                        // TODO - remove this hack for the pre-defined exchanges
+                        child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
+                    }
+                }
+            }
+        }
+        finally
+        {
+            closeCursorSafely(objectsCursor);
+            closeCursorSafely(hierarchyCursor);
+        }
+
+        for (ConfiguredObjectRecord record : configuredObjects.values())
+        {
+            boolean shoudlContinue = handler.handle(record);
+            if (!shoudlContinue)
+            {
+                break;
+            }
+        }
 
-        recoverConfig(recoveryHandler);
     }
 
     @Override
@@ -210,7 +281,8 @@ public class BDBMessageStore implements 
 
             if (_environmentFacade == null)
             {
-                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings, new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask());
+                _environmentFacade = _environmentFacadeFactory.createEnvironmentFacade(messageStoreSettings,
+                        new UpgradeTask(parent), new OpenDatabasesTask(MESSAGE_STORE_DATABASE_NAMES), new DiskSpaceTask(), new MaxMessageIdTask());
             }
 
             _committer = _environmentFacade.createCommitter(parent.getName());
@@ -219,21 +291,6 @@ public class BDBMessageStore implements 
     }
 
     @Override
-    public synchronized void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler) throws StoreException
-    {
-        checkMessageStoreOpen();
-
-        if(messageRecoveryHandler != null)
-        {
-            recoverMessages(messageRecoveryHandler);
-        }
-        if(transactionLogRecoveryHandler != null)
-        {
-            recoverQueueEntries(transactionLogRecoveryHandler);
-        }
-    }
-
-    @Override
     public org.apache.qpid.server.store.Transaction newTransaction() throws StoreException
     {
         checkMessageStoreOpen();
@@ -315,27 +372,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    private void recoverConfig(ConfigurationRecoveryHandler recoveryHandler) throws StoreException
-    {
-        try
-        {
-            final int configVersion = getConfigVersion();
-            recoveryHandler.beginConfigurationRecovery(this, configVersion);
-            loadConfiguredObjects(recoveryHandler);
-
-            final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
-            if(newConfigVersion != configVersion)
-            {
-                updateConfigVersion(newConfigVersion);
-            }
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Error recovering persistent state: " + e.getMessage(), e);
-        }
-
-    }
-
     @SuppressWarnings("resource")
     private void updateConfigVersion(int newConfigVersion) throws StoreException
     {
@@ -400,62 +436,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException, StoreException
-    {
-        Cursor objectsCursor = null;
-        Cursor hierarchyCursor = null;
-        try
-        {
-            objectsCursor = getConfiguredObjectsDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-
-            Map<UUID, BDBConfiguredObjectRecord> configuredObjects =
-                    new HashMap<UUID, BDBConfiguredObjectRecord>();
-
-            while (objectsCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-
-                BDBConfiguredObjectRecord configuredObject =
-                        (BDBConfiguredObjectRecord) new ConfiguredObjectBinding(id).entryToObject(value);
-                configuredObjects.put(configuredObject.getId(), configuredObject);
-            }
-
-            // set parents
-            hierarchyCursor = getConfiguredObjectHierarchyDb().openCursor(null, null);
-            while (hierarchyCursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                HierarchyKey hk = HierarchyKeyBinding.getInstance().entryToObject(key);
-                UUID parentId = UUIDTupleBinding.getInstance().entryToObject(value);
-                BDBConfiguredObjectRecord child = configuredObjects.get(hk.getChildId());
-                if(child != null)
-                {
-                    ConfiguredObjectRecord parent = configuredObjects.get(parentId);
-                    if(parent != null)
-                    {
-                        child.addParent(hk.getParentType(), parent);
-                    }
-                    else if(hk.getParentType().equals("Exchange"))
-                    {
-                        // TODO - remove this hack for the pre-defined exchanges
-                        child.addParent(hk.getParentType(), new BDBConfiguredObjectRecord(parentId, "Exchange", Collections.<String,Object>emptyMap()));
-                    }
-                }
-            }
-
-            for (ConfiguredObjectRecord record : configuredObjects.values())
-            {
-                crh.configuredObject(record);
-            }
-        }
-        finally
-        {
-            closeCursorSafely(objectsCursor);
-            closeCursorSafely(hierarchyCursor);
-        }
-    }
-
     private void closeCursorSafely(Cursor cursor) throws StoreException
     {
         if (cursor != null)
@@ -471,124 +451,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    private void recoverMessages(MessageStoreRecoveryHandler msrh) throws StoreException
-    {
-        StoredMessageRecoveryHandler mrh = msrh.begin();
-
-        Cursor cursor = null;
-        try
-        {
-            cursor = getMessageMetaDataDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry value = new DatabaseEntry();
-            MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
-
-            long maxId = 0;
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                long messageId = LongBinding.entryToLong(key);
-                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
-
-                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
-
-                mrh.message(message);
-
-                maxId = Math.max(maxId, messageId);
-            }
-
-            _messageId.set(maxId);
-            mrh.completeMessageRecovery();
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover messages", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-    }
-
-    private void recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler)
-    throws StoreException
-    {
-        QueueEntryRecoveryHandler qerh = recoveryHandler.begin(this);
-
-        ArrayList<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
-
-        Cursor cursor = null;
-        try
-        {
-            cursor = getDeliveryDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-
-            DatabaseEntry value = new DatabaseEntry();
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                QueueEntryKey qek = keyBinding.entryToObject(key);
-
-                entries.add(qek);
-            }
-
-            try
-            {
-                cursor.close();
-            }
-            finally
-            {
-                cursor = null;
-            }
-
-            for(QueueEntryKey entry : entries)
-            {
-                UUID queueId = entry.getQueueId();
-                long messageId = entry.getMessageId();
-                qerh.queueEntry(queueId, messageId);
-            }
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover queue entries", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-
-        TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
-
-        cursor = null;
-        try
-        {
-            cursor = getXidDb().openCursor(null, null);
-            DatabaseEntry key = new DatabaseEntry();
-            XidBinding keyBinding = XidBinding.getInstance();
-            PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
-            DatabaseEntry value = new DatabaseEntry();
-
-            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
-            {
-                Xid xid = keyBinding.entryToObject(key);
-                PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
-                dtxrh.dtxRecord(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
-                                preparedTransaction.getEnqueues(),preparedTransaction.getDequeues());
-            }
-
-        }
-        catch (DatabaseException e)
-        {
-            throw _environmentFacade.handleDatabaseException("Cannot recover transactions", e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-
-
-        dtxrh.completeDtxRecordRecovery();
-    }
 
     void removeMessage(long messageId, boolean sync) throws StoreException
     {
@@ -739,6 +601,12 @@ public class BDBMessageStore implements 
     public void create(ConfiguredObjectRecord configuredObject) throws StoreException
     {
         checkConfigurationStoreOpen();
+
+        if (LOGGER.isDebugEnabled())
+        {
+            LOGGER.debug("Create " + configuredObject);
+        }
+
         com.sleepycat.je.Transaction txn = null;
         try
         {
@@ -832,7 +700,7 @@ public class BDBMessageStore implements 
     {
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("Updating " + record.getType() + ", id: " + record.getId());
+            LOGGER.debug("Updating, creating " + createIfNecessary + " : "  + record);
         }
 
         DatabaseEntry key = new DatabaseEntry();
@@ -890,8 +758,7 @@ public class BDBMessageStore implements 
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Enqueuing message " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
-                        + " in transaction " + tx);
+                        + queue.getName() + " with id " + queue.getId() + " in transaction " + tx);
             }
             getDeliveryDb().put(tx, key, value);
         }
@@ -899,8 +766,7 @@ public class BDBMessageStore implements 
         {
             LOGGER.error("Failed to enqueue: " + e.getMessage(), e);
             throw _environmentFacade.handleDatabaseException("Error writing enqueued message with id " + messageId + " for queue "
-                    + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + queue.getId()
-                    + " to database", e);
+                    + queue.getName() + " with id " + queue.getId() + " to database", e);
         }
     }
 
@@ -925,7 +791,7 @@ public class BDBMessageStore implements 
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Dequeue message id " + messageId + " from queue "
-                    + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                    + queue.getName() + " with id " + id);
         }
 
         try
@@ -935,19 +801,18 @@ public class BDBMessageStore implements 
             if (status == OperationStatus.NOTFOUND)
             {
                 throw new StoreException("Unable to find message with id " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                        + queue.getName() + " with id "  + id);
             }
             else if (status != OperationStatus.SUCCESS)
             {
                 throw new StoreException("Unable to remove message with id " + messageId + " on queue"
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id);
+                        + queue.getName() + " with id " + id);
             }
 
             if (LOGGER.isDebugEnabled())
             {
                 LOGGER.debug("Removed message " + messageId + " on queue "
-                        + (queue instanceof AMQQueue ? ((AMQQueue) queue).getName() + " with id " : "") + id
-                        + " from delivery db");
+                        + queue.getName() + " with id " + id);
 
             }
         }
@@ -1073,57 +938,6 @@ public class BDBMessageStore implements 
     }
 
     /**
-     * Primarily for testing purposes.
-     *
-     * @param queueId
-     *
-     * @return a list of message ids for messages enqueued for a particular queue
-     */
-    List<Long> getEnqueuedMessages(UUID queueId) throws StoreException
-    {
-        Cursor cursor = null;
-        try
-        {
-            cursor = getDeliveryDb().openCursor(null, null);
-
-            DatabaseEntry key = new DatabaseEntry();
-
-            QueueEntryKey dd = new QueueEntryKey(queueId, 0);
-
-            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
-            keyBinding.objectToEntry(dd, key);
-
-            DatabaseEntry value = new DatabaseEntry();
-
-            LinkedList<Long> messageIds = new LinkedList<Long>();
-
-            OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
-            dd = keyBinding.entryToObject(key);
-
-            while ((status == OperationStatus.SUCCESS) && dd.getQueueId().equals(queueId))
-            {
-
-                messageIds.add(dd.getMessageId());
-                status = cursor.getNext(key, value, LockMode.DEFAULT);
-                if (status == OperationStatus.SUCCESS)
-                {
-                    dd = keyBinding.entryToObject(key);
-                }
-            }
-
-            return messageIds;
-        }
-        catch (DatabaseException e)
-        {
-            throw new StoreException("Database error: " + e.getMessage(), e);
-        }
-        finally
-        {
-            closeCursorSafely(cursor);
-        }
-    }
-
-    /**
      * Return a valid, currently unused message id.
      *
      * @return A fresh message id.
@@ -1793,12 +1607,6 @@ public class BDBMessageStore implements 
         }
     }
 
-    @Override
-    public String getStoreType()
-    {
-        return _type;
-    }
-
     private Database getConfiguredObjectsDb()
     {
         return _environmentFacade.getOpenDatabase(CONFIGURED_OBJECTS_DB_NAME);
@@ -1902,4 +1710,147 @@ public class BDBMessageStore implements 
         }
 
     }
+
+    public class MaxMessageIdTask implements EnvironmentFacadeTask, MessageHandler
+    {
+        private long _maxId;
+
+        @Override
+        public void execute(EnvironmentFacade facade)
+        {
+            visitMessagesInternal(this, facade);
+            _messageId.set(_maxId);
+        }
+
+        @Override
+        public boolean handle(StoredMessage<?> storedMessage)
+        {
+            long id = storedMessage.getMessageNumber();
+            if (_maxId<id)
+            {
+                _maxId = id;
+            }
+            return true;
+        }
+
+    }
+
+    @Override
+    public void visitMessages(MessageHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+        visitMessagesInternal(handler, _environmentFacade);
+    }
+
+    private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade)
+    {
+        Cursor cursor = null;
+        try
+        {
+            cursor = environmentFacade.getOpenDatabase(MESSAGE_META_DATA_DB_NAME).openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                long messageId = LongBinding.entryToLong(key);
+                StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+                StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+
+                if (!handler.handle(message))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw environmentFacade.handleDatabaseException("Cannot recover messages", e);
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch(DatabaseException e)
+                {
+                    throw environmentFacade.handleDatabaseException("Cannot close cursor", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getDeliveryDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                QueueEntryKey entry = keyBinding.entryToObject(key);
+                UUID queueId = entry.getQueueId();
+                long messageId = entry.getMessageId();
+                if (!handler.handle(queueId, messageId))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit message instances", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
+
+    @Override
+    public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getXidDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            XidBinding keyBinding = XidBinding.getInstance();
+            PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
+            DatabaseEntry value = new DatabaseEntry();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                Xid xid = keyBinding.entryToObject(key);
+                PreparedTransaction preparedTransaction = valueBinding.entryToObject(value);
+                if (!handler.handle(xid.getFormat(),xid.getGlobalId(),xid.getBranchId(),
+                                preparedTransaction.getEnqueues(),preparedTransaction.getDequeues()))
+                {
+                    break;
+                }
+            }
+
+        }
+        catch (DatabaseException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot recover distributed transactions", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java Fri Apr  4 10:21:37 2014
@@ -25,7 +25,7 @@ import com.sleepycat.bind.tuple.TupleBin
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
 
 public class XidBinding extends TupleBinding<Xid>
 {

Copied: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (from r1584379, qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?p2=qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java&p1=qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java&r1=1584379&r2=1584600&rev=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Apr  4 10:21:37 2014
@@ -20,14 +20,11 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -35,25 +32,15 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10;
 import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
 import org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
-import org.apache.qpid.server.store.MessageStoreTest;
+import org.apache.qpid.server.store.MessageStoreTestCase;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
@@ -62,15 +49,31 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.MessageDeliveryPriority;
 import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.util.FileUtils;
 
 /**
- * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * Subclass of MessageStoreTestCase which runs the standard tests from the superclass against
  * the BDB Store as well as additional tests specific to the BDB store-implementation.
  */
-public class BDBMessageStoreTest extends MessageStoreTest
+public class BDBMessageStoreTest extends MessageStoreTestCase
 {
     private static byte[] CONTENT_BYTES = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
 
+    private String _storeLocation;
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            deleteStoreIfExists();
+        }
+    }
+
     /**
      * Tests that message metadata and content are successfully read back from a
      * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
@@ -78,9 +81,7 @@ public class BDBMessageStoreTest extends
      */
     public void testBDBMessagePersistence() throws Exception
     {
-        MessageStore store = getVirtualHost().getMessageStore();
-
-        BDBMessageStore bdbStore = assertBDBStore(store);
+        BDBMessageStore bdbStore = (BDBMessageStore)getStore();
 
         // Create content ByteBuffers.
         // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
@@ -133,12 +134,13 @@ public class BDBMessageStoreTest extends
         /*
          * reload the store only (read-only)
          */
-        BDBMessageStore readOnlyStore = reloadStore(bdbStore);
+        reopenStore();
 
         /*
          * Read back and validate the 0-8 message metadata and content
          */
-        StorableMessageMetaData storeableMMD_0_8 = readOnlyStore.getMessageMetaData(messageid_0_8);
+        BDBMessageStore reopenedBdbStore = (BDBMessageStore) getStore();
+        StorableMessageMetaData storeableMMD_0_8 = reopenedBdbStore.getMessageMetaData(messageid_0_8);
 
         assertEquals("Unexpected message type", MessageMetaDataType_0_8.TYPE, storeableMMD_0_8.getType().ordinal());
         assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
@@ -162,7 +164,7 @@ public class BDBMessageStoreTest extends
         assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
 
         ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.getBodySize()) ;
-        long recoveredCount_0_8 = readOnlyStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+        long recoveredCount_0_8 = reopenedBdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
         assertEquals("Incorrect amount of payload data recovered", chb_0_8.getBodySize(), recoveredCount_0_8);
         String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
         assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
@@ -170,7 +172,7 @@ public class BDBMessageStoreTest extends
         /*
          * Read back and validate the 0-10 message metadata and content
          */
-        StorableMessageMetaData storeableMMD_0_10 = readOnlyStore.getMessageMetaData(messageid_0_10);
+        StorableMessageMetaData storeableMMD_0_10 = reopenedBdbStore.getMessageMetaData(messageid_0_10);
 
         assertEquals("Unexpected message type", MessageMetaDataType_0_10.TYPE, storeableMMD_0_10.getType().ordinal());
         assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
@@ -193,13 +195,13 @@ public class BDBMessageStoreTest extends
         assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
 
         ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
-        long recoveredCount = readOnlyStore.getContent(messageid_0_10, 0, recoveredContent);
+        long recoveredCount = reopenedBdbStore.getContent(messageid_0_10, 0, recoveredContent);
         assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
 
         String returnedPayloadString_0_10 = new String(recoveredContent.array());
         assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
 
-        readOnlyStore.closeMessageStore();
+        reopenedBdbStore.closeMessageStore();
     }
 
     private DeliveryProperties createDeliveryProperties_0_10()
@@ -226,28 +228,6 @@ public class BDBMessageStoreTest extends
         return msgProps_0_10;
     }
 
-    /**
-     * Close the provided store and create a new (read-only) store to read back the data.
-     *
-     * Use this method instead of reloading the virtual host like other tests in order
-     * to avoid the recovery handler deleting the message for not being on a queue.
-     */
-    private BDBMessageStore reloadStore(BDBMessageStore messageStore) throws Exception
-    {
-        messageStore.closeMessageStore();
-
-
-        BDBMessageStore newStore = new BDBMessageStore();
-
-        MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
-        when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
-        VirtualHost<?> virtualHost = getVirtualHostModel();
-        newStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
-
-        newStore.recoverMessageStore(recoveryHandler, null);
-
-        return newStore;
-    }
 
     private MessagePublishInfo createPublishInfoBody_0_8()
     {
@@ -258,20 +238,24 @@ public class BDBMessageStoreTest extends
                 return new AMQShortString("exchange12345");
             }
 
+            @Override
             public void setExchange(AMQShortString exchange)
             {
             }
 
+            @Override
             public boolean isImmediate()
             {
                 return false;
             }
 
+            @Override
             public boolean isMandatory()
             {
                 return true;
             }
 
+            @Override
             public AMQShortString getRoutingKey()
             {
                 return new AMQShortString("routingKey12345");
@@ -298,9 +282,8 @@ public class BDBMessageStoreTest extends
 
     public void testGetContentWithOffset() throws Exception
     {
-        MessageStore store = getVirtualHost().getMessageStore();
-        BDBMessageStore bdbStore = assertBDBStore(store);
-        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+        BDBMessageStore bdbStore = (BDBMessageStore) getStore();
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
 
         // normal case: offset is 0
@@ -350,6 +333,7 @@ public class BDBMessageStoreTest extends
         System.arraycopy(CONTENT_BYTES, 2, expected, 0, 5);
         assertTrue("Unexpected content", Arrays.equals(expected, array));
     }
+
     /**
      * Tests that messages which are added to the store and then removed using the
      * public MessageStore interfaces are actually removed from the store by then
@@ -358,10 +342,9 @@ public class BDBMessageStoreTest extends
      */
     public void testMessageCreationAndRemoval() throws Exception
     {
-        MessageStore store = getVirtualHost().getMessageStore();
-        BDBMessageStore bdbStore = assertBDBStore(store);
+        BDBMessageStore bdbStore = (BDBMessageStore)getStore();
 
-        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(store);
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreSingleChunkMessage_0_8(bdbStore);
         long messageid_0_8 = storedMessage_0_8.getMessageNumber();
 
         bdbStore.removeMessage(messageid_0_8, true);
@@ -384,13 +367,6 @@ public class BDBMessageStoreTest extends
         assertEquals("Retrieved content when none was expected",
                         0, bdbStore.getContent(messageid_0_8, 0, dst));
     }
-    private BDBMessageStore assertBDBStore(MessageStore store)
-    {
-
-        assertEquals("Test requires an instance of BDBMessageStore to proceed", BDBMessageStore.class, store.getClass());
-
-        return (BDBMessageStore) store;
-    }
 
     private StoredMessage<MessageMetaData> createAndStoreSingleChunkMessage_0_8(MessageStore store)
     {
@@ -413,254 +389,48 @@ public class BDBMessageStoreTest extends
         return storedMessage_0_8;
     }
 
-    /**
-     * Tests transaction commit by utilising the enqueue and dequeue methods available
-     * in the TransactionLog interface implemented by the store, and verifying the
-     * behaviour using BDB implementation methods.
-     */
-    public void testTranCommit() throws Exception
-    {
-        MessageStore log = getVirtualHost().getMessageStore();
-
-        BDBMessageStore bdbStore = assertBDBStore(log);
-
-        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
-        TransactionLogResource mockQueue = new TransactionLogResource()
-        {
-            @Override
-            public String getName()
-            {
-                return getId().toString();
-            }
-
-            @Override
-            public UUID getId()
-            {
-                return mockQueueId;
-            }
-
-            @Override
-            public boolean isDurable()
-            {
-                return true;
-            }
-        };
-
-        Transaction txn = log.newTransaction();
-
-        txn.enqueueMessage(mockQueue, new MockMessage(1L));
-        txn.enqueueMessage(mockQueue, new MockMessage(5L));
-        txn.commitTran();
-
-        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
-        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
-        Long val = enqueuedIds.get(0);
-        assertEquals("First Message is incorrect", 1L, val.longValue());
-        val = enqueuedIds.get(1);
-        assertEquals("Second Message is incorrect", 5L, val.longValue());
-    }
-
-
-    /**
-     * Tests transaction rollback before a commit has occurred by utilising the
-     * enqueue and dequeue methods available in the TransactionLog interface
-     * implemented by the store, and verifying the behaviour using BDB
-     * implementation methods.
-     */
-    public void testTranRollbackBeforeCommit() throws Exception
-    {
-        MessageStore log = getVirtualHost().getMessageStore();
-
-        BDBMessageStore bdbStore = assertBDBStore(log);
-
-        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
-        TransactionLogResource mockQueue = new TransactionLogResource()
-        {
-            @Override
-            public String getName()
-            {
-                return getId().toString();
-            }
-
-            @Override
-            public UUID getId()
-            {
-                return mockQueueId;
-            }
-
-            @Override
-            public boolean isDurable()
-            {
-                return true;
-            }
-        };
-
-        Transaction txn = log.newTransaction();
-
-        txn.enqueueMessage(mockQueue, new MockMessage(21L));
-        txn.abortTran();
-
-        txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, new MockMessage(22L));
-        txn.enqueueMessage(mockQueue, new MockMessage(23L));
-        txn.commitTran();
-
-        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
-        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
-        Long val = enqueuedIds.get(0);
-        assertEquals("First Message is incorrect", 22L, val.longValue());
-        val = enqueuedIds.get(1);
-        assertEquals("Second Message is incorrect", 23L, val.longValue());
-    }
-
     public void testOnDelete() throws Exception
     {
-        MessageStore log = getVirtualHost().getMessageStore();
-        BDBMessageStore bdbStore = assertBDBStore(log);
-        String storeLocation = bdbStore.getStoreLocation();
+        String storeLocation = getStore().getStoreLocation();
 
         File location = new File(storeLocation);
         assertTrue("Store does not exist at " + storeLocation, location.exists());
 
-        bdbStore.closeMessageStore();
+        getStore().closeMessageStore();
         assertTrue("Store does not exist at " + storeLocation, location.exists());
 
-        bdbStore.onDelete();
+        getStore().onDelete();
         assertFalse("Store exists at " + storeLocation, location.exists());
     }
 
-    /**
-     * Tests transaction rollback after a commit has occurred by utilising the
-     * enqueue and dequeue methods available in the TransactionLog interface
-     * implemented by the store, and verifying the behaviour using BDB
-     * implementation methods.
-     */
-    public void testTranRollbackAfterCommit() throws Exception
+
+    @Override
+    protected Map<String, Object> getStoreSettings() throws Exception
     {
-        MessageStore log = getVirtualHost().getMessageStore();
+        _storeLocation = TMP_FOLDER + File.separator + getTestName();
+        deleteStoreIfExists();
+        Map<String, Object> messageStoreSettings = new HashMap<String, Object>();
+        messageStoreSettings.put(MessageStore.STORE_PATH, _storeLocation);
+        return messageStoreSettings;
 
-        BDBMessageStore bdbStore = assertBDBStore(log);
+    }
 
-        final UUID mockQueueId = UUIDGenerator.generateRandomUUID();
-        TransactionLogResource mockQueue = new TransactionLogResource()
+    private void deleteStoreIfExists()
+    {
+        if (_storeLocation != null)
         {
-            @Override
-            public String getName()
-            {
-                return getId().toString();
-            }
-
-            @Override
-            public UUID getId()
-            {
-                return mockQueueId;
-            }
-
-            @Override
-            public boolean isDurable()
+            File location = new File(_storeLocation);
+            if (location.exists())
             {
-                return true;
+                FileUtils.delete(location, true);
             }
-        };
-
-        Transaction txn = log.newTransaction();
-
-        txn.enqueueMessage(mockQueue, new MockMessage(30L));
-        txn.commitTran();
-
-        txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, new MockMessage(31L));
-        txn.abortTran();
-
-        txn = log.newTransaction();
-        txn.enqueueMessage(mockQueue, new MockMessage(32L));
-        txn.commitTran();
-
-        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueId);
-
-        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
-        Long val = enqueuedIds.get(0);
-        assertEquals("First Message is incorrect", 30L, val.longValue());
-        val = enqueuedIds.get(1);
-        assertEquals("Second Message is incorrect", 32L, val.longValue());
+        }
     }
 
-    @SuppressWarnings("rawtypes")
-    private static class MockMessage implements ServerMessage, EnqueueableMessage
+    @Override
+    protected MessageStore createMessageStore()
     {
-        private long _messageId;
-
-        public MockMessage(long messageId)
-        {
-            _messageId = messageId;
-        }
-
-        public String getInitialRoutingAddress()
-        {
-            return null;
-        }
-
-        public AMQMessageHeader getMessageHeader()
-        {
-            return null;
-        }
-
-        public StoredMessage getStoredMessage()
-        {
-            return null;
-        }
-
-        public boolean isPersistent()
-        {
-            return true;
-        }
-
-        public long getSize()
-        {
-            return 0;
-        }
-
-        public boolean isImmediate()
-        {
-            return false;
-        }
-
-        public long getExpiration()
-        {
-            return 0;
-        }
-
-        public MessageReference newReference()
-        {
-            return null;
-        }
-
-        public long getMessageNumber()
-        {
-            return _messageId;
-        }
-
-        public long getArrivalTime()
-        {
-            return 0;
-        }
-
-        public int getContent(ByteBuffer buf, int offset)
-        {
-            return 0;
-        }
-
-        public ByteBuffer getContent(int offset, int length)
-        {
-            return null;
-        }
-
-        @Override
-        public Object getConnectionReference()
-        {
-            return null;
-        }
+        return new BDBMessageStore();
     }
+
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java?rev=1584600&r1=1584599&r2=1584600&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java Fri Apr  4 10:21:37 2014
@@ -48,7 +48,7 @@ import org.apache.qpid.server.model.Life
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.Xid;
 import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
 import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKey;
 import org.apache.qpid.server.store.berkeleydb.upgrade.UpgradeFrom5To6.CompoundKeyBinding;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message