jackrabbit-commits mailing list archives

From thom...@apache.org
Subject svn commit: r603499 - in /jackrabbit/trunk/jackrabbit-core/src: main/java/org/apache/jackrabbit/core/data/ main/java/org/apache/jackrabbit/core/data/db/ main/resources/org/apache/jackrabbit/core/data/db/ test/java/org/apache/jackrabbit/core/data/
Date Wed, 12 Dec 2007 07:35:30 GMT
Author: thomasm
Date: Tue Dec 11 23:35:28 2007
New Revision: 603499

URL: http://svn.apache.org/viewvc?rev=603499&view=rev
Log:
JCR-1154 Database data store: support multiple concurrent connections

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java   (with props)
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java   (with props)
Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/DataStoreException.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/FileDataRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
    jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/derby.properties
    jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/h2.properties
    jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/mysql.properties
    jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/postgresql.properties
    jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/sqlserver.properties
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/PersistenceManagerIteratorTest.java
    jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/DataStoreException.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/DataStoreException.java?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/DataStoreException.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/DataStoreException.java Tue Dec 11 23:35:28 2007
@@ -43,4 +43,14 @@
     public DataStoreException(String message, Throwable cause) {
         super(message, cause);
     }
+    
+    /**
+     * Constructs a new instance of this class with the specified root cause.
+     *
+     * @param rootCause root failure cause
+     */
+    public DataStoreException(Throwable rootCause) {
+        super(rootCause);
+    }
+ 
 }

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/FileDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/FileDataRecord.java?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/FileDataRecord.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/FileDataRecord.java Tue Dec 11 23:35:28 2007
@@ -21,11 +21,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-
 /**
  * Data record that is based on a normal file.
  */
-class FileDataRecord extends AbstractDataRecord {
+public class FileDataRecord extends AbstractDataRecord {
 
     /**
      * The file that contains the binary stream.
@@ -38,7 +37,7 @@
      * @param identifier data identifier
      * @param file file that contains the binary stream
      */
-    public FileDataRecord(DataIdentifier identifier, File file) {
+    protected FileDataRecord(DataIdentifier identifier, File file) {
         super(identifier);
         assert file.isFile();
         this.file = file;

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataRecord.java?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataRecord.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataRecord.java Tue Dec 11 23:35:28 2007
@@ -27,9 +27,9 @@
  */
 public class DbDataRecord extends AbstractDataRecord {
     
-    private final DbDataStore store;
-    private final long length;
-    private long lastModified;
+    protected final DbDataStore store;
+    protected final long length;
+    protected long lastModified;
 
     /**
      * Creates a data record based on the given identifier and length.

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/DbDataStore.java Tue Dec 11 23:35:28 2007
@@ -23,17 +23,15 @@
 import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager;
 import org.apache.jackrabbit.core.persistence.bundle.util.TrackingInputStream;
 import org.apache.jackrabbit.core.persistence.bundle.util.ConnectionRecoveryManager.StreamWrapper;
+import org.apache.jackrabbit.util.Text;
 import org.apache.jackrabbit.uuid.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedInputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.lang.ref.WeakReference;
 import java.security.DigestInputStream;
 import java.security.MessageDigest;
@@ -46,6 +44,8 @@
 import java.util.Properties;
 import java.util.WeakHashMap;
 
+import javax.jcr.RepositoryException;
+
 /**
  * A data store implementation that stores the records in a database using JDBC.
  * 
@@ -58,6 +58,8 @@
  * <li>&lt;param name="{@link #setDatabaseType(String) databaseType}" value="postgresql"/>
  * <li>&lt;param name="{@link #setDriver(String) driver}" value="org.postgresql.Driver"/>
  * <li>&lt;param name="{@link #setMinRecordLength(int) minRecordLength}" value="1024"/>
+ * <li>&lt;param name="{@link #setMaxConnections(int) maxConnections}" value="2"/>
+ * <li>&lt;param name="{@link #setCopyWhenReading(int) copyWhenReading}" value="true"/>
  * </ul>
  * 
  * <p>
@@ -68,13 +70,21 @@
  * A three level directory structure is used to avoid placing too many
  * files in a single directory. The chosen structure is designed to scale
  * up to billions of distinct records.
+ * <p>
+ * For Microsoft SQL Server 2005, there is a problem reading large BLOBs. You will need to use
+ * the JDBC driver version 1.2 or newer, and append ;responseBuffering=adaptive to the database URL.
+ * Don't append ;selectMethod=cursor, otherwise it can still run out of memory.
+ * Example database URL: jdbc:sqlserver://localhost:4220;DatabaseName=test;responseBuffering=adaptive
+ * <p>
+ * By default, the data is copied to a temp file when reading, to avoid problems when reading multiple
+ * blobs at the same time.
  */
 public class DbDataStore implements DataStore {
     
     /**
      * The digest algorithm used to uniquely identify records.
      */
-    private static final String DIGEST = "SHA-1";
+    protected static final String DIGEST = "SHA-1";
     
     /**
      * Logger instance
@@ -84,142 +94,168 @@
     /**
      * The default value for the minimum object size.
      */
-    private static final int DEFAULT_MIN_RECORD_LENGTH = 100;
+    public static final int DEFAULT_MIN_RECORD_LENGTH = 100;
+    
+    /**
+     * The default value for the maximum number of connections.
+     */
+    public static final int DEFAULT_MAX_CONNECTIONS = 3;
     
     /**
     * The minimum modified date. If a file is accessed (read or write) with a modified date
     * older than this value, the modified date is updated to the current time.
      */
-    private long minModifiedDate;
+    protected long minModifiedDate;
     
     /**
      * The database URL used.
      */
-    private String url;
+    protected String url;
     
     /**
      * The database driver.
      */
-    private String driver;
+    protected String driver;
     
     /**
      * The user name.
      */
-    private String user;
+    protected String user;
     
     /**
      * The password
      */
-    private String password;
+    protected String password;
     
     /**
      * The database type used.
      */
-    private String databaseType;
+    protected String databaseType;
     
     /**
      * The minimum size of an object that should be stored in this data store.
      */
-    private int minRecordLength = DEFAULT_MIN_RECORD_LENGTH;
+    protected int minRecordLength = DEFAULT_MIN_RECORD_LENGTH;
     
-    private ConnectionRecoveryManager conn;
+    /**
+     * The maximum number of open connections.
+     */
+    protected int maxConnections = DEFAULT_MAX_CONNECTIONS;
     
-    private static final String TEMP_PREFIX = "TEMP_";
+    /**
+     * The pool of database connections.
+     */
+    protected Pool connectionPool;
     
-    private String tableSQL = "DATASTORE";
-    private String createTableSQL = 
-        "CREATE TABLE DATASTORE(ID VARCHAR(255) PRIMARY KEY, LENGTH BIGINT, LAST_MODIFIED
BIGINT, DATA BLOB)";
-    private String insertTempSQL = 
-        "INSERT INTO DATASTORE VALUES(?, 0, ?, NULL)";
-    private String updateDataSQL = 
-        "UPDATE DATASTORE SET DATA=? WHERE ID=?";
-    private String updateLastModifiedSQL = 
-        "UPDATE DATASTORE SET LAST_MODIFIED=? WHERE ID=? AND LAST_MODIFIED<?";
-    private String updateSQL = 
-        "UPDATE DATASTORE SET ID=?, LENGTH=?, LAST_MODIFIED=? WHERE ID=? AND NOT EXISTS(SELECT
ID FROM DATASTORE WHERE ID=?)";
-    private String deleteSQL = 
-        "DELETE FROM DATASTORE WHERE ID=?";
-    private String deleteOlderSQL = 
-        "DELETE FROM DATASTORE WHERE LAST_MODIFIED<?";
-    private String selectMetaSQL = 
-        "SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID=?";
-    private String selectAllSQL = 
-        "SELECT ID FROM DATASTORE";
-    private String selectDataSQL = 
-        "SELECT DATA FROM DATASTORE WHERE ID=?";
-    private String storeStream = STORE_TEMP_FILE;
-    
-    // write to a temporary file to get the length (slow, but always works)
-    // this is the default setting
-    private static final String STORE_TEMP_FILE = "tempFile";
+    /**
+     * The prefix used for temporary objects.
+     */
+    protected static final String TEMP_PREFIX = "TEMP_";
+    
+    /**
+     * The prefix for the datastore table, empty by default.
+     */
+    protected String tablePrefix = "";
     
-    // call PreparedStatement.setBinaryStream(..., -1)
-    private static final String STORE_SIZE_MINUS_ONE = "-1";
+    protected String tableSQL = "DATASTORE";
+    protected String createTableSQL = 
+        "CREATE TABLE ${tablePrefix}${table}(ID VARCHAR(255) PRIMARY KEY, LENGTH BIGINT,
LAST_MODIFIED BIGINT, DATA BLOB)";
+    protected String insertTempSQL = 
+        "INSERT INTO ${tablePrefix}${table} VALUES(?, 0, ?, NULL)";
+    protected String updateDataSQL = 
+        "UPDATE ${tablePrefix}${table} SET DATA=? WHERE ID=?";
+    protected String updateLastModifiedSQL = 
+        "UPDATE ${tablePrefix}${table} SET LAST_MODIFIED=? WHERE ID=? AND LAST_MODIFIED<?";
+    protected String updateSQL = 
+        "UPDATE ${tablePrefix}${table} SET ID=?, LENGTH=?, LAST_MODIFIED=? WHERE ID=? AND
NOT EXISTS(SELECT ID FROM ${tablePrefix}${table} WHERE ID=?)";
+    protected String deleteSQL = 
+        "DELETE FROM ${tablePrefix}${table} WHERE ID=?";
+    protected String deleteOlderSQL = 
+        "DELETE FROM ${tablePrefix}${table} WHERE LAST_MODIFIED<?";
+    protected String selectMetaSQL = 
+        "SELECT LENGTH, LAST_MODIFIED FROM ${tablePrefix}${table} WHERE ID=?";
+    protected String selectAllSQL = 
+        "SELECT ID FROM ${tablePrefix}${table}";
+    protected String selectDataSQL = 
+        "SELECT ID, DATA FROM ${tablePrefix}${table} WHERE ID=?";
     
-    // call PreparedStatement.setBinaryStream(..., Integer.MAX_VALUE)
-    private static final String STORE_SIZE_MAX = "max";
+    /**
+     * The stream storing mechanism used.
+     */
+    protected String storeStream = STORE_TEMP_FILE;
+
+    /**
+     * Write to a temporary file to get the length (slow, but always works).
+     * This is the default setting.
+     */
+    public static final String STORE_TEMP_FILE = "tempFile";
+    
+    /**
+     * Call PreparedStatement.setBinaryStream(..., -1)
+     */
+    public static final String STORE_SIZE_MINUS_ONE = "-1";
+    
+    /**
+     * Call PreparedStatement.setBinaryStream(..., Integer.MAX_VALUE)
+     */
+    public static final String STORE_SIZE_MAX = "max";
+    
+    /**
+     * Copy the stream to a temp file before returning it. 
+     * Enabled by default to support concurrent reads.
+     */
+    private boolean copyWhenReading = true;
     
     /**
      * All data identifiers that are currently in use are in this set until they are garbage collected.
      */
-    private WeakHashMap inUse = new WeakHashMap();    
+    protected WeakHashMap inUse = new WeakHashMap();    
 
     /**
      * {@inheritDoc}
      */
-    public synchronized DataRecord addRecord(InputStream stream) throws DataStoreException {
-        conn.setAutoReconnect(false);
-        String id = null, tempId = null;            
-        long now;            
-        for (int i = 0; i < ConnectionRecoveryManager.TRIALS; i++) {
-            try {
-                now = System.currentTimeMillis();
-                id = UUID.randomUUID().toString();
-                tempId = TEMP_PREFIX + id;
-                PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{tempId});
-                ResultSet rs = prep.getResultSet();
-                if (rs.next()) {
-                    // re-try in the very, very unlikely event that the row already exists
-                    continue;
-                }
-                conn.executeStmt(insertTempSQL, new Object[]{tempId, new Long(now)});
-                break;
-            } catch (Exception e) {
-                throw convert("Can not insert new record", e);
-            }
-        }
-        if (id == null) {
-            String msg = "Can not create new record";
-            log.error(msg);
-            throw new DataStoreException(msg);
-        }
+    public DataRecord addRecord(InputStream stream) throws DataStoreException {
         ResultSet rs = null;
+        TempFileInputStream fileInput = null;
+        ConnectionRecoveryManager conn = getConnection();
         try {
+            conn.setAutoReconnect(false);
+            String id = null, tempId = null;            
+            long now;            
+            for (int i = 0; i < ConnectionRecoveryManager.TRIALS; i++) {
+                try {
+                    now = System.currentTimeMillis();
+                    id = UUID.randomUUID().toString();
+                    tempId = TEMP_PREFIX + id;
+                    PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{tempId});
+                    rs = prep.getResultSet();
+                    if (rs.next()) {
+                        // re-try in the very, very unlikely event that the row already exists
+                        continue;
+                    }
+                    conn.executeStmt(insertTempSQL, new Object[]{tempId, new Long(now)});
+                    break;
+                } catch (Exception e) {
+                    throw convert("Can not insert new record", e);
+                }
+            }
+            if (id == null) {
+                String msg = "Can not create new record";
+                log.error(msg);
+                throw new DataStoreException(msg);
+            }
             MessageDigest digest = getDigest();
             DigestInputStream dIn = new DigestInputStream(stream, digest);
             TrackingInputStream in = new TrackingInputStream(dIn);
-            File temp = null;
-            InputStream fileInput = null;
             StreamWrapper wrapper;
             if (STORE_SIZE_MINUS_ONE.equals(storeStream)) {
                 wrapper = new StreamWrapper(in, -1);
             } else if (STORE_SIZE_MAX.equals(storeStream)) {
-                    wrapper = new StreamWrapper(in, Integer.MAX_VALUE);
+                wrapper = new StreamWrapper(in, Integer.MAX_VALUE);
             } else if (STORE_TEMP_FILE.equals(storeStream)) {
-                int length = 0;
-                temp = File.createTempFile("dbRecord", null);
-                OutputStream out = new FileOutputStream(temp);
-                byte[] b = new byte[4096];
-                while (true) {
-                    int n = in.read(b);
-                    if (n < 0) {
-                        break;
-                    }
-                    out.write(b, 0, n);
-                    length += n;
-                }
-                out.close();
-                fileInput = new BufferedInputStream(new FileInputStream(temp));
+                File temp = moveToTempFile(in);
+                fileInput = new TempFileInputStream(temp);
+                long length = temp.length();
                 wrapper = new StreamWrapper(fileInput, length);
             } else {
                 throw new DataStoreException("Unsupported stream store algorithm: " + storeStream);
@@ -228,6 +264,7 @@
             now = System.currentTimeMillis();
             long length = in.getPosition();
             DataIdentifier identifier = new DataIdentifier(digest.digest());
+            usesIdentifier(identifier);
             id = identifier.toString();
             // UPDATE DATASTORE SET ID=?, LENGTH=?, LAST_MODIFIED=? 
             // WHERE ID=? 
@@ -236,10 +273,6 @@
                     id, new Long(length), new Long(now), 
                     tempId, id});
             int count = prep.getUpdateCount();
-            if (temp != null) {
-                fileInput.close();
-                temp.delete();
-            }
             if (count == 0) {
                 // update count is 0, meaning such a row already exists
                 // DELETE FROM DATASTORE WHERE ID=?
@@ -266,13 +299,36 @@
             throw convert("Can not insert new record", e);
         } finally {
             conn.closeSilently(rs);
+            putBack(conn);
+            if (fileInput != null) {
+                try {
+                    fileInput.close();
+                } catch (IOException e) {
+                    throw convert("Can not close temporary file", e);
+                }
+            }
         }
     }
+    
+    /**
+     * Creates a temp file and copies the data there.
+     * The input stream is closed afterwards.
+     * 
+     * @param in the input stream
+     * @return the file
+     * @throws IOException
+     */
+    private File moveToTempFile(InputStream in) throws IOException {
+        File temp = File.createTempFile("dbRecord", null);
+        TempFileInputStream.writeToFileAndClose(in, temp);
+        return temp;
+    }
 
     /**
      * {@inheritDoc}
      */
     public synchronized int deleteAllOlderThan(long min) throws DataStoreException {
+        ConnectionRecoveryManager conn = getConnection();
         try {
             Iterator it = inUse.keySet().iterator();
             while (it.hasNext()) {
@@ -286,6 +342,8 @@
             return prep.getUpdateCount();
         } catch (Exception e) {
             throw convert("Can not delete records", e);
+        } finally {
+            putBack(conn);
         }
     }
 
@@ -293,6 +351,7 @@
      * {@inheritDoc}
      */
     public Iterator getAllIdentifiers() throws DataStoreException {
+        ConnectionRecoveryManager conn = getConnection();
         ArrayList list = new ArrayList();
         ResultSet rs = null;
         try {
@@ -311,6 +370,7 @@
             throw convert("Can not read records", e);
         } finally {
             conn.closeSilently(rs);
+            putBack(conn);
         }        
     }
 
@@ -333,12 +393,13 @@
     /**
      * {@inheritDoc}
      */
-    public synchronized DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+    public DataRecord getRecord(DataIdentifier identifier) throws DataStoreException {
+        ConnectionRecoveryManager conn = getConnection();
         usesIdentifier(identifier);
         ResultSet rs = null;
         try {
-            // SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID = ?
             String id = identifier.toString();
+            // SELECT LENGTH, LAST_MODIFIED FROM DATASTORE WHERE ID = ?
             PreparedStatement prep = conn.executeStmt(selectMetaSQL, new Object[]{id});
             rs = prep.getResultSet();
             if (!rs.next()) {
@@ -352,6 +413,7 @@
             throw convert("Can not read identifier " + identifier, e);
         } finally {
             conn.closeSilently(rs);
+            putBack(conn);
         }
     }
 
@@ -361,21 +423,26 @@
     public synchronized void init(String homeDir) throws DataStoreException {
         try {
             initDatabaseType();
+            connectionPool = new Pool(this, maxConnections);
+            ConnectionRecoveryManager conn = getConnection();
             conn = new ConnectionRecoveryManager(false, driver, url, user, password);
             conn.setAutoReconnect(true);
             DatabaseMetaData meta = conn.getConnection().getMetaData();
+            log.info("Using JDBC driver " + meta.getDriverName() + " " + meta.getDriverVersion());
+            meta.getDriverVersion();
             ResultSet rs = meta.getTables(null, null, tableSQL, null);
             boolean exists = rs.next();
             rs.close();
             if (!exists) {
                 conn.executeStmt(createTableSQL, null);
             }
+            putBack(conn);
         } catch (Exception e) {
             throw convert("Can not init data store, driver=" + driver + " url=" + url + "
user=" + user, e);
         }
     }
     
-    private void initDatabaseType() throws DataStoreException {
+    protected void initDatabaseType() throws DataStoreException {
         boolean failIfNotFound;
         if (databaseType == null) {
             if (!url.startsWith("jdbc:")) {
@@ -407,20 +474,20 @@
             throw new DataStoreException(msg);
         }
         if (driver == null) {
-            driver = prop.getProperty("driver", driver);
+            driver = getProperty(prop, "driver", driver);
         }
-        tableSQL = prop.getProperty("table", tableSQL);
-        createTableSQL = prop.getProperty("createTable", createTableSQL);
-        insertTempSQL = prop.getProperty("insertTemp", insertTempSQL);
-        updateDataSQL = prop.getProperty("updateData", updateDataSQL);
-        updateLastModifiedSQL = prop.getProperty("updateLastModified", updateLastModifiedSQL);
-        updateSQL = prop.getProperty("update", updateSQL);
-        deleteSQL = prop.getProperty("delete", deleteSQL);
-        deleteOlderSQL = prop.getProperty("deleteOlder", deleteOlderSQL);
-        selectMetaSQL = prop.getProperty("selectMeta", selectMetaSQL);
-        selectAllSQL = prop.getProperty("selectAll", selectAllSQL);
-        selectDataSQL = prop.getProperty("selectData", selectDataSQL);
-        storeStream = prop.getProperty("storeStream", storeStream);
+        tableSQL = getProperty(prop, "table", tableSQL);
+        createTableSQL = getProperty(prop, "createTable", createTableSQL);
+        insertTempSQL = getProperty(prop, "insertTemp", insertTempSQL);
+        updateDataSQL = getProperty(prop, "updateData", updateDataSQL);
+        updateLastModifiedSQL = getProperty(prop, "updateLastModified", updateLastModifiedSQL);
+        updateSQL = getProperty(prop, "update", updateSQL);
+        deleteSQL = getProperty(prop, "delete", deleteSQL);
+        deleteOlderSQL = getProperty(prop, "deleteOlder", deleteOlderSQL);
+        selectMetaSQL = getProperty(prop, "selectMeta", selectMetaSQL);
+        selectAllSQL = getProperty(prop, "selectAll", selectAllSQL);
+        selectDataSQL = getProperty(prop, "selectData", selectDataSQL);
+        storeStream = getProperty(prop, "storeStream", storeStream);
         if (STORE_SIZE_MINUS_ONE.equals(storeStream)) {
         } else if (STORE_TEMP_FILE.equals(storeStream)) {
         } else if (STORE_SIZE_MAX.equals(storeStream)) {            
@@ -432,10 +499,38 @@
             throw new DataStoreException(msg);
         }
     }
+    
+    /**
+     * Get the expanded property value. The following placeholders are supported:
+     * ${table}: the table name (the default is DATASTORE) and
+     * ${tablePrefix}: the prefix as set in the configuration (empty by default).
+     * 
+     * @param prop the properties object
+     * @param key the key
+     * @param defaultValue the default value
+     * @return the property value (placeholders are replaced)
+     */
+    protected String getProperty(Properties prop, String key, String defaultValue) {
+        String sql = prop.getProperty(key, defaultValue);
+        sql = Text.replace(sql, "${table}", tableSQL).trim();
+        sql = Text.replace(sql, "${tablePrefix}", tablePrefix).trim();
+        return sql;
+    }
 
-    private DataStoreException convert(String cause, Exception e) {
+    /**
+     * Convert an exception to a data store exception.
+     * 
+     * @param cause the message
+     * @param e the root cause
+     * @return the data store exception
+     */
+    protected DataStoreException convert(String cause, Exception e) {
         log.warn(cause, e);
-        return new DataStoreException(cause, e);
+        if (e instanceof DataStoreException) {
+            return (DataStoreException) e;
+        } else {
+            return new DataStoreException(cause, e);
+        }
     }
 
     /**
@@ -446,19 +541,29 @@
         minModifiedDate = before;
     }
 
-    synchronized long touch(DataIdentifier identifier, long lastModified) throws DataStoreException {
+    /**
+     * Update the modified date of an entry if required.
+     * 
+     * @param identifier the entry identifier
+     * @param lastModified the current last modified date
+     * @return the new modified date
+     */
+    long touch(DataIdentifier identifier, long lastModified) throws DataStoreException {
         usesIdentifier(identifier);
         if (lastModified < minModifiedDate) {
             long now = System.currentTimeMillis();
             Long n = new Long(now);
-            // UPDATE DATASTORE SET LAST_MODIFIED = ? WHERE ID = ? AND LAST_MODIFIED < ?
+            ConnectionRecoveryManager conn = getConnection();
             try {
+                // UPDATE DATASTORE SET LAST_MODIFIED = ? WHERE ID = ? AND LAST_MODIFIED < ?
                 conn.executeStmt(updateLastModifiedSQL, new Object[]{
                         n, identifier.toString(), n
                 });
                 return now;
             } catch (Exception e) {
                 throw convert("Can not update lastModified", e);
+            } finally {
+                putBack(conn);
             }
         }
         return lastModified;
@@ -468,17 +573,25 @@
      * {@inheritDoc}
      */    
     public InputStream getInputStream(DataIdentifier identifier) throws DataStoreException {
+        ConnectionRecoveryManager conn = getConnection();
         try {
-            // SELECT DATA FROM DATASTORE WHERE ID = ?
             String id = identifier.toString();
+            // SELECT ID, DATA FROM DATASTORE WHERE ID = ?
             PreparedStatement prep = conn.executeStmt(selectDataSQL, new Object[]{id});
             ResultSet rs = prep.getResultSet();
             if (!rs.next()) {
                 throw new DataStoreException("Record not found: " + identifier);
             }
-            return rs.getBinaryStream(1);
+            InputStream in = new BufferedInputStream(rs.getBinaryStream(2));
+            if (copyWhenReading) {
+                File temp = moveToTempFile(in);
+                in = new TempFileInputStream(temp);
+            }
+            return in;
         } catch (Exception e) {
             throw convert("Can not read identifier " + identifier, e);
+        } finally {
+            putBack(conn);
         }
     }
 
@@ -578,11 +691,16 @@
     /**
      * {@inheritDoc}
      */
-    public void close() {
-        conn.close();
+    public synchronized void close() {
+        ArrayList list = connectionPool.getAll();
+        for (int i = 0; i < list.size(); i++) {
+            ConnectionRecoveryManager conn = (ConnectionRecoveryManager) list.get(i);
+            conn.close();
+        }
+        list.clear();
     }
     
-    private void usesIdentifier(DataIdentifier identifier) {
+    protected void usesIdentifier(DataIdentifier identifier) {
         inUse.put(identifier, new WeakReference(identifier));
     }
     
@@ -593,12 +711,95 @@
         inUse.clear();
     }    
     
-    private synchronized MessageDigest getDigest() throws DataStoreException {
+    protected synchronized MessageDigest getDigest() throws DataStoreException {
         try {
             return MessageDigest.getInstance(DIGEST);
         } catch (NoSuchAlgorithmException e) {
             throw convert("No such algorithm: " + DIGEST, e);
         }
+    }
+    
+    protected ConnectionRecoveryManager getConnection() throws DataStoreException {
+        try {
+            return (ConnectionRecoveryManager) connectionPool.get();
+        } catch (InterruptedException e) {
+            throw new DataStoreException("Interrupted", e);
+        } catch (RepositoryException e) {
+            throw new DataStoreException("Can not open a new connection", e);
+        }
+    }
+    
+    protected void putBack(ConnectionRecoveryManager conn) throws DataStoreException {
+        try {
+            connectionPool.add(conn);
+        } catch (InterruptedException e) {
+            throw new DataStoreException("Interrupted", e);
+        }
+    }
+
+    /**
+     * Get the maximum number of concurrent connections.
+     * 
+     * @return the maximum number of connections.
+     */
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    /**
+     * Set the maximum number of concurrent connections.
+     * 
+     * @param maxConnections the new value
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    /**
+     * Create a new connection.
+     * 
+     * @return the new connection
+     */
+    public ConnectionRecoveryManager createNewConnection() throws RepositoryException {
+        ConnectionRecoveryManager conn = new ConnectionRecoveryManager(false, driver, url, user, password);
+        return conn;
+    }
+
+    /**
+     * Is a stream copied to a temporary file before returning?
+     * 
+     * @return the setting
+     */
+    public boolean getCopyWhenReading() {
+        return copyWhenReading;
+    }
+
+    /**
+     * Set the copy setting. If enabled,
+     * a stream is always copied to a temporary file when reading a stream.
+     * 
+     * @param copyWhenReading the new setting
+     */
+    public void setCopyWhenReading(boolean copyWhenReading) {
+        this.copyWhenReading = copyWhenReading;
+    }
+
+    /**
+     * Get the table prefix. The default is empty.
+     * 
+     * @return the table prefix.
+     */
+    public String getTablePrefix() {
+        return tablePrefix;
+    }
+
+    /**
+     * Set the new table prefix.
+     * 
+     * @param tablePrefix the new value
+     */
+    public void setTablePrefix(String tablePrefix) {
+        this.tablePrefix = tablePrefix;
     }
 
 }
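
For reference, the record identifier is the SHA-1 digest of the stream content (DIGEST above), and in the default tempFile mode the data is first copied to a temporary file so that the length is known before the row is updated. A minimal JDK-only sketch of that idea; the class and method names below are illustrative and not part of the commit:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.security.DigestInputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class Sha1RecordSketch {

        /**
         * Copies the stream to the given temp file and returns the SHA-1 digest
         * as a hex string. The digest becomes the record identifier, the file
         * length becomes the record length (mirrors the tempFile store mode).
         */
        public static String copyAndDigest(InputStream stream, File temp)
                throws IOException, NoSuchAlgorithmException {
            MessageDigest digest = MessageDigest.getInstance("SHA-1");
            InputStream in = new DigestInputStream(stream, digest);
            OutputStream out = new FileOutputStream(temp);
            try {
                byte[] b = new byte[4096];
                for (int n = in.read(b); n >= 0; n = in.read(b)) {
                    out.write(b, 0, n);
                }
            } finally {
                out.close();
                in.close();
            }
            // hex-encode the digest to get a printable identifier string
            StringBuffer buff = new StringBuffer();
            byte[] d = digest.digest();
            for (int i = 0; i < d.length; i++) {
                String x = Integer.toHexString(d[i] & 0xff);
                if (x.length() == 1) {
                    buff.append('0');
                }
                buff.append(x);
            }
            return buff.toString();
        }
    }

The committed addRecord() additionally wraps the stream in a TrackingInputStream and renames the TEMP_ row to the digest-based identifier once the copy completes.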

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java?rev=603499&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java Tue Dec 11 23:35:28 2007
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.data.db;
+
+import java.util.ArrayList;
+
+import javax.jcr.RepositoryException;
+
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+
+/**
+ * Implementation of a simple ConnectionRecoveryManager pool.
+ * The maximum number of pooled objects can be set, and if more objects
+ * are requested the pool waits until one object is put back.
+ * 
+ * @author Thomas Mueller
+ */
+public class Pool {
+    protected final int maxSize;
+    protected final ArrayList all = new ArrayList();
+    protected final DbDataStore factory;
+    protected final LinkedQueue pool = new LinkedQueue();
+    
+    /**
+     * Create a new pool using the given factory and maximum pool size.
+     * 
+     * @param factory the db data store
+     * @param maxSize the maximum number of objects in the pool.
+     */
+    protected Pool(DbDataStore factory, int maxSize) {
+        this.factory = factory;
+        this.maxSize = Math.max(1, maxSize);
+    }
+    
+    /**
+     * Get a connection from the pool. This method may open a new connection if
+     * required, or, if the maximum number of connections is already open, it
+     * will wait for one to be returned.
+     * 
+     * @return the connection
+     */
+    protected Object get() throws InterruptedException, RepositoryException {
+        Object o = pool.poll(0);
+        if (o == null) {
+            synchronized (all) {
+                if (all.size() < maxSize) {
+                    o = factory.createNewConnection();
+                    all.add(o);
+                }
+            }
+            if (o == null) {
+                o = pool.take();
+            }
+        }
+        return o;
+    }
+    
+    /**
+     * Put a connection back into the pool.
+     * 
+     * @param o the connection
+     */
+    protected void add(Object o) throws InterruptedException {
+        pool.put(o);
+    }
+    
+    /**
+     * Get all connections (even if they are currently being used).
+     * 
+     * @return all connections
+     */
+    protected ArrayList getAll() {
+        return all;
+    }
+}

Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/Pool.java
------------------------------------------------------------------------------
    svn:eol-style = native
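
The pool grows lazily up to maxSize and then blocks callers until a connection is put back; the LinkedQueue comes from the util.concurrent backport. Roughly the same behaviour sketched on java.util.concurrent (JDK 5+), for illustration only and not the committed implementation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;

    /** Bounded pool sketch: grows up to maxSize, then blocks until an object is returned. */
    public class BoundedPoolSketch<T> {

        public interface Factory<T> {
            T create() throws Exception;
        }

        private final int maxSize;
        private final Factory<T> factory;
        private final List<T> all = new ArrayList<T>();
        private final LinkedBlockingQueue<T> idle = new LinkedBlockingQueue<T>();

        public BoundedPoolSketch(Factory<T> factory, int maxSize) {
            this.factory = factory;
            this.maxSize = Math.max(1, maxSize);
        }

        public T get() throws Exception {
            T o = idle.poll();                 // try an idle object first
            if (o == null) {
                synchronized (all) {
                    if (all.size() < maxSize) {
                        o = factory.create();  // grow the pool lazily
                        all.add(o);
                    }
                }
                if (o == null) {
                    o = idle.take();           // at maxSize: wait for a put-back
                }
            }
            return o;
        }

        public void add(T o) throws InterruptedException {
            idle.put(o);
        }
    }

Usage mirrors the getConnection()/putBack() pair in DbDataStore: the connection is taken at the start of a method and returned in a finally block so that an exception does not leak a pooled connection.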

Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java?rev=603499&view=auto
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java (added)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java Tue Dec 11 23:35:28 2007
@@ -0,0 +1,104 @@
+package org.apache.jackrabbit.core.data.db;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * An input stream from a temp file that self-destructs when fully read or closed.
+ * 
+ * @author Thomas Mueller
+ */
+public class TempFileInputStream extends InputStream {
+    
+    private final File file;
+    private final InputStream in;
+    private boolean closed;
+    
+    /**
+     * Copy the data to a file and close the input stream afterwards.
+     * 
+     * @param in the input stream
+     * @param file the target file
+     * @return the size of the file
+     */
+    public static long writeToFileAndClose(InputStream in, File file) throws IOException {
+        OutputStream out = new FileOutputStream(file);
+        byte[] b = new byte[4096];
+        while (true) {
+            int n = in.read(b);
+            if (n < 0) {
+                break;
+            }
+            out.write(b, 0, n);
+        }
+        out.close();
+        in.close();
+        return file.length();
+    }
+    
+    /**
+     * Construct a new temporary file input stream.
+     * The file is deleted if the input stream is closed or fully read.
+     * Deleting is only attempted once.
+     * 
+     * @param file the temporary file
+     */
+    TempFileInputStream(File file) throws FileNotFoundException {
+        this.file = file;
+        in = new BufferedInputStream(new FileInputStream(file));
+    }
+    
+    private int closeIfEOF(int read) throws IOException {
+        if (read < 0) {
+            close();
+        }
+        return read;
+    }
+    
+    public void close() throws IOException {
+        if (!closed) {
+            in.close();
+            file.delete();
+            closed = true;
+        }
+    }
+    
+    public int available() throws IOException {
+        return in.available();
+    }
+    
+    public void mark(int readlimit) {
+        in.mark(readlimit);
+    }
+    
+    public boolean markSupported() {
+        return in.markSupported();
+    }
+    
+    public long skip(long n) throws IOException {
+        return in.skip(n);
+    }
+    
+    public void reset() throws IOException {
+        in.reset();
+    }
+    
+    public int read(byte[] b, int off, int len) throws IOException {
+        return closeIfEOF(in.read(b, off, len));
+    }
+
+    public int read(byte[] b) throws IOException {
+        return closeIfEOF(in.read(b));
+    }
+
+    public int read() throws IOException {
+        return closeIfEOF(in.read());
+    }
+
+}

Propchange: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/data/db/TempFileInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native
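
TempFileInputStream is what makes copyWhenReading work: getInputStream() copies the BLOB to a temporary file and hands out a stream that removes the file once it is fully read or closed, so concurrent readers never share a database cursor. A self-contained sketch of the same idea on top of FilterInputStream (illustrative only; the committed class overrides the read methods directly):

    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;

    /** Reads a temp file and deletes it on close or when EOF is reached. */
    public class SelfDeletingFileStream extends FilterInputStream {

        private final File file;
        private boolean closed;

        public SelfDeletingFileStream(File file) throws IOException {
            super(new BufferedInputStream(new FileInputStream(file)));
            this.file = file;
        }

        public int read() throws IOException {
            return closeIfEof(super.read());
        }

        public int read(byte[] b, int off, int len) throws IOException {
            return closeIfEof(super.read(b, off, len));
        }

        private int closeIfEof(int n) throws IOException {
            if (n < 0) {
                close();
            }
            return n;
        }

        public void close() throws IOException {
            if (!closed) {
                closed = true;
                in.close();
                file.delete();   // deletion is attempted only once
            }
        }
    }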

Modified: jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/derby.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/derby.properties?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/derby.properties (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/derby.properties Tue Dec 11 23:35:28 2007
@@ -1 +1,2 @@
+# Tested with Apache Derby 10.3.1.4 on Windows XP (2007-12-11)
 driver=org.apache.derby.jdbc.EmbeddedDriver

Modified: jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/h2.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/h2.properties?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/h2.properties (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/h2.properties Tue Dec 11 23:35:28 2007
@@ -1,2 +1,3 @@
+# Tested with H2 1.0.63 on Windows XP (2007-12-11)
 driver=org.h2.Driver
 storeStream=-1

Modified: jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/mysql.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/mysql.properties?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/mysql.properties (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/mysql.properties Tue Dec 11 23:35:28 2007
@@ -1 +1,5 @@
-driver=com.mysql.jdbc.Driver
\ No newline at end of file
+# Tested with MySQL 5.0.27-community-nt on Windows XP (2007-12-11)
+# currently, the objects must fit in memory
+driver=com.mysql.jdbc.Driver
+createTable=CREATE TABLE ${tablePrefix}${table}(ID VARCHAR(255) PRIMARY KEY, LENGTH BIGINT, LAST_MODIFIED BIGINT, DATA BLOB(2147483647))
+update=UPDATE ${tablePrefix}${table} SET ID=?, LENGTH=?, LAST_MODIFIED=? WHERE ID=? AND NOT EXISTS(SELECT * FROM(SELECT ID FROM ${tablePrefix}${table} WHERE ID=?) X)

Modified: jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/postgresql.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/postgresql.properties?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/postgresql.properties (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/postgresql.properties Tue Dec 11 23:35:28 2007
@@ -1 +1,5 @@
-driver=org.postgresql.Driver
\ No newline at end of file
+# Tested with PostgreSQL 8.2.4 on Windows XP (2007-12-11)
+# currently, the objects must fit in memory
+driver=org.postgresql.Driver
+table=datastore
+createTable=CREATE TABLE ${tablePrefix}${table}(ID VARCHAR(255) PRIMARY KEY, LENGTH BIGINT, LAST_MODIFIED BIGINT, DATA BYTEA)

Modified: jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/sqlserver.properties
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/sqlserver.properties?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/sqlserver.properties (original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/resources/org/apache/jackrabbit/core/data/db/sqlserver.properties Tue Dec 11 23:35:28 2007
@@ -1 +1,3 @@
-driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
\ No newline at end of file
+# Tested with Microsoft SQL Server 2005 4 on Windows XP (2007-12-11)
+driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
+createTable=CREATE TABLE ${tablePrefix}${table}(ID VARCHAR(255) PRIMARY KEY, LENGTH BIGINT, LAST_MODIFIED BIGINT, DATA IMAGE)
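
Each <databaseType>.properties file only overrides the statements that differ from the built-in defaults, and the ${table} and ${tablePrefix} placeholders are expanded when the properties are read (see getProperty() in DbDataStore, which uses Text.replace). A small JDK-only sketch of that expansion; the JR_ prefix is a made-up example value:

    import java.io.InputStream;
    import java.util.Properties;

    public class SqlTemplateSketch {
        public static void main(String[] args) throws Exception {
            String table = "DATASTORE";   // default table name (a properties file may override it)
            String tablePrefix = "JR_";   // hypothetical value from the configuration

            Properties prop = new Properties();
            InputStream in = SqlTemplateSketch.class.getResourceAsStream(
                    "/org/apache/jackrabbit/core/data/db/postgresql.properties");
            if (in != null) {
                prop.load(in);
                in.close();
            }

            String selectAll = prop.getProperty("selectAll", "SELECT ID FROM ${tablePrefix}${table}");
            selectAll = selectAll.replace("${table}", table)
                                 .replace("${tablePrefix}", tablePrefix).trim();
            System.out.println(selectAll);   // SELECT ID FROM JR_DATASTORE
        }
    }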

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/PersistenceManagerIteratorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/PersistenceManagerIteratorTest.java?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/PersistenceManagerIteratorTest.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/PersistenceManagerIteratorTest.java Tue Dec 11 23:35:28 2007
@@ -52,6 +52,8 @@
             return;
         }
 
+        // TODO: make getWorkspaceNames public or create a utility class
+
         RepositoryImpl r = (RepositoryImpl) rep;
         Method m = r.getClass().getDeclaredMethod("getWorkspaceNames",
                 new Class[0]);

Modified: jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java?rev=603499&r1=603498&r2=603499&view=diff
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java (original)
+++ jackrabbit/trunk/jackrabbit-core/src/test/java/org/apache/jackrabbit/core/data/RandomInputStream.java Tue Dec 11 23:35:28 2007
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.core.data;
 
+import java.io.BufferedInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 
 public class RandomInputStream extends InputStream {
@@ -23,7 +25,7 @@
     private static final long MUL = 0x5DEECE66DL;
     private static final long ADD = 0xBL;
     private static final long MASK = (1L << 48) - 1;
-    private static final int MAX_READ_BLOCK = 15;
+    private static final int DEFAULT_MAX_READ_BLOCK_SIZE = 15;
 
     private final long initialSeed;
     private final long len;
@@ -31,14 +33,38 @@
     private long pos;
     private long markedPos;
     private long state;
+    private int maxReadBlockSize;
 
     public String toString() {
         return "new RandomInputStream(" + initialSeed + ", " + len + ")";
     }
 
     public RandomInputStream(long seed, long len) {
+        this(seed, len, DEFAULT_MAX_READ_BLOCK_SIZE);
+    }
+
+    public static void compareStreams(InputStream a, InputStream b) throws IOException {
+        a = new BufferedInputStream(a);
+        b = new BufferedInputStream(b);
+        long pos = 0;
+        while (true) {
+            int x = a.read();
+            int y = b.read();
+            if (x == -1 || y == -1) {
+                if (x == y) {
+                    break;
+                }
+            } 
+            if (x != y) {
+                throw new IOException("Incorrect byte at position " + pos + ": x=" + x + " y=" + y);
+            }
+            pos++;
+        }
+    }
+
+    public RandomInputStream(long seed, long len, int maxReadBlockSize) {
         this.initialSeed = seed;
         this.len = len;
+        this.maxReadBlockSize = maxReadBlockSize;
         setSeed(seed);
         reset();
     }
@@ -56,8 +82,8 @@
         if (n > (len - pos)) {
             n = (len - pos);
         }
-        if (n > MAX_READ_BLOCK) {
-            n = MAX_READ_BLOCK;
+        if (n > maxReadBlockSize) {
+            n = maxReadBlockSize;
         } else if (n < 0) {
             n = 0;
         }
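
RandomInputStream produces a deterministic pseudo-random byte sequence for a given seed, so a stored record can be verified by comparing it against a fresh stream created with the same seed. A usage sketch, assuming the jackrabbit-core test classes are on the classpath:

    import org.apache.jackrabbit.core.data.RandomInputStream;

    public class CompareStreamsSketch {
        public static void main(String[] args) throws Exception {
            long seed = 42;
            long length = 1024 * 1024;
            // Two independent streams built from the same seed yield identical bytes;
            // compareStreams throws an IOException at the first mismatch.
            RandomInputStream expected = new RandomInputStream(seed, length);
            RandomInputStream actual = new RandomInputStream(seed, length);
            RandomInputStream.compareStreams(expected, actual);
            System.out.println("streams match");
        }
    }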


