jackrabbit-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jukka Zitting" <jukka.zitt...@gmail.com>
Subject Re: Database PersistenceManagers (was "Results of a JR Oracle test that we conducted)
Date Tue, 13 Mar 2007 20:52:50 GMT
Hi,

On 3/13/07, Bryan Davis <brdavis@bea.com> wrote:
> I am working on a response to the many recent additions to this thread
> (hopefully will have something later today).

If you're interested, see below for some code I put together last
year when this subject came up earlier. I quickly updated the code to
match the latest changes in Jackrabbit. The class is just a quick
prototype, i.e. it compiles but is not tested and not really
documented.

PS. How about moving this discussion to the development mailing list?

BR,

Jukka Zitting

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.core.persistence.db;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;

import javax.naming.InitialContext;
import javax.sql.DataSource;

import org.apache.jackrabbit.core.NodeId;
import org.apache.jackrabbit.core.PropertyId;
import org.apache.jackrabbit.core.fs.BasedFileSystem;
import org.apache.jackrabbit.core.fs.FileSystem;
import org.apache.jackrabbit.core.persistence.PMContext;
import org.apache.jackrabbit.core.persistence.PersistenceManager;
import org.apache.jackrabbit.core.persistence.util.BLOBStore;
import org.apache.jackrabbit.core.persistence.util.FileSystemBLOBStore;
import org.apache.jackrabbit.core.persistence.util.Serializer;
import org.apache.jackrabbit.core.state.ChangeLog;
import org.apache.jackrabbit.core.state.ItemState;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NoSuchItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.apache.jackrabbit.core.state.NodeReferencesId;
import org.apache.jackrabbit.core.state.NodeState;
import org.apache.jackrabbit.core.state.PropertyState;

public class DataSourcePersistenceManager implements PersistenceManager {

    /**
     * The underlying data source.
     */
    private DataSource database;

    /**
     * JNDI location of the data source used to acquire database connections.
     */
    private String location;

    /**
     * Schema object prefix.
     */
    private String prefix;

    /**
     * Blob store.
     */
    private BLOBStore blobs;

    private String nodeExistsSQL;
    private String propExistsSQL;
    private String refsExistsSQL;

    private String nodeSelectSQL;
    private String propSelectSQL;
    private String refsSelectSQL;

    private String nodeInsertSQL;
    private String propInsertSQL;
    private String refsInsertSQL;

    private String nodeUpdateSQL;
    private String propUpdateSQL;
    private String refsUpdateSQL;

    private String nodeDeleteSQL;
    private String propDeleteSQL;
    private String refsDeleteSQL;

    //----------------------------------------------------< setters & getters >

    /**
     * Returns the JNDI name under which the data source is looked up.
     *
     * @return the configured data source location, or <code>null</code>
     *         if none has been set
     */
    public String getDataSourceLocation() {
        return this.location;
    }

    /**
     * Configures the JNDI name under which the data source is looked up.
     * The value is only stored here; the lookup itself happens during
     * initialization.
     *
     * @param location JNDI location of the data source
     */
    public void setDataSourceLocation(String location) {
        this.location = location;
    }

    /**
     * Returns the prefix that is prepended to the table names used by
     * this persistence manager.
     *
     * @return the configured schema object prefix
     */
    public String getSchemaObjectPrefix() {
        return this.prefix;
    }

    /**
     * Sets the schema object prefix. The prefix is stored in upper case.
     * A <code>null</code> prefix is treated as "no prefix" and stored as
     * the empty string.
     *
     * @param prefix schema object prefix, may be <code>null</code>
     */
    public void setSchemaObjectPrefix(String prefix) {
        if (prefix == null) {
            this.prefix = "";
        } else {
            // Use a fixed locale: with the default locale the result depends
            // on the platform (e.g. "i" becomes a dotted capital I under a
            // Turkish locale), which would change the SQL identifiers.
            this.prefix = prefix.toUpperCase(java.util.Locale.ENGLISH);
        }
    }

    //--------------------------------------------------< PersistenceManager >

    /**
     * Initializes this persistence manager.
     */
    public void init(PMContext context) throws Exception {
        database = (DataSource) new InitialContext().lookup(location);

        FileSystem filesystem =
            new BasedFileSystem(context.getFileSystem(), "blobs");
        filesystem.init();
        blobs = new FileSystemBLOBStore(filesystem);

        nodeExistsSQL = "SELECT 1 FROM " + prefix + "NODE WHERE NODE_ID=?";
        propExistsSQL = "SELECT 1 FROM " + prefix + "PROP WHERE PROP_ID=?";
        refsExistsSQL = "SELECT 1 FROM " + prefix + "REFS WHERE NODE_ID=?";
        nodeSelectSQL = "SELECT NODE_DATA FROM " + prefix + "NODE
WHERE NODE_ID=?";
        propSelectSQL = "SELECT PROP_DATA FROM " + prefix + "PROP
WHERE PROP_ID=?";
        refsSelectSQL = "SELECT REFS_DATA FROM " + prefix + "REFS
WHERE NODE_ID=?";
        nodeInsertSQL = "INSERT INTO " + prefix + "NODE
(NODE_DATA,NODE_ID) VALUES (?,?)";
        propInsertSQL = "INSERT INTO " + prefix + "PROP
(PROP_DATA,PROP_ID) VALUES (?,?)";
        refsInsertSQL = "INSERT INTO " + prefix + "REFS
(REFS_DATA,NODE_ID) VALUES (?,?)";
        nodeUpdateSQL = "UPDATE " + prefix + "NODE SET NODE_DATA=?
WHERE NODE_ID=?";
        propUpdateSQL = "UPDATE " + prefix + "PROP SET PROP_DATA=?
WHERE PROP_ID=?";
        refsUpdateSQL = "UPDATE " + prefix + "REFS SET REFS_DATA=?
WHERE NODE_ID=?";
        nodeDeleteSQL = "DELETE FROM " + prefix + "NODE WHERE NODE_ID=?";
        propDeleteSQL = "DELETE FROM " + prefix + "PROP WHERE PROP_ID=?";
        refsDeleteSQL = "DELETE FROM " + prefix + "REFS WHERE NODE_ID=?";
    }

    /**
     * Closes this persistence manager by dropping its references to the
     * BLOB store and the data source.
     */
    public void close() {
        this.blobs = null;
        this.database = null;
    }

    /**
     * Instantiates a node state with status <code>STATUS_NEW</code> for
     * the given identifier. Nothing is written to the database.
     *
     * @param id node identifier
     * @return the newly created node state
     */
    public NodeState createNew(NodeId id) {
        NodeState fresh =
            new NodeState(id, null, null, NodeState.STATUS_NEW, false);
        return fresh;
    }

    /**
     * Instantiates a property state with status <code>STATUS_NEW</code>
     * for the given identifier. Nothing is written to the database.
     *
     * @param id property identifier
     * @return the newly created property state
     */
    public PropertyState createNew(PropertyId id) {
        PropertyState fresh =
            new PropertyState(id, PropertyState.STATUS_NEW, false);
        return fresh;
    }

    /**
     * Tells whether a record for the identified node state is present in
     * the node table.
     *
     * @param id node identifier
     * @return <code>true</code> if a matching record was found,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    public boolean exists(NodeId id) throws ItemStateException {
        final String key = id.toString();
        return exists(nodeExistsSQL, key);
    }

    /**
     * Tells whether a record for the identified property state is present
     * in the property table.
     *
     * @param id property identifier
     * @return <code>true</code> if a matching record was found,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    public boolean exists(PropertyId id) throws ItemStateException {
        final String key = id.toString();
        return exists(propExistsSQL, key);
    }

    /**
     * Tells whether a references record for the identified node is present
     * in the references table.
     *
     * @param targetId reference identifier
     * @return <code>true</code> if a matching record was found,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    public boolean exists(NodeReferencesId targetId) throws ItemStateException {
        final String key = targetId.toString();
        return exists(refsExistsSQL, key);
    }

    /**
     * Reads the identified node state from the database. A new node state
     * is created for the identifier and then populated by deserializing
     * the stored record into it.
     *
     * @param id node identifier
     * @return the populated node state
     * @throws NoSuchItemStateException if no record exists for the node
     * @throws ItemStateException if a database error occurred
     */
    public NodeState load(NodeId id)
            throws NoSuchItemStateException, ItemStateException {
        final NodeState node = createNew(id);
        RecordReader deserializer = new RecordReader() {
            public void readRecord(InputStream stream) throws Exception {
                Serializer.deserialize(node, stream);
            }
        };
        load(nodeSelectSQL, id.toString(), deserializer);
        return node;
    }

    /**
     * Reads the identified property state from the database. A new
     * property state is created for the identifier and then populated by
     * deserializing the stored record into it; the BLOB store is handed to
     * the deserializer.
     *
     * @param id property identifier
     * @return the populated property state
     * @throws NoSuchItemStateException if no record exists for the property
     * @throws ItemStateException if a database error occurred
     */
    public PropertyState load(PropertyId id)
            throws NoSuchItemStateException, ItemStateException {
        final PropertyState property = createNew(id);
        RecordReader deserializer = new RecordReader() {
            public void readRecord(InputStream stream) throws Exception {
                Serializer.deserialize(property, stream, blobs);
            }
        };
        load(propSelectSQL, id.toString(), deserializer);
        return property;
    }

    /**
     * Reads the references to the identified node from the database. An
     * empty references object is created for the identifier and then
     * populated by deserializing the stored record into it.
     *
     * @param id reference identifier
     * @return the populated node references
     * @throws NoSuchItemStateException if no references record exists
     * @throws ItemStateException if a database error occurred
     */
    public NodeReferences load(NodeReferencesId id)
            throws NoSuchItemStateException, ItemStateException {
        final NodeReferences refs = new NodeReferences(id);
        RecordReader deserializer = new RecordReader() {
            public void readRecord(InputStream stream) throws Exception {
                Serializer.deserialize(refs, stream);
            }
        };
        load(refsSelectSQL, id.toString(), deserializer);
        return refs;
    }

    /**
     * Persists all the changes in the given change log within a single
     * database transaction. Added states, modified states, deleted states
     * and modified node references are written in that order. The
     * transaction is committed only when every statement has succeeded;
     * otherwise it is rolled back, so no database changes are persisted if
     * an error occurs.
     * <p>
     * NOTE(review): property serialization is handed the BLOB store;
     * presumably anything it writes there is outside the database
     * transaction and is not undone by the rollback - confirm.
     *
     * @param changeLog change log
     * @throws ItemStateException if a database error occurred
     */
    public void store(ChangeLog changeLog) throws ItemStateException {
        try {
            Connection connection = database.getConnection();
            try {
                // Without this every statement would be committed on its
                // own and a failure half-way through would leave a partial
                // change log in the database.
                boolean autoCommit = connection.getAutoCommit();
                connection.setAutoCommit(false);
                boolean committed = false;
                try {
                    storeItemStates(
                            connection, changeLog.addedStates(),
                            nodeInsertSQL, propInsertSQL);
                    storeItemStates(
                            connection, changeLog.modifiedStates(),
                            nodeUpdateSQL, propUpdateSQL);
                    deleteItemStates(connection, changeLog.deletedStates());
                    storeNodeReferences(connection, changeLog.modifiedRefs());
                    connection.commit();
                    committed = true;
                } finally {
                    if (!committed) {
                        try {
                            connection.rollback();
                        } catch (SQLException ignored) {
                            // The failure that triggered the rollback is
                            // already propagating and is the one to report.
                        }
                    }
                    try {
                        // Hand the connection back in the mode we got it.
                        connection.setAutoCommit(autoCommit);
                    } catch (SQLException ignored) {
                        // The outcome of the transaction is already decided
                        // and the connection is closed right below.
                    }
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ItemStateException("Database error", e);
        }
    }

    //-------------------------------------------------------------< private >

    /**
     * Callback used by {@link #load(String, String, RecordReader)} to
     * deserialize a database record.
     */
    private interface RecordReader {

        /**
         * Consumes the binary content of a record.
         *
         * @param stream binary stream of the first column of the selected row
         * @throws Exception if the record can not be read; the caller
         *                   reports this as a deserialization failure
         */
        void readRecord(InputStream stream) throws Exception;

    }

    /**
     * Callback used when storing records: supplies the identifier and the
     * serialized form of each record that is written to the database.
     */
    private interface RecordWriter {

        /**
         * Returns the identifier of the given record as a string. The value
         * is bound as the second parameter of the SQL statement.
         *
         * @param record record to be stored
         * @return record identifier
         */
        String getId(Object record);

        /**
         * Serializes the given record to the given stream. The bytes written
         * are bound as the first parameter of the SQL statement.
         *
         * @param record record to be stored
         * @param stream stream that receives the serialized record
         * @throws Exception if the record can not be serialized
         */
        void writeRecord(Object record, OutputStream stream) throws Exception;

    }

    /**
     * Checks whether the identified database record exists. A connection
     * is obtained from the data source for the check and closed again
     * before returning.
     *
     * @param sql the SQL SELECT statement to use for the check; it must
     *            take the record identifier as its only parameter
     * @param id record identifier
     * @return <code>true</code> if the statement returned at least one row,
     *         <code>false</code> otherwise
     * @throws ItemStateException if a database error occurred
     */
    private boolean exists(String sql, String id) throws ItemStateException {
        try {
            Connection connection = database.getConnection();
            try {
                PreparedStatement select = connection.prepareStatement(sql);
                try {
                    // id is already a String; bind it directly, the same
                    // way the record loading code does.
                    select.setString(1, id);
                    ResultSet rs = select.executeQuery();
                    try {
                        return rs.next();
                    } finally {
                        rs.close();
                    }
                } finally {
                    select.close();
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ItemStateException("Database error", e);
        }
    }

    /**
     * Loads the identified database record. The binary content of the
     * first column of the selected row is handed to the given record
     * reader. A connection is obtained from the data source for the query
     * and closed again before returning.
     *
     * @param sql the SQL SELECT statement to use for loading the record;
     *            it must take the record identifier as its only parameter
     * @param id record identifier
     * @param reader record reader
     * @throws NoSuchItemStateException if the record does not exist
     * @throws ItemStateException if a database error occurred, if the
     *                            record holds no data, or if the reader
     *                            failed to process the record
     */
    private void load(String sql, String id, RecordReader reader)
            throws NoSuchItemStateException, ItemStateException {
        try {
            Connection connection = database.getConnection();
            try {
                PreparedStatement select = connection.prepareStatement(sql);
                try {
                    select.setString(1, id);
                    ResultSet rs = select.executeQuery();
                    try {
                        if (!rs.next()) {
                            throw new NoSuchItemStateException(id);
                        }
                        InputStream stream = rs.getBinaryStream(1);
                        if (stream == null) {
                            // getBinaryStream returns null for SQL NULL.
                            // Report that clearly instead of failing with a
                            // NullPointerException when closing the stream.
                            throw new ItemStateException(
                                    "Record contains no data: " + id);
                        }
                        try {
                            reader.readRecord(stream);
                        } catch (Exception e) {
                            throw new ItemStateException(
                                    "Deserialization failed: " + id, e);
                        } finally {
                            stream.close();
                        }
                    } catch (IOException e) {
                        // Raised when closing the record stream fails.
                        throw new ItemStateException("Database error", e);
                    } finally {
                        rs.close();
                    }
                } finally {
                    select.close();
                }
            } finally {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ItemStateException("Database error", e);
        }
    }

    /**
     * Splits the item states returned by the given iterator into node
     * states and property states.
     *
     * @param iterator iterator over {@link ItemState} instances
     * @param nodes collection that receives the node states
     * @param props collection that receives all other (property) states
     */
    private void classifyItemStates(
            Iterator iterator, Collection nodes, Collection props) {
        while (iterator.hasNext()) {
            ItemState item = (ItemState) iterator.next();
            Collection bucket = item.isNode() ? nodes : props;
            bucket.add(item);
        }
    }

    /**
     * Writes the given item states to the database. Node states are
     * written first using the node statement, then property states using
     * the property statement. A statement is only prepared when there is
     * at least one state of its kind.
     *
     * @param connection database connection to use
     * @param iterator iterator over the {@link ItemState} instances to store
     * @param nodeSQL INSERT or UPDATE statement for node states
     * @param propSQL INSERT or UPDATE statement for property states
     * @throws SQLException if a database error occurred
     */
    private void storeItemStates(
            Connection connection, Iterator iterator,
            String nodeSQL, String propSQL) throws SQLException {
        Collection nodeStates = new ArrayList();
        Collection propStates = new ArrayList();
        classifyItemStates(iterator, nodeStates, propStates);

        if (!nodeStates.isEmpty()) {
            RecordWriter nodeWriter = new RecordWriter() {
                public String getId(Object record) {
                    NodeState node = (NodeState) record;
                    return node.getId().toString();
                }
                public void writeRecord(
                        Object record, OutputStream stream) throws Exception {
                    Serializer.serialize((NodeState) record, stream);
                }
            };
            storeRecords(
                    connection, nodeSQL, nodeWriter, nodeStates.iterator());
        }

        if (!propStates.isEmpty()) {
            RecordWriter propWriter = new RecordWriter() {
                public String getId(Object record) {
                    PropertyState property = (PropertyState) record;
                    return property.getId().toString();
                }
                public void writeRecord(
                        Object record, OutputStream stream) throws Exception {
                    Serializer.serialize((PropertyState) record, stream, blobs);
                }
            };
            storeRecords(
                    connection, propSQL, propWriter, propStates.iterator());
        }
    }

    /**
     * Executes the given INSERT or UPDATE statement once for each record
     * returned by the iterator. Each record is serialized into a memory
     * buffer by the given writer; the resulting bytes are bound as
     * parameter 1 and the record identifier as parameter 2.
     *
     * @param connection database connection to use
     * @param sql statement taking the record data and the record identifier
     * @param writer provides the identifier and serialized form of a record
     * @param iterator iterator over the records to store
     * @throws SQLException if a database error occurred or a record could
     *                      not be serialized; in the latter case the
     *                      original failure is available as the cause
     */
    private void storeRecords(
            Connection connection, String sql,
            RecordWriter writer, Iterator iterator) throws SQLException {
        PreparedStatement statement = connection.prepareStatement(sql);
        try {
            while (iterator.hasNext()) {
                Object record = iterator.next();
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                try {
                    writer.writeRecord(record, buffer);
                } catch (Exception e) {
                    // Keep the underlying failure attached so that it is
                    // not lost when the error is reported.
                    SQLException failure =
                        new SQLException("Serialization failed: " + record);
                    failure.initCause(e);
                    throw failure;
                }
                byte[] bytes = buffer.toByteArray();
                statement.setBinaryStream(
                        1, new ByteArrayInputStream(bytes), bytes.length);
                statement.setString(2, writer.getId(record));
                statement.execute();
            }
        } finally {
            statement.close();
        }
    }

    /**
     * Removes the given item states from the database. Node states are
     * deleted first, then property states; a kind with no states is
     * skipped entirely.
     *
     * @param connection database connection to use
     * @param iterator iterator over the {@link ItemState} instances to delete
     * @throws SQLException if a database error occurred
     */
    private void deleteItemStates(Connection connection, Iterator iterator)
            throws SQLException {
        Collection nodeStates = new ArrayList();
        Collection propStates = new ArrayList();
        classifyItemStates(iterator, nodeStates, propStates);

        boolean hasNodes = !nodeStates.isEmpty();
        if (hasNodes) {
            deleteItemStates(
                    connection, nodeDeleteSQL, nodeStates.iterator());
        }

        boolean hasProps = !propStates.isEmpty();
        if (hasProps) {
            deleteItemStates(
                    connection, propDeleteSQL, propStates.iterator());
        }
    }

    /**
     * Executes the given DELETE statement once for each item state
     * returned by the iterator, binding the state identifier as the only
     * parameter.
     *
     * @param connection database connection to use
     * @param sql DELETE statement taking the item identifier
     * @param iterator iterator over the {@link ItemState} instances to delete
     * @throws SQLException if a database error occurred
     */
    private void deleteItemStates(
            Connection connection, String sql, Iterator iterator)
            throws SQLException {
        PreparedStatement delete = connection.prepareStatement(sql);
        try {
            for (Iterator states = iterator; states.hasNext();) {
                ItemState doomed = (ItemState) states.next();
                String key = doomed.getId().toString();
                delete.setString(1, key);
                delete.execute();
            }
        } finally {
            delete.close();
        }
    }

    /**
     * Persists the given modified node references. The references are
     * first sorted into records to insert, update and delete, and the
     * three groups are then processed in that order. Nothing is done,
     * and no statement is prepared, when the iterator is empty.
     *
     * @param connection database connection to use
     * @param iterator iterator over the modified {@link NodeReferences}
     * @throws SQLException if a database error occurred
     */
    private void storeNodeReferences(
            Connection connection, Iterator iterator) throws SQLException {
        if (!iterator.hasNext()) {
            return;
        }

        Collection toInsert = new ArrayList();
        Collection toUpdate = new ArrayList();
        Collection toDelete = new ArrayList();
        classifyNodeReferences(
                connection, iterator, toInsert, toUpdate, toDelete);

        if (!toInsert.isEmpty()) {
            storeNodeReferences(
                    connection, refsInsertSQL, toInsert.iterator());
        }
        if (!toUpdate.isEmpty()) {
            storeNodeReferences(
                    connection, refsUpdateSQL, toUpdate.iterator());
        }
        if (!toDelete.isEmpty()) {
            deleteNodeReferences(connection, toDelete.iterator());
        }
    }

    /**
     * Writes the given node references to the database using the given
     * INSERT or UPDATE statement.
     *
     * @param connection database connection to use
     * @param sql statement taking the references data and the identifier
     * @param iterator iterator over the {@link NodeReferences} to store
     * @throws SQLException if a database error occurred
     */
    private void storeNodeReferences(
            Connection connection, String sql, Iterator iterator)
            throws SQLException {
        RecordWriter refsWriter = new RecordWriter() {
            public String getId(Object record) {
                NodeReferences refs = (NodeReferences) record;
                return refs.getId().toString();
            }
            public void writeRecord(Object record, OutputStream stream)
                    throws Exception {
                Serializer.serialize((NodeReferences) record, stream);
            }
        };
        storeRecords(connection, sql, refsWriter, iterator);
    }

    /**
     * Deletes the references records of the given node references from
     * the database, one DELETE execution per entry.
     *
     * @param connection database connection to use
     * @param iterator iterator over the {@link NodeReferences} to delete
     * @throws SQLException if a database error occurred
     */
    private void deleteNodeReferences(Connection connection, Iterator iterator)
            throws SQLException {
        PreparedStatement statement =
            connection.prepareStatement(refsDeleteSQL);
        try {
            for (Iterator refs = iterator; refs.hasNext();) {
                NodeReferences obsolete = (NodeReferences) refs.next();
                String key = obsolete.getId().toString();
                statement.setString(1, key);
                statement.execute();
            }
        } finally {
            statement.close();
        }
    }

    /**
     * Sorts the given node references into three groups. Entries without
     * any references go to the delete group. Each remaining entry is
     * looked up in the database: it goes to the update group when a
     * record already exists and to the insert group otherwise.
     *
     * @param connection database connection to use for the existence checks
     * @param iterator iterator over the {@link NodeReferences} to classify
     * @param insert receives references that have no record yet
     * @param update receives references that already have a record
     * @param delete receives references that are empty
     * @throws SQLException if a database error occurred
     */
    private void classifyNodeReferences(
            Connection connection, Iterator iterator,
            Collection insert, Collection update, Collection delete)
            throws SQLException {
        PreparedStatement lookup = connection.prepareStatement(refsExistsSQL);
        try {
            while (iterator.hasNext()) {
                NodeReferences refs = (NodeReferences) iterator.next();
                if (!refs.hasReferences()) {
                    delete.add(refs);
                    continue;
                }

                lookup.setString(1, refs.getId().toString());
                ResultSet rows = lookup.executeQuery();
                try {
                    Collection target = rows.next() ? update : insert;
                    target.add(refs);
                } finally {
                    rows.close();
                }
            }
        } finally {
            lookup.close();
        }
    }

}

Mime
View raw message