qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1603626 [1/3] - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/store/ broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/ broker-plugins/derby-store/src/main/java/org/apache/qpid/server/...
Date Wed, 18 Jun 2014 20:51:44 GMT
Author: kwall
Date: Wed Jun 18 20:51:43 2014
New Revision: 1603626

URL: http://svn.apache.org/r1603626
Log:
QPID-5800: [Java Broker] Refactor Derby/JDBC message store implementations to separate message
and config store implementations.

* Message store implementations can now be used in isolation, which is useful when the user
is using a JSON VirtualHostNode with
  another persistent store implementation.

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JdbcUtils.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/AbstractDerbyMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyConfigurationStore.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyUtils.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java
Removed:
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhost/derby/DerbyVirtualHost.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/virtualhostnode/derby/DerbyVirtualHostNode.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreConfigurationTest.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java
    qpid/trunk/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhost/jdbc/JDBCVirtualHost.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/virtualhostnode/jdbc/JDBCVirtualHostNodeImpl.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java?rev=1603626&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
(added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCConfigurationStore.java
Wed Jun 18 20:51:43 2014
@@ -0,0 +1,1013 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.Version;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.Module;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializerProvider;
+import org.codehaus.jackson.map.module.SimpleModule;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
+public abstract class AbstractJDBCConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
+{
+    private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
+
+    private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+    private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
+
+    private static final int DEFAULT_CONFIG_VERSION = 0;
+
+    public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME,
CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME));
+
+    private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+    private static final String DROP_CONFIG_VERSION_TABLE = "DROP TABLE "+ CONFIGURATION_VERSION_TABLE_NAME;
+
+    private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+            + " ( id, object_type, attributes) VALUES (?,?,?)";
+    private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+            + " set object_type =?, attributes = ? where id = ?";
+    private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+            + " where id = ?";
+    private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes
FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+            + " where id = ?";
+    private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type,
attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+
+    private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO "
+ CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+                                                                          + " ( child_id,
parent_type, parent_id) VALUES (?,?,?)";
+
+    private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM "
+ CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+                                                                          + " where child_id
= ?";
+    private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id,
parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
+
+    private static final Module _module;
+    static
+    {
+        SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
+
+        final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
+        {
+            @Override
+            public void serialize(final ConfiguredObject value,
+                                  final JsonGenerator jgen,
+                                  final SerializerProvider provider)
+                    throws IOException, JsonProcessingException
+            {
+                jgen.writeString(value.getId().toString());
+            }
+        };
+        module.addSerializer(ConfiguredObject.class, serializer);
+
+        _module = module;
+    }
+
+    @Override
+    public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
+    {
+        checkConfigurationStoreOpen();
+
+        try
+        {
+            handler.begin();
+            doVisitAllConfiguredObjectRecords(handler);
+            handler.end();
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Cannot visit configured object records", e);
+        }
+    }
+
+    private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID,
ConfiguredObjectRecordImpl>();
+        final ObjectMapper objectMapper = new ObjectMapper();
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        String id = rs.getString(1);
+                        String objectType = rs.getString(2);
+                        String attributes = getBlobAsString(rs, 3);
+                        final ConfiguredObjectRecordImpl configuredObjectRecord =
+                                new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+                                                               objectMapper.readValue(attributes,
Map.class));
+                        configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+                    }
+                }
+                catch (JsonMappingException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+                }
+                catch (JsonParseException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+                }
+                catch (IOException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+            stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        UUID childId = UUID.fromString(rs.getString(1));
+                        String parentType = rs.getString(2);
+                        UUID parentId = UUID.fromString(rs.getString(3));
+
+                        ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+                        ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+                        if(child != null && parent != null)
+                        {
+                            child.addParent(parentType, parent);
+                        }
+                    }
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        finally
+        {
+            conn.close();
+        }
+
+        for(ConfiguredObjectRecord record : configuredObjects.values())
+        {
+            boolean shouldContinue = handler.handle(record);
+            if (!shouldContinue)
+            {
+                break;
+            }
+        }
+    }
+
+    protected abstract void checkConfigurationStoreOpen();
+
+    protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws StoreException
+    {
+        Connection connection = null;
+        try
+        {
+            connection = newConnection();
+
+            boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
+            if(tableExists)
+            {
+                int configVersion = getConfigVersion(connection);
+                if (getLogger().isDebugEnabled())
+                {
+                    getLogger().debug("Upgrader read existing config version " + configVersion);
+                }
+
+                switch(configVersion)
+                {
+
+                    case 7:
+                        upgradeFromV7(parent);
+                        break;
+                    default:
+                        throw new UnsupportedOperationException("Cannot upgrade from configuration
version : "
+                                                                + configVersion);
+                }
+            }
+        }
+        catch (SQLException se)
+        {
+            throw new StoreException("Failed to upgrade database", se);
+        }
+        finally
+        {
+            JdbcUtils.closeConnection(connection, getLogger());
+        }
+
+    }
+
+    private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
+    {
+        @SuppressWarnings("serial")
+        Map<String, String> defaultExchanges = new HashMap<String, String>()
+        {{
+            put("amq.direct", "direct");
+            put("amq.topic", "topic");
+            put("amq.fanout", "fanout");
+            put("amq.match", "headers");
+        }};
+
+        Connection connection = newConnection();
+        try
+        {
+            String virtualHostName = parent.getName();
+            UUID virtualHostId = UUIDGenerator.generateVhostUUID(virtualHostName);
+
+            String stringifiedConfigVersion = "0." + DEFAULT_CONFIG_VERSION;
+
+            boolean tableExists = tableExists(CONFIGURATION_VERSION_TABLE_NAME, connection);
+            if(tableExists)
+            {
+                int configVersion = getConfigVersion(connection);
+                if (getLogger().isDebugEnabled())
+                {
+                    getLogger().debug("Upgrader read existing config version " + configVersion);
+                }
+
+                stringifiedConfigVersion = "0." + configVersion;
+            }
+
+            Map<String, Object> virtualHostAttributes = new HashMap<String, Object>();
+            virtualHostAttributes.put("modelVersion", stringifiedConfigVersion);
+            virtualHostAttributes.put("name", virtualHostName);
+
+            ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(virtualHostId,
"VirtualHost", virtualHostAttributes);
+            insertConfiguredObject(virtualHostRecord, connection);
+
+            if (getLogger().isDebugEnabled())
+            {
+                getLogger().debug("Upgrader created VirtualHost configuration entry with
config version " + stringifiedConfigVersion);
+            }
+
+            Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID,
Map<String, Object>>();
+            List<UUID> others = new ArrayList<UUID>();
+            final ObjectMapper objectMapper = new ObjectMapper();
+            objectMapper.registerModule(_module);
+
+            PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+            try
+            {
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    while (rs.next())
+                    {
+                        UUID id = UUID.fromString(rs.getString(1));
+                        String objectType = rs.getString(2);
+                        if ("VirtualHost".equals(objectType))
+                        {
+                            continue;
+                        }
+                        Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs,
3),Map.class);
+
+                        if(objectType.endsWith("Binding"))
+                        {
+                            bindingsToUpdate.put(id,attributes);
+                        }
+                        else
+                        {
+                            if (objectType.equals("Exchange"))
+                            {
+                                defaultExchanges.remove((String)attributes.get("name"));
+                            }
+                            others.add(id);
+                        }
+                    }
+                }
+                catch (JsonMappingException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+                }
+                catch (JsonParseException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+                }
+                catch (IOException e)
+                {
+                    throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+                }
+                finally
+                {
+                    rs.close();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+            try
+            {
+                for (UUID id : others)
+                {
+                    stmt.setString(1, id.toString());
+                    stmt.setString(2, "VirtualHost");
+                    stmt.setString(3, virtualHostId.toString());
+                    stmt.execute();
+                }
+                for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+                {
+                    stmt.setString(1, bindingEntry.getKey().toString());
+                    stmt.setString(2,"Queue");
+                    stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
+                    stmt.execute();
+
+                    stmt.setString(1, bindingEntry.getKey().toString());
+                    stmt.setString(2,"Exchange");
+                    stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
+                    stmt.execute();
+                }
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            for (Map.Entry<String, String> defaultExchangeEntry : defaultExchanges.entrySet())
+            {
+                UUID id = UUIDGenerator.generateExchangeUUID(defaultExchangeEntry.getKey(),
virtualHostName);
+                Map<String, Object> exchangeAttributes = new HashMap<String, Object>();
+                exchangeAttributes.put("name", defaultExchangeEntry.getKey());
+                exchangeAttributes.put("type", defaultExchangeEntry.getValue());
+                exchangeAttributes.put("lifetimePolicy", "PERMANENT");
+                Map<String, ConfiguredObjectRecord> parents = Collections.singletonMap("VirtualHost",
virtualHostRecord);
+                ConfiguredObjectRecord exchangeRecord = new org.apache.qpid.server.store.ConfiguredObjectRecordImpl(id,
"Exchange", exchangeAttributes, parents);
+                insertConfiguredObject(exchangeRecord, connection);
+            }
+
+            stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+            try
+            {
+                for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
+                {
+                    stmt.setString(1, "Binding");
+                    byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
+
+                    ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                    stmt.setBinaryStream(2, bis, attributesAsBytes.length);
+                    stmt.setString(3, bindingEntry.getKey().toString());
+                    stmt.execute();
+                }
+            }
+            catch (JsonMappingException e)
+            {
+                throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+            }
+            catch (JsonGenerationException e)
+            {
+                throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+            }
+            catch (IOException e)
+            {
+                throw new StoreException("Error recovering persistent state: " + e.getMessage(),
e);
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            if (tableExists)
+            {
+                dropConfigVersionTable(connection);
+            }
+
+            connection.commit();
+        }
+        catch(SQLException e)
+        {
+            try
+            {
+                connection.rollback();
+            }
+            catch(SQLException re)
+            {
+            }
+            throw e;
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    protected abstract Logger getLogger();
+
+    protected abstract String getSqlBlobType();
+
+    protected abstract String getSqlVarBinaryType(int size);
+
+    protected abstract String getSqlBigIntType();
+
+
+    protected void createOrOpenConfigurationStoreDatabase() throws StoreException
+    {
+        Connection conn = null;
+        try
+        {
+            conn = newAutoCommitConnection();
+
+            createConfiguredObjectsTable(conn);
+            createConfiguredObjectHierarchyTable(conn);
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Unable to open configuration tables", e);
+        }
+        finally
+        {
+            JdbcUtils.closeConnection(conn, getLogger());
+        }
+    }
+
+    private void dropConfigVersionTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(DROP_CONFIG_VERSION_TABLE);
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+    }
+
+    private void createConfiguredObjectsTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+                        + " ( id VARCHAR(36) not null, object_type varchar(255), attributes
"+getSqlBlobType()+",  PRIMARY KEY (id))");
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+    }
+
+    private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+                             + " ( child_id VARCHAR(36) not null, parent_type varchar(255),
parent_id VARCHAR(36),  PRIMARY KEY (child_id, parent_type))");
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+    }
+
+    protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
+    {
+        return JdbcUtils.tableExists(tableName, conn);
+    }
+
+    private int getConfigVersion(Connection conn) throws SQLException
+    {
+        Statement stmt = conn.createStatement();
+        try
+        {
+            ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+            try
+            {
+
+                if(rs.next())
+                {
+                    return rs.getInt(1);
+                }
+                return DEFAULT_CONFIG_VERSION;
+            }
+            finally
+            {
+                rs.close();
+            }
+
+        }
+        finally
+        {
+            stmt.close();
+        }
+
+    }
+
+    @Override
+    public void create(ConfiguredObjectRecord object) throws StoreException
+    {
+        checkConfigurationStoreOpen();
+        try
+        {
+            Connection conn = newConnection();
+            try
+            {
+                insertConfiguredObject(object, conn);
+                conn.commit();
+            }
+            finally
+            {
+                conn.close();
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error creating ConfiguredObject " + object);
+        }
+    }
+
+    /**
+     * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+     * isolation and with auto-commit transactions enabled.
+     */
+    protected Connection newAutoCommitConnection() throws SQLException
+    {
+        final Connection connection = newConnection();
+        try
+        {
+            connection.setAutoCommit(true);
+        }
+        catch (SQLException sqlEx)
+        {
+
+            try
+            {
+                connection.close();
+            }
+            finally
+            {
+                throw sqlEx;
+            }
+        }
+
+        return connection;
+    }
+
+    /**
+     * Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
+     * isolation and with auto-commit transactions disabled.
+     */
+    protected Connection newConnection() throws SQLException
+    {
+        final Connection connection = getConnection();
+        try
+        {
+            connection.setAutoCommit(false);
+            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
+        }
+        catch (SQLException sqlEx)
+        {
+            try
+            {
+                connection.close();
+            }
+            finally
+            {
+                throw sqlEx;
+            }
+        }
+        return connection;
+    }
+
+    protected abstract Connection getConnection() throws SQLException;
+
+    private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection
conn) throws StoreException
+    {
+        try
+        {
+            PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+            try
+            {
+                stmt.setString(1, configuredObject.getId().toString());
+                ResultSet rs = stmt.executeQuery();
+                boolean exists;
+                try
+                {
+                    exists = rs.next();
+
+                }
+                finally
+                {
+                    rs.close();
+                }
+                // If we don't have any data in the result set then we can add this configured
object
+                if (!exists)
+                {
+                    PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+                    try
+                    {
+                        insertStmt.setString(1, configuredObject.getId().toString());
+                        insertStmt.setString(2, configuredObject.getType());
+                        if(configuredObject.getAttributes() == null)
+                        {
+                            insertStmt.setNull(3, Types.BLOB);
+                        }
+                        else
+                        {
+                            final Map<String, Object> attributes = configuredObject.getAttributes();
+                            final ObjectMapper objectMapper = new ObjectMapper();
+                            objectMapper.registerModule(_module);
+                            byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+
+                            ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                            insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+                        }
+                        insertStmt.execute();
+                    }
+                    finally
+                    {
+                        insertStmt.close();
+                    }
+
+                    writeHierarchy(configuredObject, conn);
+                }
+
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+        }
+        catch (JsonMappingException e)
+        {
+            throw new StoreException("Error inserting of configured object " + configuredObject
+ " into database: " + e.getMessage(), e);
+        }
+        catch (JsonGenerationException e)
+        {
+            throw new StoreException("Error inserting of configured object " + configuredObject
+ " into database: " + e.getMessage(), e);
+        }
+        catch (IOException e)
+        {
+            throw new StoreException("Error inserting of configured object " + configuredObject
+ " into database: " + e.getMessage(), e);
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error inserting of configured object " + configuredObject
+ " into database: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
+    {
+        checkConfigurationStoreOpen();
+
+        Collection<UUID> removed = new ArrayList<UUID>(objects.length);
+        try
+        {
+
+            Connection conn = newAutoCommitConnection();
+            try
+            {
+                for(ConfiguredObjectRecord record : objects)
+                {
+                    if(removeConfiguredObject(record.getId(), conn) != 0)
+                    {
+                        removed.add(record.getId());
+                    }
+                }
+            }
+            finally
+            {
+                conn.close();
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects)
+ " from database: " + e.getMessage(), e);
+        }
+        return removed.toArray(new UUID[removed.size()]);
+    }
+
+    private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
+    {
+        final int results;
+        PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+        try
+        {
+            stmt.setString(1, id.toString());
+            results = stmt.executeUpdate();
+        }
+        finally
+        {
+            stmt.close();
+        }
+        stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
+        try
+        {
+            stmt.setString(1, id.toString());
+            stmt.executeUpdate();
+        }
+        finally
+        {
+            stmt.close();
+        }
+
+        return results;
+    }
+
+    @Override
+    public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws
StoreException
+    {
+        checkConfigurationStoreOpen();
+        try
+        {
+            Connection conn = newConnection();
+            try
+            {
+                for(ConfiguredObjectRecord record : records)
+                {
+                    updateConfiguredObject(record, createIfNecessary, conn);
+                }
+                conn.commit();
+            }
+            finally
+            {
+                conn.close();
+            }
+        }
+        catch (SQLException e)
+        {
+            throw new StoreException("Error updating configured objects in database: " +
e.getMessage(), e);
+        }
+    }
+
+    private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
+                                        boolean createIfNecessary,
+                                        Connection conn)
+            throws SQLException, StoreException
+    {
+        PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+        try
+        {
+            stmt.setString(1, configuredObject.getId().toString());
+            ResultSet rs = stmt.executeQuery();
+            try
+            {
+                final ObjectMapper objectMapper = new ObjectMapper();
+                objectMapper.registerModule(_module);
+                if (rs.next())
+                {
+                    PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+                    try
+                    {
+                        stmt2.setString(1, configuredObject.getType());
+                        if (configuredObject.getAttributes() != null)
+                        {
+                            byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
+                                    configuredObject.getAttributes());
+                            ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                            stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+                        }
+                        else
+                        {
+                            stmt2.setNull(2, Types.BLOB);
+                        }
+                        stmt2.setString(3, configuredObject.getId().toString());
+                        stmt2.execute();
+                    }
+                    finally
+                    {
+                        stmt2.close();
+                    }
+                }
+                else if(createIfNecessary)
+                {
+                    PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+                    try
+                    {
+                        insertStmt.setString(1, configuredObject.getId().toString());
+                        insertStmt.setString(2, configuredObject.getType());
+                        if(configuredObject.getAttributes() == null)
+                        {
+                            insertStmt.setNull(3, Types.BLOB);
+                        }
+                        else
+                        {
+                            final Map<String, Object> attributes = configuredObject.getAttributes();
+                            byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+                            ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                            insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+                        }
+                        insertStmt.execute();
+                    }
+                    finally
+                    {
+                        insertStmt.close();
+                    }
+                    writeHierarchy(configuredObject, conn);
+                }
+            }
+            finally
+            {
+                rs.close();
+            }
+        }
+        catch (JsonMappingException e)
+        {
+            throw new StoreException("Error updating configured object " + configuredObject
+ " in database: " + e.getMessage(), e);
+        }
+        catch (JsonGenerationException e)
+        {
+            throw new StoreException("Error updating configured object " + configuredObject
+ " in database: " + e.getMessage(), e);
+        }
+        catch (IOException e)
+        {
+            throw new StoreException("Error updating configured object " + configuredObject
+ " in database: " + e.getMessage(), e);
+        }
+        finally
+        {
+            stmt.close();
+        }
+    }
+
+    private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection
conn) throws SQLException, StoreException
+    {
+        PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
+        try
+        {
+            for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
+            {
+                insertStmt.setString(1, configuredObject.getId().toString());
+                insertStmt.setString(2, parentEntry.getKey());
+                insertStmt.setString(3, parentEntry.getValue().getId().toString());
+
+                insertStmt.execute();
+            }
+        }
+        finally
+        {
+            insertStmt.close();
+        }
+    }
+
+    protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
+
+    @Override
+    public void onDelete()
+    {
+        // TODO should probably check we are closed
+        try
+        {
+            Connection conn = newAutoCommitConnection();
+            try
+            {
+                List<String> tables = new ArrayList<String>();
+                tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
+
+                for (String tableName : tables)
+                {
+                    Statement stmt = conn.createStatement();
+                    try
+                    {
+                        stmt.execute("DROP TABLE " +  tableName);
+                    }
+                    catch(SQLException e)
+                    {
+                        getLogger().warn("Failed to drop table '" + tableName + "' :" + e);
+                    }
+                    finally
+                    {
+                        stmt.close();
+                    }
+                }
+            }
+            finally
+            {
+                conn.close();
+            }
+        }
+        catch(SQLException e)
+        {
+            getLogger().error("Exception while deleting store tables", e);
+        }
+    }
+
+    private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
+    {
+
+        private final UUID _id;
+        private final String _type;
+        private final Map<String, Object> _attributes;
+        private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String,
ConfiguredObjectRecord>();
+
+        private ConfiguredObjectRecordImpl(final UUID id,
+                                           final String type,
+                                           final Map<String, Object> attributes)
+        {
+            _id = id;
+            _type = type;
+            _attributes = Collections.unmodifiableMap(attributes);
+        }
+
+        @Override
+        public UUID getId()
+        {
+            return _id;
+        }
+
+        @Override
+        public String getType()
+        {
+            return _type;
+        }
+
+        private void addParent(String parentType, ConfiguredObjectRecord parent)
+        {
+            _parents.put(parentType, parent);
+        }
+
+        @Override
+        public Map<String, Object> getAttributes()
+        {
+            return _attributes;
+        }
+
+        @Override
+        public Map<String, ConfiguredObjectRecord> getParents()
+        {
+            return Collections.unmodifiableMap(_parents);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ConfiguredObjectRecordImpl [_id=" + _id + ", _type=" + _type + ", _attributes="
+ _attributes + ", _parents="
+                    + _parents + "]";
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message