qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1498976 [1/2] - in /qpid/trunk/qpid/java: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/...
Date Tue, 02 Jul 2013 15:26:43 GMT
Author: rgodfrey
Date: Tue Jul  2 15:26:42 2013
New Revision: 1498976

URL: http://svn.apache.org/r1498976
Log:
QPID-4973 : [Java Broker] Refactor DurableConfigurationStore

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
      - copied, changed from r1498342, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
Removed:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
Modified:
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
    qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Tue Jul  2 15:26:42 2013
@@ -51,16 +51,10 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.*;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
 import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
@@ -150,8 +144,6 @@ public abstract class AbstractBDBMessage
     private final EventManager _eventManager = new EventManager();
     private String _storeLocation;
 
-    private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
     private Map<String, String> _envConfigMap;
 
     public AbstractBDBMessageStore()
@@ -434,17 +426,10 @@ public abstract class AbstractBDBMessage
     {
         try
         {
-            List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-            ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
-            _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
-
-            QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
-            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
-            BindingRecoveryHandler brh = qrh.completeQueueRecovery();
-            _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+            recoveryHandler.beginConfigurationRecovery(this);
+            loadConfiguredObjects(recoveryHandler);
 
-            brh.completeBindingRecovery();
+            recoveryHandler.completeConfigurationRecovery();
         }
         catch (DatabaseException e)
         {
@@ -453,10 +438,9 @@ public abstract class AbstractBDBMessage
 
     }
 
-    private List<ConfiguredObjectRecord> loadConfiguredObjects() throws DatabaseException
+    private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
     {
         Cursor cursor = null;
-        List<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
         try
         {
             cursor = _configuredObjectsDb.openCursor(null, null);
@@ -464,10 +448,10 @@ public abstract class AbstractBDBMessage
             DatabaseEntry value = new DatabaseEntry();
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
-                ConfiguredObjectRecord configuredObject = ConfiguredObjectBinding.getInstance().entryToObject(value);
                 UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
-                configuredObject.setId(id);
-                results.add(configuredObject);
+
+                ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
+                crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
             }
 
         }
@@ -475,7 +459,6 @@ public abstract class AbstractBDBMessage
         {
             closeCursorSafely(cursor);
         }
-        return results;
     }
 
     private void closeCursorSafely(Cursor cursor)
@@ -743,111 +726,43 @@ public abstract class AbstractBDBMessage
         }
     }
 
-    /**
-     * @see DurableConfigurationStore#createExchange(Exchange)
-     */
-    public void createExchange(Exchange exchange) throws AMQStoreException
-    {
-        if (_stateManager.isInState(State.ACTIVE))
-        {
-            ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
-            storeConfiguredObjectEntry(configuredObject);
-        }
-    }
-
-    /**
-     * @see DurableConfigurationStore#removeExchange(Exchange)
-     */
-    public void removeExchange(Exchange exchange) throws AMQStoreException
-    {
-        UUID id = exchange.getId();
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug("public void removeExchange(String name = " + exchange.getName() + ", uuid = " + id + "): called");
-        }
-        OperationStatus status = removeConfiguredObject(id);
-        if (status == OperationStatus.NOTFOUND)
-        {
-            throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + id + " not found");
-        }
-    }
-
-
-    /**
-     * @see DurableConfigurationStore#bindQueue(Binding)
-     */
-    public void bindQueue(Binding binding) throws AMQStoreException
+    @Override
+    public void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
     {
         if (_stateManager.isInState(State.ACTIVE))
         {
-            ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+            ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(id, type, attributes);
             storeConfiguredObjectEntry(configuredObject);
         }
     }
 
-    /**
-     * @see DurableConfigurationStore#unbindQueue(Binding)
-     */
-    public void unbindQueue(Binding binding)
-            throws AMQStoreException
+    @Override
+    public void remove(UUID id, String type) throws AMQStoreException
     {
-        UUID id = binding.getId();
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("public void unbindQueue(Binding binding = " + binding + ", uuid = " + id + "): called");
+            LOGGER.debug("public void remove(id = " + id + ", type="+type+"): called");
         }
-
         OperationStatus status = removeConfiguredObject(id);
         if (status == OperationStatus.NOTFOUND)
         {
-            throw new AMQStoreException("Binding " + binding + " not found");
-        }
-    }
-
-    /**
-     * @see DurableConfigurationStore#createQueue(AMQQueue)
-     */
-    public void createQueue(AMQQueue queue) throws AMQStoreException
-    {
-        createQueue(queue, null);
-    }
-
-    /**
-     * @see DurableConfigurationStore#createQueue(AMQQueue, FieldTable)
-     */
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
-    {
-        if (_stateManager.isInState(State.ACTIVE))
-        {
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("public void createQueue(AMQQueue queue(" + queue.getName() + "), queue id" + queue.getId()
-                        + ", arguments=" + arguments + "): called");
-            }
-            ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
-            storeConfiguredObjectEntry(configuredObject);
+            throw new AMQStoreException("Configured object of type " + type + " with id " + id + " not found");
         }
     }
 
-    /**
-     * Updates the specified queue in the persistent store, IF it is already present. If the queue
-     * is not present in the store, it will not be added.
-     *
-     * @param queue The queue to update the entry for.
-     * @throws AMQStoreException If the operation fails for any reason.
-     */
-    public void updateQueue(final AMQQueue queue) throws AMQStoreException
+    @Override
+    public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
     {
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("Updating queue: " + queue.getName());
+            LOGGER.debug("Updating " +type + ", id: " + id);
         }
 
         try
         {
             DatabaseEntry key = new DatabaseEntry();
             UUIDTupleBinding keyBinding = UUIDTupleBinding.getInstance();
-            keyBinding.objectToEntry(queue.getId(), key);
+            keyBinding.objectToEntry(id, key);
 
             DatabaseEntry value = new DatabaseEntry();
             DatabaseEntry newValue = new DatabaseEntry();
@@ -856,8 +771,7 @@ public abstract class AbstractBDBMessage
             OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
             if (status == OperationStatus.SUCCESS)
             {
-                ConfiguredObjectRecord queueRecord = configuredObjectBinding.entryToObject(value);
-                ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueRecord);
+                ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
 
                 // write the updated entry to the store
                 configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
@@ -879,29 +793,6 @@ public abstract class AbstractBDBMessage
     }
 
     /**
-     * Removes the specified queue from the persistent store.
-     *
-     * @param queue The queue to remove.
-     *
-     * @throws AMQStoreException If the operation fails for any reason.
-     */
-    public void removeQueue(final AMQQueue queue) throws AMQStoreException
-    {
-        UUID id = queue.getId();
-        if (LOGGER.isDebugEnabled())
-        {
-            LOGGER.debug("public void removeQueue(AMQShortString name = " + queue.getName() + ", uuid = " + id + "): called");
-        }
-
-        OperationStatus status = removeConfiguredObject(id);
-        if (status == OperationStatus.NOTFOUND)
-        {
-            throw new AMQStoreException("Queue " + queue.getName() + " with id " + id + " not found");
-        }
-    }
-
-
-    /**
      * Places a message onto a specified queue, in a given transaction.
      *
      * @param tx   The transaction for the operation.

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBinding.java Tue Jul  2 15:26:42 2013
@@ -20,38 +20,75 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuple;
 
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 
 public class ConfiguredObjectBinding extends TupleBinding<ConfiguredObjectRecord>
 {
-    private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding();
+    private static final ConfiguredObjectBinding INSTANCE = new ConfiguredObjectBinding(null);
+    private final UUID _uuid;
+
 
     public static ConfiguredObjectBinding getInstance()
     {
         return INSTANCE;
     }
 
-    /** non-public constructor forces getInstance instead */
-    private ConfiguredObjectBinding()
+    public ConfiguredObjectBinding(UUID uuid)
     {
+        _uuid = uuid;
     }
 
     public ConfiguredObjectRecord entryToObject(TupleInput tupleInput)
     {
         String type = tupleInput.readString();
         String json = tupleInput.readString();
-        ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(null, type, json);
-        return configuredObject;
+        ObjectMapper mapper = new ObjectMapper();
+        try
+        {
+            Map<String,Object> value = mapper.readValue(json, Map.class);
+            ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(_uuid, type, value);
+            return configuredObject;
+        }
+        catch (IOException e)
+        {
+            //should never happen
+            throw new RuntimeException(e);
+        }
+
     }
 
     public void objectToEntry(ConfiguredObjectRecord object, TupleOutput tupleOutput)
     {
-        tupleOutput.writeString(object.getType());
-        tupleOutput.writeString(object.getAttributes());
+        try
+        {
+            StringWriter writer = new StringWriter();
+            new ObjectMapper().writeValue(writer, object.getAttributes());
+            tupleOutput.writeString(object.getType());
+            tupleOutput.writeString(writer.toString());
+        }
+        catch (JsonMappingException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (JsonGenerationException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/tuple/ConfiguredObjectBindingTest.java Tue Jul  2 15:26:42 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.store.berkeleydb.tuple;
 
+import java.util.Collections;
+import java.util.Map;
 import junit.framework.TestCase;
 
 import org.apache.qpid.server.model.UUIDGenerator;
@@ -33,7 +35,9 @@ public class ConfiguredObjectBindingTest
 
     private ConfiguredObjectRecord _object;
 
-    private static final String DUMMY_ATTRIBUTES_STRING = "dummyAttributes";
+    private static final Map<String, Object> DUMMY_ATTRIBUTES_MAP =
+            Collections.singletonMap("dummy",(Object) "attributes");
+
     private static final String DUMMY_TYPE_STRING = "dummyType";
     private ConfiguredObjectBinding _configuredObjectBinding;
 
@@ -42,7 +46,8 @@ public class ConfiguredObjectBindingTest
     {
         super.setUp();
         _configuredObjectBinding = ConfiguredObjectBinding.getInstance();
-        _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING, DUMMY_ATTRIBUTES_STRING);
+        _object = new ConfiguredObjectRecord(UUIDGenerator.generateRandomUUID(), DUMMY_TYPE_STRING,
+                DUMMY_ATTRIBUTES_MAP);
     }
 
     public void testObjectToEntryAndEntryToObject()
@@ -55,7 +60,7 @@ public class ConfiguredObjectBindingTest
         TupleInput tupleInput = new TupleInput(entryAsBytes);
 
         ConfiguredObjectRecord storedObject = _configuredObjectBinding.entryToObject(tupleInput);
-        assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_STRING, storedObject.getAttributes());
+        assertEquals("Unexpected attributes", DUMMY_ATTRIBUTES_MAP, storedObject.getAttributes());
         assertEquals("Unexpected type", DUMMY_TYPE_STRING, storedObject.getType());
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Jul  2 15:26:42 2013
@@ -40,6 +40,7 @@ import org.apache.qpid.server.plugin.Exc
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -554,7 +555,7 @@ public abstract class AbstractExchange i
 
             if (b.isDurable())
             {
-                _virtualHost.getDurableConfigurationStore().unbindQueue(b);
+                DurableConfigurationStoreHelper.removeBinding(_virtualHost.getDurableConfigurationStore(), b);
             }
             b.logDestruction();
         }
@@ -626,7 +627,7 @@ public abstract class AbstractExchange i
 
             if (b.isDurable() && !restore)
             {
-                _virtualHost.getDurableConfigurationStore().bindQueue(b);
+                DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
             }
 
             queue.addQueueDeleteTask(b);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Jul  2 15:26:42 2013
@@ -29,6 +29,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
@@ -127,7 +128,7 @@ public class DefaultExchangeRegistry imp
         {
             if (e.isDurable())
             {
-                getDurableConfigurationStore().removeExchange(e);
+                DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), e);
             }
             e.close();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java Tue Jul  2 15:26:42 2013
@@ -21,12 +21,10 @@
 package org.apache.qpid.server.exchange;
 
 
-import java.util.UUID;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 
 public class ExchangeInitialiser
@@ -49,7 +47,7 @@ public class ExchangeInitialiser
             r.registerExchange(exchange);
             if(exchange.isDurable())
             {
-                store.createExchange(exchange);
+                DurableConfigurationStoreHelper.createExchange(store, exchange);
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Jul  2 15:26:42 2013
@@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
@@ -106,7 +107,8 @@ public class ExchangeDeclareHandler impl
 
                         if (exchange.isDurable())
                         {
-                            virtualHost.getDurableConfigurationStore().createExchange(exchange);
+                            DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
+                                    exchange);
                         }
                     }
                     catch(AMQUnknownExchangeType e)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Jul  2 15:26:42 2013
@@ -30,7 +30,6 @@ import org.apache.qpid.framing.QueueDecl
 import org.apache.qpid.framing.QueueDeclareOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -40,10 +39,10 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 
@@ -114,7 +113,7 @@ public class QueueDeclareHandler impleme
                     queue.setAuthorizationHolder(protocolConnection);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
-                        store.createQueue(queue, body.getArguments());
+                        DurableConfigurationStoreHelper.createQueue(store, queue, body.getArguments());
                     }
                     if(body.getAutoDelete())
                     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Tue Jul  2 15:26:42 2013
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class QueueDeleteHandler implements StateAwareMethodListener<QueueDeleteBody>
@@ -115,7 +116,7 @@ public class QueueDeleteHandler implemen
 
                 if (queue.isDurable())
                 {
-                    store.removeQueue(queue);
+                    DurableConfigurationStoreHelper.removeQueue(store, queue);
                 }
 
                 MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Tue Jul  2 15:26:42 2013
@@ -46,6 +46,7 @@ import org.apache.qpid.server.model.Stat
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.*;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.util.MapValueConverter;
 
@@ -195,7 +196,7 @@ final class QueueAdapter extends Abstrac
                 _queue.delete();
                 if (_queue.isDurable())
                 {
-                    _queue.getVirtualHost().getDurableConfigurationStore().removeQueue(_queue);
+                    DurableConfigurationStoreHelper.removeQueue(_queue.getVirtualHost().getDurableConfigurationStore(), _queue);
                 }
             }
         }
@@ -365,7 +366,8 @@ final class QueueAdapter extends Abstrac
             {
                 try
                 {
-                    _queue.getVirtualHost().getDurableConfigurationStore().updateQueue(_queue);
+                    DurableConfigurationStoreHelper.updateQueue(_queue.getVirtualHost().getDurableConfigurationStore(),
+                            _queue);
                 }
                 catch (AMQStoreException e)
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Tue Jul  2 15:26:42 2013
@@ -75,6 +75,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -312,7 +313,8 @@ public final class VirtualHostAdapter ex
                 _virtualHost.getExchangeRegistry().registerExchange(exchange);
                 if(durable)
                 {
-                    _virtualHost.getDurableConfigurationStore().createExchange(exchange);
+                    DurableConfigurationStoreHelper.createExchange(_virtualHost.getDurableConfigurationStore(),
+                            exchange);
                 }
                 synchronized (_exchangeAdapters)
                 {
@@ -434,7 +436,9 @@ public final class VirtualHostAdapter ex
 
                 if(durable)
                 {
-                    _virtualHost.getDurableConfigurationStore().createQueue(queue, FieldTable.convertToFieldTable(attributes));
+                    DurableConfigurationStoreHelper.createQueue(_virtualHost.getDurableConfigurationStore(),
+                            queue,
+                            FieldTable.convertToFieldTable(attributes));
                 }
                 synchronized (_queueAdapters)
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Tue Jul  2 15:26:42 2013
@@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class AMQQueueFactory
@@ -315,7 +316,8 @@ public class AMQQueueFactory
                     exchangeRegistry.registerExchange(dlExchange);
 
                     //enter the dle in the persistent store
-                    virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+                    DurableConfigurationStoreHelper.createExchange(virtualHost.getDurableConfigurationStore(),
+                            dlExchange);
                 }
             }
 
@@ -335,7 +337,9 @@ public class AMQQueueFactory
                     dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
 
                     //enter the dlq in the persistent store
-                    virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+                    DurableConfigurationStoreHelper.createQueue(virtualHost.getDurableConfigurationStore(),
+                            dlQueue,
+                            FieldTable.convertToFieldTable(args));
                 }
             }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Tue Jul  2 15:26:42 2013
@@ -43,16 +43,13 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 
 abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
 {
@@ -164,9 +161,6 @@ abstract public class AbstractJDBCMessag
         _stateManager = new StateManager(_eventManager);
     }
 
-    private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
-
-
     @Override
     public void configureConfigStore(String name,
                                      ConfigurationRecoveryHandler configRecoveryHandler,
@@ -462,18 +456,10 @@ abstract public class AbstractJDBCMessag
     {
         try
         {
-            List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-
-            ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
-            _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
+            recoveryHandler.beginConfigurationRecovery(this);
+            loadConfiguredObjects(recoveryHandler);
 
-            QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
-            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
-
-            BindingRecoveryHandler brh = qrh.completeQueueRecovery();
-            _configuredObjectHelper.recoverBindings(brh, configuredObjects);
-
-            brh.completeBindingRecovery();
+            recoveryHandler.completeConfigurationRecovery();
         }
         catch (SQLException e)
         {
@@ -573,92 +559,42 @@ abstract public class AbstractJDBCMessag
 
     }
 
-    @Override
-    public void createExchange(Exchange exchange) throws AMQStoreException
-    {
-        if (_stateManager.isInState(State.ACTIVE))
-        {
-            ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
-            insertConfiguredObject(configuredObject);
-        }
-
-    }
-
-    @Override
-    public void removeExchange(Exchange exchange) throws AMQStoreException
-    {
-        int results = removeConfiguredObject(exchange.getId());
-        if (results == 0)
-        {
-            throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
-        }
-    }
 
     @Override
-    public void bindQueue(Binding binding)
-            throws AMQStoreException
+    public void create(UUID id, String type, Map<String,Object> attributes) throws AMQStoreException
     {
         if (_stateManager.isInState(State.ACTIVE))
         {
-            ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
-            insertConfiguredObject(configuredObject);
+            insertConfiguredObject(new ConfiguredObjectRecord(id, type, attributes));
         }
-    }
 
-    @Override
-    public void unbindQueue(Binding binding)
-            throws AMQStoreException
-    {
-        int results = removeConfiguredObject(binding.getId());
-        if (results == 0)
-        {
-            throw new AMQStoreException("Binding " + binding + " not found");
-        }
-    }
-
-    @Override
-    public void createQueue(AMQQueue queue) throws AMQStoreException
-    {
-        createQueue(queue, null);
     }
 
     @Override
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
+    public void remove(UUID id, String type) throws AMQStoreException
     {
-        getLogger().debug("public void createQueue(AMQQueue queue = " + queue + "): called");
-
-        if (_stateManager.isInState(State.ACTIVE))
+        int results = removeConfiguredObject(id);
+        if (results == 0)
         {
-            ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
-            insertConfiguredObject(queueConfiguredObject);
+            throw new AMQStoreException(type + " with id " + id + " not found");
         }
     }
 
-    /**
-     * Updates the specified queue in the persistent store, IF it is already present. If the queue
-     * is not present in the store, it will not be added.
-     *
-     * NOTE: Currently only updates the exclusivity.
-     *
-     * @param queue The queue to update the entry for.
-     * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
-     */
     @Override
-    public void updateQueue(final AMQQueue queue) throws AMQStoreException
+    public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
     {
         if (_stateManager.isInState(State.ACTIVE))
         {
-            ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
+            ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(id);
             if (queueConfiguredObject != null)
             {
-                ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
+                ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
                 updateConfiguredObject(newQueueRecord);
             }
         }
 
     }
 
-
     /**
      * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
      * isolation and with auto-commit transactions enabled.
@@ -714,18 +650,6 @@ abstract public class AbstractJDBCMessag
 
     protected abstract Connection getConnection() throws SQLException;
 
-    @Override
-    public void removeQueue(final AMQQueue queue) throws AMQStoreException
-    {
-        AMQShortString name = queue.getNameShortString();
-        getLogger().debug("public void removeQueue(AMQShortString name = " + name + "): called");
-        int results = removeConfiguredObject(queue.getId());
-        if (results == 0)
-        {
-            throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
-        }
-    }
-
     private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
     {
         byte[] argumentBytes;
@@ -1825,7 +1749,8 @@ abstract public class AbstractJDBCMessag
                                     }
                                     else
                                     {
-                                        byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+                                        final Map<String, Object> attributes = configuredObject.getAttributes();
+                                        byte[] attributesAsBytes = new ObjectMapper().writeValueAsBytes(attributes);
                                         ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
                                         insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
                                     }
@@ -1852,6 +1777,18 @@ abstract public class AbstractJDBCMessag
                     conn.close();
                 }
             }
+            catch (JsonMappingException e)
+            {
+                throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+            }
+            catch (JsonGenerationException e)
+            {
+                throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+            }
+            catch (IOException e)
+            {
+                throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+            }
             catch (SQLException e)
             {
                 throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
@@ -1914,7 +1851,8 @@ abstract public class AbstractJDBCMessag
                                     stmt2.setString(1, configuredObject.getType());
                                     if (configuredObject.getAttributes() != null)
                                     {
-                                        byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+                                        byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+                                                configuredObject.getAttributes());
                                         ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
                                         stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
                                     }
@@ -1946,6 +1884,18 @@ abstract public class AbstractJDBCMessag
                     conn.close();
                 }
             }
+            catch (JsonMappingException e)
+            {
+                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+            }
+            catch (JsonGenerationException e)
+            {
+                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+            }
+            catch (IOException e)
+            {
+                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+            }
             catch (SQLException e)
             {
                 throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
@@ -1972,7 +1922,8 @@ abstract public class AbstractJDBCMessag
                         {
                             String type = rs.getString(1);
                             String attributes = getBlobAsString(rs, 2);
-                            result = new ConfiguredObjectRecord(id, type, attributes);
+                            result = new ConfiguredObjectRecord(id, type,
+                                    (new ObjectMapper()).readValue(attributes,Map.class));
                         }
                     }
                     finally
@@ -1990,6 +1941,21 @@ abstract public class AbstractJDBCMessag
                 conn.close();
             }
         }
+        catch (JsonMappingException e)
+        {
+            throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+                            + e.getMessage(), e);
+        }
+        catch (JsonParseException e)
+        {
+            throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+                                + e.getMessage(), e);
+        }
+        catch (IOException e)
+        {
+            throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+                            + e.getMessage(), e);
+        }
         catch (SQLException e)
         {
             throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
@@ -1998,10 +1964,11 @@ abstract public class AbstractJDBCMessag
         return result;
     }
 
-    private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
+    private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException, AMQStoreException
     {
-        ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
         Connection conn = newAutoCommitConnection();
+
+        final ObjectMapper objectMapper = new ObjectMapper();
         try
         {
             PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
@@ -2015,9 +1982,22 @@ abstract public class AbstractJDBCMessag
                         String id = rs.getString(1);
                         String objectType = rs.getString(2);
                         String attributes = getBlobAsString(rs, 3);
-                        results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
+                        recoveryHandler.configuredObject(UUID.fromString(id), objectType,
+                                objectMapper.readValue(attributes,Map.class));
                     }
                 }
+                catch (JsonMappingException e)
+                {
+                    throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                catch (JsonParseException e)
+                {
+                    throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
+                catch (IOException e)
+                {
+                    throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
+                }
                 finally
                 {
                     rs.close();
@@ -2032,7 +2012,6 @@ abstract public class AbstractJDBCMessag
         {
             conn.close();
         }
-        return results;
     }
 
     protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Tue Jul  2 15:26:42 2013
@@ -28,25 +28,10 @@ import java.util.UUID;
 
 public interface ConfigurationRecoveryHandler
 {
-    ExchangeRecoveryHandler begin(MessageStore store);
+    void beginConfigurationRecovery(DurableConfigurationStore store);
 
-    public static interface ExchangeRecoveryHandler
-    {
-        void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
-        QueueRecoveryHandler completeExchangeRecovery();
-    }
+    void configuredObject(UUID id, String type, Map<String, Object> attributes);
 
-    public static interface QueueRecoveryHandler
-    {
-        void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
-        BindingRecoveryHandler completeQueueRecovery();
-    }
-
-
-    public static interface BindingRecoveryHandler
-    {
-        void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
-        void completeBindingRecovery();
-    }
+    void completeConfigurationRecovery();
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java Tue Jul  2 15:26:42 2013
@@ -20,20 +20,23 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.UUID;
 
 public class ConfiguredObjectRecord
 {
     private UUID _id;
     private String _type;
-    private String _attributes;
+    private Map<String,Object> _attributes;
 
-    public ConfiguredObjectRecord(UUID id, String type, String attributes)
+    public ConfiguredObjectRecord(UUID id, String type, Map<String,Object> attributes)
     {
         super();
         _id = id;
         _type = type;
-        _attributes = attributes;
+        _attributes = Collections.unmodifiableMap(new LinkedHashMap<String,Object>(attributes));
     }
 
     public UUID getId()
@@ -41,17 +44,12 @@ public class ConfiguredObjectRecord
         return _id;
     }
 
-    public void setId(UUID id)
-    {
-        _id = id;
-    }
-
     public String getType()
-    {
+   {
         return _type;
     }
 
-    public String getAttributes()
+    public Map<String,Object> getAttributes()
     {
         return _attributes;
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Tue Jul  2 15:26:42 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Map;
+import java.util.UUID;
 import org.apache.commons.configuration.Configuration;
 
 import org.apache.qpid.AMQStoreException;
@@ -51,77 +53,42 @@ public interface DurableConfigurationSto
     void configureConfigStore(String name,
                               ConfigurationRecoveryHandler recoveryHandler,
                               VirtualHost virtualHost) throws Exception;
-    /**
-     * Makes the specified exchange persistent.
-     *
-     * @param exchange The exchange to persist.
-     *
-     * @throws AMQStoreException If the operation fails for any reason.
-     */
-    void createExchange(Exchange exchange) throws AMQStoreException;
 
-    /**
-     * Removes the specified persistent exchange.
-     *
-     * @param exchange The exchange to remove.
-     *
-     * @throws AMQStoreException If the operation fails for any reason.
-     */
-    void removeExchange(Exchange exchange) throws AMQStoreException;
 
-    /**
-     * Store the queue binding.
-     *
-     * @param binding queue binding
-     *
-     * @throws AMQStoreException if the operation fails for any reason.
-     */
-    void bindQueue(Binding binding) throws AMQStoreException;
 
     /**
-     * Removes queue binding
+     * Makes the specified object persistent.
      *
-     * @param binding queue binding to remove
+     * @param id The id of the object to persist.
+     * @param type The type of the object to persist
+     * @param attributes the attributes of the object to persist
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    void unbindQueue(Binding binding) throws AMQStoreException;
+    void create(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
 
     /**
-     * Makes the specified queue persistent.
+     * Removes the specified persistent configured object.
      *
-     * @param queue The queue to store.
+     * @param id The id of the object to remove.
+     * @param type The type of the object to remove
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    void createQueue(AMQQueue queue) throws AMQStoreException;
+    void remove(UUID id, String type) throws AMQStoreException;
 
-    /**
-     * Makes the specified queue persistent.
-     *
-     * @param queue The queue to store.
-     * @param arguments The additional arguments to the binding
-     *
-     * @throws AMQStoreException If the operation fails for any reason.
-     */
-    void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException;
 
     /**
-     * Removes the specified queue from the persistent store.
+     * Updates the specified object in the persistent store, IF it is already present. If the object
+     * is not present in the store, it will not be added.
      *
-     * @param queue The queue to remove.
+     * @param id The id of the object to update.
+     * @param type The type of the object to update
+     * @param attributes the updated attributes
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    void removeQueue(AMQQueue queue) throws AMQStoreException;
+    void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
 
-    /**
-     * Updates the specified queue in the persistent store, IF it is already present. If the queue
-     * is not present in the store, it will not be added.
-     *
-     * @param queue The queue to update the entry for.
-     * @throws AMQStoreException If the operation fails for any reason.
-     */
-    void updateQueue(AMQQueue queue) throws AMQStoreException;
 
 }

Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (from r1498342, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java&r1=1498342&r2=1498976&rev=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Tue Jul  2 15:26:42 2013
@@ -20,12 +20,11 @@
  */
 package org.apache.qpid.server.store;
 
-import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.UUID;
 
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.model.Binding;
@@ -33,41 +32,15 @@ import org.apache.qpid.server.model.Exch
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
-import org.apache.qpid.server.util.MapJsonSerializer;
 
-public class ConfiguredObjectHelper
+public class DurableConfigurationStoreHelper
 {
 
-
-    private MapJsonSerializer _serializer = new MapJsonSerializer();
-
-    public void loadQueue(ConfiguredObjectRecord configuredObject, QueueRecoveryHandler qrh)
-    {
-        if (Queue.class.getName().equals(configuredObject.getType()))
-        {
-            Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
-            String queueName = (String) attributeMap.get(Queue.NAME);
-            String owner = (String) attributeMap.get(Queue.OWNER);
-            boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
-            UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
-            @SuppressWarnings("unchecked")
-            Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
-            FieldTable arguments = null;
-            if (queueArgumentsMap != null)
-            {
-                arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
-            }
-            qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments, alternateExchangeId);
-        }
-    }
-
-    public ConfiguredObjectRecord updateQueueConfiguredObject(final AMQQueue queue, ConfiguredObjectRecord queueRecord)
+    public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
     {
-        Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes());
+        Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
         attributesMap.put(Queue.NAME, queue.getName());
+        attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
         attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
         if (queue.getAlternateExchange() != null)
         {
@@ -87,12 +60,11 @@ public class ConfiguredObjectHelper
         {
             attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
         }
-        String newJson = _serializer.serialize(attributesMap);
-        ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson);
-        return newQueueRecord;
+        store.update(queue.getId(), Queue.class.getName(), attributesMap);
     }
 
-    public ConfiguredObjectRecord createQueueConfiguredObject(AMQQueue queue, FieldTable arguments)
+    public static void createQueue(DurableConfigurationStore store, AMQQueue queue, FieldTable arguments)
+            throws AMQStoreException
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Queue.NAME, queue.getName());
@@ -108,60 +80,35 @@ public class ConfiguredObjectHelper
         {
             attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
         }
-        String json = _serializer.serialize(attributesMap);
-        ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json);
-        return configuredObject;
+        store.create(queue.getId(),Queue.class.getName(),attributesMap);
     }
 
-    public void loadExchange(ConfiguredObjectRecord configuredObject, ExchangeRecoveryHandler erh)
+    public static void removeQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
     {
-        if (Exchange.class.getName().equals(configuredObject.getType()))
-        {
-            Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
-            String exchangeName = (String) attributeMap.get(Exchange.NAME);
-            String exchangeType = (String) attributeMap.get(Exchange.TYPE);
-            String lifeTimePolicy = (String) attributeMap.get(Exchange.LIFETIME_POLICY);
-            boolean autoDelete = lifeTimePolicy == null
-                    || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
-            erh.exchange(configuredObject.getId(), exchangeName, exchangeType, autoDelete);
-        }
+        store.remove(queue.getId(), Queue.class.getName());
     }
 
-    public ConfiguredObjectRecord createExchangeConfiguredObject(org.apache.qpid.server.exchange.Exchange exchange)
+    public static void createExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
+            throws AMQStoreException
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Exchange.NAME, exchange.getName());
         attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
         attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
                 : LifetimePolicy.PERMANENT.name());
-        String json = _serializer.serialize(attributesMap);
-        ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(exchange.getId(), Exchange.class.getName(), json);
-        return configuredObject;
-    }
+        store.create(exchange.getId(), Exchange.class.getName(), attributesMap);
 
-    public void loadQueueBinding(ConfiguredObjectRecord configuredObject, BindingRecoveryHandler brh)
-    {
-        if (Binding.class.getName().equals(configuredObject.getType()))
-        {
-            Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
-            UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
-            UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE));
-            String bindingName = (String) attributeMap.get(Binding.NAME);
+    }
 
-            @SuppressWarnings("unchecked")
-            Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS);
-            FieldTable arguments = null;
-            if (bindingArgumentsMap != null)
-            {
-                arguments = FieldTable.convertToFieldTable(bindingArgumentsMap);
-            }
-            ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes()));
 
-            brh.binding(configuredObject.getId(), exchangeId, queueId, bindingName, argumentsBB);
-        }
+    public static void removeExchange(DurableConfigurationStore store, org.apache.qpid.server.exchange.Exchange exchange)
+            throws AMQStoreException
+    {
+        store.remove(exchange.getId(),Exchange.class.getName());
     }
 
-    public ConfiguredObjectRecord createBindingConfiguredObject(org.apache.qpid.server.binding.Binding binding)
+    public static void createBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
+                throws AMQStoreException
     {
         Map<String, Object> attributesMap = new HashMap<String, Object>();
         attributesMap.put(Binding.NAME, binding.getBindingKey());
@@ -172,32 +119,14 @@ public class ConfiguredObjectHelper
         {
             attributesMap.put(Binding.ARGUMENTS, arguments);
         }
-        String json = _serializer.serialize(attributesMap);
-        ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(binding.getId(), Binding.class.getName(), json);
-        return configuredObject;
+        store.create(binding.getId(), Binding.class.getName(), attributesMap);
     }
 
-    public void recoverQueues(QueueRecoveryHandler qrh, List<ConfiguredObjectRecord> configuredObjects)
-    {
-        for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
-        {
-            loadQueue(configuredObjectRecord, qrh);
-        }
-    }
 
-    public void recoverExchanges(ExchangeRecoveryHandler erh, List<ConfiguredObjectRecord> configuredObjects)
+    public static void removeBinding(DurableConfigurationStore store, org.apache.qpid.server.binding.Binding binding)
+                throws AMQStoreException
     {
-        for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
-        {
-            loadExchange(configuredObjectRecord, erh);
-        }
+        store.remove(binding.getId(), Binding.class.getName());
     }
 
-    public void recoverBindings(BindingRecoveryHandler brh, List<ConfiguredObjectRecord> configuredObjects)
-    {
-        for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
-        {
-            loadQueueBinding(configuredObjectRecord, brh);
-        }
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Tue Jul  2 15:26:42 2013
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.store;
 
+import java.util.Map;
+import java.util.UUID;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.message.EnqueableMessage;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Tue Jul  2 15:26:42 2013
@@ -19,12 +19,9 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.Exchange;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.queue.AMQQueue;
 
 public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
 {
@@ -36,42 +33,17 @@ public abstract class NullMessageStore i
     }
 
     @Override
-    public void createExchange(Exchange exchange) throws AMQStoreException
+    public void update(UUID id, String type, Map<String, Object> attributes)
     {
     }
 
     @Override
-    public void removeExchange(Exchange exchange) throws AMQStoreException
+    public void remove(UUID id, String type)
     {
     }
 
     @Override
-    public void bindQueue(Binding binding) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void unbindQueue(Binding binding) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void createQueue(AMQQueue queue) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void removeQueue(AMQQueue queue) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void updateQueue(AMQQueue queue) throws AMQStoreException
+    public void create(UUID id, String type, Map<String, Object> attributes)
     {
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Tue Jul  2 15:26:42 2013
@@ -46,6 +46,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
@@ -763,7 +764,7 @@ public class ServerSessionDelegate exten
                                 if (exchange.isDurable())
                                 {
                                     DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-                                    store.createExchange(exchange);
+                                    DurableConfigurationStoreHelper.createExchange(store, exchange);
                                 }
                                 exchangeRegistry.registerExchange(exchange);
                             }
@@ -918,7 +919,7 @@ public class ServerSessionDelegate exten
                 if (exchange.isDurable() && !exchange.isAutoDelete())
                 {
                     DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-                    store.removeExchange(exchange);
+                    DurableConfigurationStoreHelper.removeExchange(store, exchange);
                 }
             }
         }
@@ -1300,11 +1301,11 @@ public class ServerSessionDelegate exten
                                 {
                                     ftArgs.put(new AMQShortString(entry.getKey()), entry.getValue());
                                 }
-                                store.createQueue(queue, ftArgs);
+                                DurableConfigurationStoreHelper.createQueue(store, queue, ftArgs);
                             }
                             else
                             {
-                                store.createQueue(queue);
+                                DurableConfigurationStoreHelper.createQueue(store, queue, null);
                             }
                         }
                         queueRegistry.registerQueue(queue);
@@ -1469,7 +1470,7 @@ public class ServerSessionDelegate exten
                         if (queue.isDurable() && !queue.isAutoDelete())
                         {
                             DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-                            store.removeQueue(queue);
+                            DurableConfigurationStoreHelper.removeQueue(store,queue);
                         }
                     }
                     catch (AMQException e)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Jul  2 15:26:42 2013
@@ -56,6 +56,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.txn.DtxRegistry;
@@ -294,7 +295,7 @@ public abstract class AbstractVirtualHos
 
             if (newExchange.isDurable())
             {
-                getDurableConfigurationStore().createExchange(newExchange);
+                DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), newExchange);
             }
         }
     }
@@ -306,7 +307,7 @@ public abstract class AbstractVirtualHos
 
         if (queue.isDurable())
         {
-            getDurableConfigurationStore().createQueue(queue);
+            DurableConfigurationStoreHelper.createQueue(getDurableConfigurationStore(), queue, null);
         }
 
         //get the exchange name (returns default exchange name if none was specified)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1498976&r1=1498975&r2=1498976&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Tue Jul  2 15:26:42 2013
@@ -35,6 +35,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.message.AMQMessage;
@@ -42,11 +43,15 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.Message_1_0;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
 import org.apache.qpid.server.store.StoredMessage;
@@ -61,9 +66,6 @@ import org.apache.qpid.transport.util.Fu
 import org.apache.qpid.util.ByteBufferInputStream;
 
 public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
-                                                        ConfigurationRecoveryHandler.QueueRecoveryHandler,
-                                                        ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
-                                                        ConfigurationRecoveryHandler.BindingRecoveryHandler,
                                                         MessageStoreRecoveryHandler,
                                                         MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
                                                         TransactionLogRecoveryHandler,
@@ -78,6 +80,8 @@ public class VirtualHostConfigRecoveryHa
     private final Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
     private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
 
+    private final Map<String, Map<UUID, Map<String, Object>>> _configuredObjects = new HashMap<String, Map<UUID, Map<String, Object>>>();
+
     private MessageStoreLogSubject _logSubject;
     private MessageStore _store;
 
@@ -86,12 +90,19 @@ public class VirtualHostConfigRecoveryHa
         _virtualHost = virtualHost;
     }
 
+    @Override
+    public void beginConfigurationRecovery(DurableConfigurationStore store)
+    {
+        _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
+
+        CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
+    }
+
     public VirtualHostConfigRecoveryHandler begin(MessageStore store)
     {
         _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
         _store = store;
         CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
-
         return this;
     }
 
@@ -100,7 +111,7 @@ public class VirtualHostConfigRecoveryHa
         try
         {
             AMQQueue q = _virtualHost.getQueueRegistry().getQueue(queueName);
-    
+
             if (q == null)
             {
                 q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
@@ -118,9 +129,9 @@ public class VirtualHostConfigRecoveryHa
                     q.setAlternateExchange(altExchange);
                 }
             }
-    
+
             CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
-    
+
             //Record that we have a queue for recovery
             _queueRecoveries.put(queueName, 0);
         }
@@ -130,12 +141,6 @@ public class VirtualHostConfigRecoveryHa
         }
     }
 
-    @Override
-    public BindingRecoveryHandler completeQueueRecovery()
-    {
-        return this;
-    }
-
     public void exchange(UUID id, String exchangeName, String type, boolean autoDelete)
     {
         try
@@ -155,11 +160,6 @@ public class VirtualHostConfigRecoveryHa
         }
     }
 
-    public QueueRecoveryHandler completeExchangeRecovery()
-    {
-        return this;
-    }
-
     public StoredMessageRecoveryHandler begin()
     {
         return this;
@@ -250,7 +250,7 @@ public class VirtualHostConfigRecoveryHa
                     CurrentActor.get().message(_logSubject,
                                                TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
                                                                                             Long.toString(messageId)));
-                    
+
                 }
 
             }
@@ -275,9 +275,9 @@ public class VirtualHostConfigRecoveryHa
                 if(message != null)
                 {
                     final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-                    
+
                     entry.acquire();
-                    
+
                     branch.dequeue(queue, message);
 
                     branch.addPostTransactionAcion(new ServerTransaction.Action()
@@ -348,8 +348,7 @@ public class VirtualHostConfigRecoveryHa
         CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
     }
 
-    @Override
-    public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
+    private void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
     {
         try
         {
@@ -399,14 +398,8 @@ public class VirtualHostConfigRecoveryHa
 
     }
 
-    public void completeBindingRecovery()
-    {
-    }
-
     public void complete()
     {
-
-
     }
 
     public void queueEntry(final UUID queueId, long messageId)
@@ -486,6 +479,107 @@ public class VirtualHostConfigRecoveryHa
         return this;
     }
 
+    @Override
+    public void configuredObject(UUID id, String type, Map<String, Object> attributes)
+    {
+        Map<UUID, Map<String, Object>> typeMap = _configuredObjects.get(type);
+        if(typeMap == null)
+        {
+            typeMap = new HashMap<UUID, Map<String, Object>>();
+            _configuredObjects.put(type,typeMap);
+        }
+        typeMap.put(id, attributes);
+    }
+
+    @Override
+    public void completeConfigurationRecovery()
+    {
+        Map<UUID, Map<String, Object>> exchangeObjects =
+                _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
+
+        if(exchangeObjects != null)
+        {
+            recoverExchanges(exchangeObjects);
+        }
+
+        Map<UUID, Map<String, Object>> queueObjects =
+                _configuredObjects.remove(org.apache.qpid.server.model.Queue.class.getName());
+
+        if(queueObjects != null)
+        {
+            recoverQueues(queueObjects);
+        }
+
+
+        Map<UUID, Map<String, Object>> bindingObjects =
+                    _configuredObjects.remove(Binding.class.getName());
+
+        if(bindingObjects != null)
+        {
+            recoverBindings(bindingObjects);
+        }
+
+
+        CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+    }
+
+    private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects)
+    {
+        for(Map.Entry<UUID, Map<String,Object>> entry : exchangeObjects.entrySet())
+        {
+            Map<String,Object> attributeMap = entry.getValue();
+            String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
+            String exchangeType = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.TYPE);
+            String lifeTimePolicy = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY);
+            boolean autoDelete = lifeTimePolicy == null
+                    || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+            exchange(entry.getKey(), exchangeName, exchangeType, autoDelete);
+        }
+    }
+
+    private void recoverQueues(Map<UUID, Map<String, Object>> queueObjects)
+    {
+        for(Map.Entry<UUID, Map<String,Object>> entry : queueObjects.entrySet())
+        {
+            Map<String,Object> attributeMap = entry.getValue();
+
+            String queueName = (String) attributeMap.get(Queue.NAME);
+            String owner = (String) attributeMap.get(Queue.OWNER);
+            boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+            UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
+            @SuppressWarnings("unchecked")
+            Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
+            FieldTable arguments = null;
+            if (queueArgumentsMap != null)
+            {
+                arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
+            }
+            queue(entry.getKey(), queueName, owner, exclusive, arguments, alternateExchangeId);
+        }
+    }
+
+    private void recoverBindings(Map<UUID, Map<String, Object>> bindingObjects)
+    {
+        for(Map.Entry<UUID, Map<String,Object>> entry : bindingObjects.entrySet())
+        {
+            Map<String,Object> attributeMap = entry.getValue();
+            UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
+            UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE));
+            String bindingName = (String) attributeMap.get(Binding.NAME);
+
+            @SuppressWarnings("unchecked")
+            Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS);
+            FieldTable arguments = null;
+            if (bindingArgumentsMap != null)
+            {
+                arguments = FieldTable.convertToFieldTable(bindingArgumentsMap);
+            }
+            ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes()));
+
+            binding(entry.getKey(), exchangeId, queueId, bindingName, argumentsBB);
+        }
+    }
+
     private static class DummyMessage implements EnqueableMessage
     {
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message