jackrabbit-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dpfis...@apache.org
Subject svn commit: r541244 - /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
Date Thu, 24 May 2007 09:52:38 GMT
Author: dpfister
Date: Thu May 24 02:52:38 2007
New Revision: 541244

URL: http://svn.apache.org/viewvc?view=rev&rev=541244
Log:
JCR-927: DatabaseJournal needs connection reestablishment logic

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java

Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?view=diff&rev=541244&r1=541243&r2=541244
==============================================================================
--- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
(original)
+++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java
Thu May 24 02:52:38 2007
@@ -53,6 +53,8 @@
  * defaults to an empty string</li>
  * <li><code>user</code>: username to specify when connecting</li>
  * <li><code>password</code>: password to specify when connecting</li>
+ * <li><code>reconnectDelayMs</code>: number of milliseconds to wait before
+ * trying to reconnect to the database.</li>
  * </ul>
  */
 public class DatabaseJournal extends AbstractJournal {
@@ -69,6 +71,11 @@
     private static final String DEFAULT_DDL_NAME = "default.ddl";
 
     /**
+     * Default reconnect delay in milliseconds.
+     */
+    private static final long DEFAULT_RECONNECT_DELAY_MS = 10000;
+
+    /**
      * Logger.
      */
     private static Logger log = LoggerFactory.getLogger(DatabaseJournal.class);
@@ -104,9 +111,14 @@
     private String password;
 
     /**
+     * Reconnect delay in milliseconds, bean property.
+     */
+    private long reconnectDelayMs;
+
+    /**
      * JDBC Connection used.
      */
-    private Connection con;
+    private Connection connection;
 
     /**
      * Statement returning all revisions within a range.
@@ -134,6 +146,11 @@
     private long lockedRevision;
 
     /**
+     * Next time in milliseconds to reattempt connecting to the database.
+     */
+    private long reconnectTimeMs;
+
+    /**
      * {@inheritDoc}
      */
     public void init(String id, NamespaceResolver resolver)
@@ -141,6 +158,39 @@
 
         super.init(id, resolver);
 
+        // Provide valid defaults for arguments
+        if (schemaObjectPrefix == null) {
+            schemaObjectPrefix = "";
+        }
+        if (reconnectDelayMs == 0) {
+            reconnectDelayMs = DEFAULT_RECONNECT_DELAY_MS;
+        }
+
+        init();
+
+        try {
+            connection = getConnection();
+            connection.setAutoCommit(true);
+            checkSchema();
+            prepareStatements();
+        } catch (Exception e) {
+            String msg = "Unable to create connection.";
+            throw new JournalException(msg, e);
+        }
+        log.info("DatabaseJournal initialized.");
+    }
+
+    /**
+     * Completes initialization of this database journal. Base implementation
+     * checks whether the required bean properties <code>driver</code> and
+     * <code>url</code> have been specified and optionally deduces a valid
+     * schema. Should be overridden by subclasses that use a different way to
+     * create a connection and therefore require other arguments.
+     *
+     * @see #getConnection()
+     * @throws JournalException if initialization fails
+     */
+    protected void init() throws JournalException {
         if (driver == null) {
             String msg = "Driver not specified.";
             throw new JournalException(msg);
@@ -149,29 +199,36 @@
             String msg = "Connection URL not specified.";
             throw new JournalException(msg);
         }
-        try {
-            if (schema == null) {
+
+        if (schema == null) {
+            try {
                 schema = getSchemaFromURL(url);
+            } catch (IllegalArgumentException e) {
+                String msg = "Unable to derive schema from URL: " + e.getMessage();
+                throw new JournalException(msg);
             }
-            if (schemaObjectPrefix == null) {
-                schemaObjectPrefix = "";
-            }
-        } catch (IllegalArgumentException e) {
-            String msg = "Unable to derive schema from URL: " + e.getMessage();
-            throw new JournalException(msg);
         }
+
         try {
             Class.forName(driver);
-            con = DriverManager.getConnection(url, user, password);
-            con.setAutoCommit(true);
-
-            checkSchema();
-            prepareStatements();
-        } catch (Exception e) {
-            String msg = "Unable to initialize connection.";
+        } catch (ClassNotFoundException e) {
+            String msg = "Unable to load JDBC driver class.";
             throw new JournalException(msg, e);
         }
-        log.info("DatabaseJournal initialized at URL: " + url);
+    }
+
+    /**
+     * Creates a new database connection. This method is called during
+     * initialization or when a connection has been dropped and must be
+     * reacquired. Base implementation uses <code>java.sql.DriverManager</code>
+     * to get the connection. May be overridden by subclasses.
+     *
+     * @see #init()
+     * @return new connection
+     * @throws SQLException if an error occurs
+     */
+    protected Connection getConnection() throws SQLException {
+        return DriverManager.getConnection(url, user, password);
     }
 
     /**
@@ -182,7 +239,7 @@
      * @return schema
      * @throws IllegalArgumentException if the JDBC connection URL is invalid
      */
-    private String getSchemaFromURL(String url) throws IllegalArgumentException {
+    private static String getSchemaFromURL(String url) throws IllegalArgumentException {
         int start = url.indexOf(':');
         if (start != -1) {
             int end = url.indexOf(':', start + 1);
@@ -200,6 +257,8 @@
             throws JournalException {
 
         try {
+            checkConnection();
+
             selectRevisionsStmt.clearParameters();
             selectRevisionsStmt.clearWarnings();
             selectRevisionsStmt.setLong(1, startRevision);
@@ -208,6 +267,8 @@
             return new DatabaseRecordIterator(
                     selectRevisionsStmt.getResultSet(), getResolver());
         } catch (SQLException e) {
+            close(true);
+
             String msg = "Unable to return record iterater.";
             throw new JournalException(msg, e);
         }
@@ -226,8 +287,12 @@
         boolean succeeded = false;
 
         try {
-            con.setAutoCommit(false);
+            checkConnection();
+
+            connection.setAutoCommit(false);
         } catch (SQLException e) {
+            close(true);
+
             String msg = "Unable to set autocommit to false.";
             throw new JournalException(msg, e);
         }
@@ -249,19 +314,15 @@
             succeeded = true;
 
         } catch (SQLException e) {
+            close(true);
+
             String msg = "Unable to lock global revision table.";
             throw new JournalException(msg, e);
         } finally {
             close(rs);
             if (!succeeded) {
-                rollback(con);
-
-                try {
-                    con.setAutoCommit(true);
-                } catch (SQLException e) {
-                    String msg = "Unable to set autocommit to true.";
-                    log.warn(msg, e);
-                }
+                rollback(connection);
+                setAutoCommit(connection, true);
             }
         }
     }
@@ -271,14 +332,9 @@
      */
     protected void doUnlock(boolean successful) {
         if (!successful) {
-            rollback(con);
-        }
-        try {
-            con.setAutoCommit(true);
-        } catch (SQLException e) {
-            String msg = "Unable to set autocommit to true.";
-            log.warn(msg, e);
+            rollback(connection);
         }
+        setAutoCommit(connection, true);
     }
 
     /**
@@ -299,6 +355,8 @@
             throws JournalException {
 
         try {
+            checkConnection();
+
             try {
                 insertRevisionStmt.clearParameters();
                 insertRevisionStmt.clearWarnings();
@@ -308,16 +366,13 @@
                 insertRevisionStmt.setBinaryStream(4, in, length);
                 insertRevisionStmt.execute();
 
-                con.commit();
+                connection.commit();
             } finally {
-                try {
-                    con.setAutoCommit(true);
-                } catch (SQLException e) {
-                    String msg = "Unable to set autocommit to true.";
-                    log.warn(msg, e);
-                }
+                setAutoCommit(connection, true);
             }
         } catch (SQLException e) {
+            close(true);
+
             String msg = "Unable to append revision " + lockedRevision + ".";
             throw new JournalException(msg, e);
         }
@@ -327,73 +382,154 @@
      * {@inheritDoc}
      */
     public void close() {
-        try {
-            con.close();
-        } catch (SQLException e) {
-            String msg = "Error while closing connection: " + e.getMessage();
-            log.warn(msg);
+        close(false);
+    }
+
+    /**
+     * Close database connections and statements. If closing was due to an
+     * error that occurred, calculates the next time a reconnect should
+     * be attempted.
+     *
+     * @param failure whether closing is due to a failure
+     */
+    private void close(boolean failure) {
+        if (failure) {
+            reconnectTimeMs = System.currentTimeMillis() + reconnectDelayMs;
         }
+
+        close(selectRevisionsStmt);
+        selectRevisionsStmt = null;
+        close(updateGlobalStmt);
+        updateGlobalStmt = null;
+        close(selectGlobalStmt);
+        selectGlobalStmt = null;
+        close(insertRevisionStmt);
+        insertRevisionStmt = null;
+
+        close(connection);
+        connection = null;
     }
 
     /**
-     * Close some input stream.
+     * Set the autocommit flag of a connection. Does nothing if the connection
+     * passed is <code>null</code> and logs any exception as warning.
+     *
+     * @param connection database connection
+     * @param autoCommit whether to enable or disable autocommit
+     */
+    private static void setAutoCommit(Connection connection, boolean autoCommit) {
+        if (connection != null) {
+            try {
+                connection.setAutoCommit(autoCommit);
+            } catch (SQLException e) {
+                String msg = "Unable to set autocommit flag to " + autoCommit;
+                log.warn(msg, e);
+            }
+        }
+    }
+
+    /**
+     * Rollback a connection. Does nothing if the connection passed is
+     * <code>null</code> and logs any exception as warning.
+     *
+     * @param connection connection.
+     */
+    private static void rollback(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.rollback();
+            } catch (SQLException e) {
+                String msg = "Error while rolling back connection: " + e.getMessage();
+                log.warn(msg);
+            }
+        }
+    }
+
+    /**
+     * Closes the given database connection. Does nothing if the connection
+     * passed is <code>null</code> and logs any exception as warning.
+     *
+     * @param connection database connection
+     */
+    private static void close(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                String msg = "Error while closing connection: " + e.getMessage();
+                log.warn(msg);
+            }
+        }
+    }
+
+    /**
+     * Close some input stream.  Does nothing if the input stream
+     * passed is <code>null</code> and logs any exception as warning.
      *
      * @param in input stream, may be <code>null</code>.
      */
-    private void close(InputStream in) {
-        try {
-            if (in != null) {
+    private static void close(InputStream in) {
+        if (in != null) {
+            try {
                 in.close();
+            } catch (IOException e) {
+                String msg = "Error while closing input stream: " + e.getMessage();
+                log.warn(msg);
             }
-        } catch (IOException e) {
-            String msg = "Error while closing input stream: " + e.getMessage();
-            log.warn(msg);
         }
     }
 
     /**
-     * Close some statement.
+     * Close some statement.  Does nothing if the statement
+     * passed is <code>null</code> and logs any exception as warning.
      *
      * @param stmt statement, may be <code>null</code>.
      */
-    private void close(Statement stmt) {
-        try {
-            if (stmt != null) {
+    private static void close(Statement stmt) {
+        if (stmt != null) {
+            try {
                 stmt.close();
+            } catch (SQLException e) {
+                String msg = "Error while closing statement: " + e.getMessage();
+                log.warn(msg);
             }
-        } catch (SQLException e) {
-            String msg = "Error while closing statement: " + e.getMessage();
-            log.warn(msg);
         }
     }
 
     /**
-     * Close some resultset.
+     * Close some resultset.  Does nothing if the result set
+     * passed is <code>null</code> and logs any exception as warning.
      *
      * @param rs resultset, may be <code>null</code>.
      */
-    private void close(ResultSet rs) {
-        try {
-            if (rs != null) {
+    private static void close(ResultSet rs) {
+        if (rs != null) {
+            try {
                 rs.close();
+            } catch (SQLException e) {
+                String msg = "Error while closing result set: " + e.getMessage();
+                log.warn(msg);
             }
-        } catch (SQLException e) {
-            String msg = "Error while closing result set: " + e.getMessage();
-            log.warn(msg);
         }
     }
 
     /**
-     * Rollback a connection.
-     *
-     * @param con connection.
+     * Checks the currently established connection. If the connection no longer
+     * exists, waits until the time <code>reconnectTimeMs</code> has been
+     * reached, then recreates the connection and its prepared statements.
      */
-    private void rollback(Connection con) {
-        try {
-            con.rollback();
-        } catch (SQLException e) {
-            String msg = "Error while rolling back connection: " + e.getMessage();
-            log.warn(msg);
+    private void checkConnection() throws SQLException {
+        if (connection == null) {
+            long delayMs = reconnectTimeMs - System.currentTimeMillis();
+            if (delayMs > 0) {
+                try {
+                    Thread.sleep(delayMs);
+                } catch (InterruptedException e) {
+                    /* ignore */
+                }
+            }
+            connection = getConnection();
+            prepareStatements();
         }
     }
 
@@ -404,7 +540,7 @@
      * @throws Exception if an error occurs
      */
     private void checkSchema() throws Exception {
-        DatabaseMetaData metaData = con.getMetaData();
+        DatabaseMetaData metaData = connection.getMetaData();
         String tableName = schemaObjectPrefix + "JOURNAL";
         if (metaData.storesLowerCaseIdentifiers()) {
             tableName = tableName.toLowerCase();
@@ -434,7 +570,7 @@
                 }
             }
             BufferedReader reader = new BufferedReader(new InputStreamReader(in));
-            Statement stmt = con.createStatement();
+            Statement stmt = connection.createStatement();
             try {
                 String sql = reader.readLine();
                 while (sql != null) {
@@ -461,17 +597,17 @@
      * @throws SQLException if an error occurs
      */
     private void prepareStatements() throws SQLException {
-        selectRevisionsStmt = con.prepareStatement(
+        selectRevisionsStmt = connection.prepareStatement(
                 "select REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA " +
                 "from " + schemaObjectPrefix + "JOURNAL " +
                 "where REVISION_ID > ?");
-        updateGlobalStmt = con.prepareStatement(
+        updateGlobalStmt = connection.prepareStatement(
                 "update " + schemaObjectPrefix + "GLOBAL_REVISION " +
                 "set revision_id = revision_id + 1");
-        selectGlobalStmt = con.prepareStatement(
+        selectGlobalStmt = connection.prepareStatement(
                 "select revision_id " +
                 "from " + schemaObjectPrefix + "GLOBAL_REVISION");
-        insertRevisionStmt = con.prepareStatement(
+        insertRevisionStmt = connection.prepareStatement(
                 "insert into " + schemaObjectPrefix + "JOURNAL" +
                 "(REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA) " +
                 "values (?,?,?,?)");
@@ -504,6 +640,10 @@
         return password;
     }
 
+    public long getReconnectDelayMs() {
+        return reconnectDelayMs;
+    }
+
     /**
      * Bean setters
      */
@@ -529,5 +669,9 @@
 
     public void setPassword(String password) {
         this.password = password;
+    }
+
+    public void setReconnectDelayMs(long reconnectDelayMs) {
+        this.reconnectDelayMs = reconnectDelayMs;
     }
 }



Mime
View raw message