From commits-return-4054-apmail-jackrabbit-commits-archive=jackrabbit.apache.org@jackrabbit.apache.org Thu May 24 09:53:02 2007 Return-Path: Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: (qmail 41364 invoked from network); 24 May 2007 09:53:01 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 24 May 2007 09:53:01 -0000 Received: (qmail 75928 invoked by uid 500); 24 May 2007 09:53:06 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 75902 invoked by uid 500); 24 May 2007 09:53:06 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 75879 invoked by uid 99); 24 May 2007 09:53:06 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 May 2007 02:53:06 -0700 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 May 2007 02:52:59 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id E900A1A981A; Thu, 24 May 2007 02:52:38 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r541244 - /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Date: Thu, 24 May 2007 09:52:38 -0000 To: commits@jackrabbit.apache.org From: dpfister@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070524095238.E900A1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dpfister Date: Thu May 24 02:52:38 2007 New Revision: 541244 URL: http://svn.apache.org/viewvc?view=rev&rev=541244 Log: JCR-927: DatabaseJournal needs connection reestablishment logic Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?view=diff&rev=541244&r1=541243&r2=541244 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Thu May 24 02:52:38 2007 @@ -53,6 +53,8 @@ * defaults to an empty string *
  • user: username to specify when connecting
  • *
  • password: password to specify when connecting
  • + *
  • reconnectDelayMs: number of milliseconds to wait before + * trying to reconnect to the database. * */ public class DatabaseJournal extends AbstractJournal { @@ -69,6 +71,11 @@ private static final String DEFAULT_DDL_NAME = "default.ddl"; /** + * Default reconnect delay in milliseconds. + */ + private static final long DEFAULT_RECONNECT_DELAY_MS = 10000; + + /** * Logger. */ private static Logger log = LoggerFactory.getLogger(DatabaseJournal.class); @@ -104,9 +111,14 @@ private String password; /** + * Reconnect delay in milliseconds, bean property. + */ + private long reconnectDelayMs; + + /** * JDBC Connection used. */ - private Connection con; + private Connection connection; /** * Statement returning all revisions within a range. @@ -134,6 +146,11 @@ private long lockedRevision; /** + * Next time in milliseconds to reattempt connecting to the database. + */ + private long reconnectTimeMs; + + /** * {@inheritDoc} */ public void init(String id, NamespaceResolver resolver) @@ -141,6 +158,39 @@ super.init(id, resolver); + // Provide valid defaults for arguments + if (schemaObjectPrefix == null) { + schemaObjectPrefix = ""; + } + if (reconnectDelayMs == 0) { + reconnectDelayMs = DEFAULT_RECONNECT_DELAY_MS; + } + + init(); + + try { + connection = getConnection(); + connection.setAutoCommit(true); + checkSchema(); + prepareStatements(); + } catch (Exception e) { + String msg = "Unable to create connection."; + throw new JournalException(msg, e); + } + log.info("DatabaseJournal initialized."); + } + + /** + * Completes initialization of this database journal. Base implementation + * checks whether the required bean properties driver and + * url have been specified and optionally deduces a valid + * schema. Should be overridden by subclasses that use a different way to + * create a connection and therefore require other arguments. + * + * @see #getConnection() + * @throws JournalException if initialization fails + */ + protected void init() throws JournalException { if (driver == null) { String msg = "Driver not specified."; throw new JournalException(msg); @@ -149,29 +199,36 @@ String msg = "Connection URL not specified."; throw new JournalException(msg); } - try { - if (schema == null) { + + if (schema == null) { + try { schema = getSchemaFromURL(url); + } catch (IllegalArgumentException e) { + String msg = "Unable to derive schema from URL: " + e.getMessage(); + throw new JournalException(msg); } - if (schemaObjectPrefix == null) { - schemaObjectPrefix = ""; - } - } catch (IllegalArgumentException e) { - String msg = "Unable to derive schema from URL: " + e.getMessage(); - throw new JournalException(msg); } + try { Class.forName(driver); - con = DriverManager.getConnection(url, user, password); - con.setAutoCommit(true); - - checkSchema(); - prepareStatements(); - } catch (Exception e) { - String msg = "Unable to initialize connection."; + } catch (ClassNotFoundException e) { + String msg = "Unable to load JDBC driver class."; throw new JournalException(msg, e); } - log.info("DatabaseJournal initialized at URL: " + url); + } + + /** + * Creates a new database connection. This method is called inside + * {@link #init()} or when a connection has been dropped and must be + * reacquired. Base implementation uses java.sql.DriverManager + * to get the connection. May be overridden by subclasses. + * + * @see #init() + * @return new connection + * @throws SQLException if an error occurs + */ + protected Connection getConnection() throws SQLException { + return DriverManager.getConnection(url, user, password); } /** @@ -182,7 +239,7 @@ * @return schema * @throws IllegalArgumentException if the JDBC connection URL is invalid */ - private String getSchemaFromURL(String url) throws IllegalArgumentException { + private static String getSchemaFromURL(String url) throws IllegalArgumentException { int start = url.indexOf(':'); if (start != -1) { int end = url.indexOf(':', start + 1); @@ -200,6 +257,8 @@ throws JournalException { try { + checkConnection(); + selectRevisionsStmt.clearParameters(); selectRevisionsStmt.clearWarnings(); selectRevisionsStmt.setLong(1, startRevision); @@ -208,6 +267,8 @@ return new DatabaseRecordIterator( selectRevisionsStmt.getResultSet(), getResolver()); } catch (SQLException e) { + close(true); + String msg = "Unable to return record iterater."; throw new JournalException(msg, e); } @@ -226,8 +287,12 @@ boolean succeeded = false; try { - con.setAutoCommit(false); + checkConnection(); + + connection.setAutoCommit(false); } catch (SQLException e) { + close(true); + String msg = "Unable to set autocommit to false."; throw new JournalException(msg, e); } @@ -249,19 +314,15 @@ succeeded = true; } catch (SQLException e) { + close(true); + String msg = "Unable to lock global revision table."; throw new JournalException(msg, e); } finally { close(rs); if (!succeeded) { - rollback(con); - - try { - con.setAutoCommit(true); - } catch (SQLException e) { - String msg = "Unable to set autocommit to true."; - log.warn(msg, e); - } + rollback(connection); + setAutoCommit(connection, true); } } } @@ -271,14 +332,9 @@ */ protected void doUnlock(boolean successful) { if (!successful) { - rollback(con); - } - try { - con.setAutoCommit(true); - } catch (SQLException e) { - String msg = "Unable to set autocommit to true."; - log.warn(msg, e); + rollback(connection); } + setAutoCommit(connection, true); } /** @@ -299,6 +355,8 @@ throws JournalException { try { + checkConnection(); + try { insertRevisionStmt.clearParameters(); insertRevisionStmt.clearWarnings(); @@ -308,16 +366,13 @@ insertRevisionStmt.setBinaryStream(4, in, length); insertRevisionStmt.execute(); - con.commit(); + connection.commit(); } finally { - try { - con.setAutoCommit(true); - } catch (SQLException e) { - String msg = "Unable to set autocommit to true."; - log.warn(msg, e); - } + setAutoCommit(connection, true); } } catch (SQLException e) { + close(true); + String msg = "Unable to append revision " + lockedRevision + "."; throw new JournalException(msg, e); } @@ -327,73 +382,154 @@ * {@inheritDoc} */ public void close() { - try { - con.close(); - } catch (SQLException e) { - String msg = "Error while closing connection: " + e.getMessage(); - log.warn(msg); + close(false); + } + + /** + * Close database connections and statements. If closing was due to an + * error that occurred, calculates the next time a reconnect should + * be attempted. + * + * @param failure whether closing is due to a failure + */ + private void close(boolean failure) { + if (failure) { + reconnectTimeMs = System.currentTimeMillis() + reconnectDelayMs; } + + close(selectRevisionsStmt); + selectRevisionsStmt = null; + close(updateGlobalStmt); + updateGlobalStmt = null; + close(selectGlobalStmt); + selectGlobalStmt = null; + close(insertRevisionStmt); + insertRevisionStmt = null; + + close(connection); + connection = null; } /** - * Close some input stream. + * Set the autocommit flag of a connection. Does nothing if the connection + * passed is null and logs any exception as warning. + * + * @param connection database connection + * @param autoCommit where to enable or disable autocommit + */ + private static void setAutoCommit(Connection connection, boolean autoCommit) { + if (connection != null) { + try { + connection.setAutoCommit(autoCommit); + } catch (SQLException e) { + String msg = "Unable to set autocommit flag to " + autoCommit; + log.warn(msg, e); + } + } + } + + /** + * Rollback a connection. Does nothing if the connection passed is + * null and logs any exception as warning. + * + * @param connection connection. + */ + private static void rollback(Connection connection) { + if (connection != null) { + try { + connection.rollback(); + } catch (SQLException e) { + String msg = "Error while rolling back connection: " + e.getMessage(); + log.warn(msg); + } + } + } + + /** + * Closes the given database connection. Does nothing if the connection + * passed is null and logs any exception as warning. + * + * @param connection database connection + */ + private static void close(Connection connection) { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + String msg = "Error while closing connection: " + e.getMessage(); + log.warn(msg); + } + } + } + + /** + * Close some input stream. Does nothing if the input stream + * passed is null and logs any exception as warning. * * @param in input stream, may be null. */ - private void close(InputStream in) { - try { - if (in != null) { + private static void close(InputStream in) { + if (in != null) { + try { in.close(); + } catch (IOException e) { + String msg = "Error while closing input stream: " + e.getMessage(); + log.warn(msg); } - } catch (IOException e) { - String msg = "Error while closing input stream: " + e.getMessage(); - log.warn(msg); } } /** - * Close some statement. + * Close some statement. Does nothing if the statement + * passed is null and logs any exception as warning. * * @param stmt statement, may be null. */ - private void close(Statement stmt) { - try { - if (stmt != null) { + private static void close(Statement stmt) { + if (stmt != null) { + try { stmt.close(); + } catch (SQLException e) { + String msg = "Error while closing statement: " + e.getMessage(); + log.warn(msg); } - } catch (SQLException e) { - String msg = "Error while closing statement: " + e.getMessage(); - log.warn(msg); } } /** - * Close some resultset. + * Close some resultset. Does nothing if the result set + * passed is null and logs any exception as warning. * * @param rs resultset, may be null. */ - private void close(ResultSet rs) { - try { - if (rs != null) { + private static void close(ResultSet rs) { + if (rs != null) { + try { rs.close(); + } catch (SQLException e) { + String msg = "Error while closing result set: " + e.getMessage(); + log.warn(msg); } - } catch (SQLException e) { - String msg = "Error while closing result set: " + e.getMessage(); - log.warn(msg); } } /** - * Rollback a connection. - * - * @param con connection. + * Checks the currently established connection. If the connection no longer + * exists, waits until at least reconnectTimeMs have passed + * since the error occurred and recreates the connection. */ - private void rollback(Connection con) { - try { - con.rollback(); - } catch (SQLException e) { - String msg = "Error while rolling back connection: " + e.getMessage(); - log.warn(msg); + private void checkConnection() throws SQLException { + if (connection == null) { + long delayMs = reconnectTimeMs - System.currentTimeMillis(); + if (delayMs > 0) { + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + /* ignore */ + } + } + connection = getConnection(); + prepareStatements(); } } @@ -404,7 +540,7 @@ * @throws Exception if an error occurs */ private void checkSchema() throws Exception { - DatabaseMetaData metaData = con.getMetaData(); + DatabaseMetaData metaData = connection.getMetaData(); String tableName = schemaObjectPrefix + "JOURNAL"; if (metaData.storesLowerCaseIdentifiers()) { tableName = tableName.toLowerCase(); @@ -434,7 +570,7 @@ } } BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - Statement stmt = con.createStatement(); + Statement stmt = connection.createStatement(); try { String sql = reader.readLine(); while (sql != null) { @@ -461,17 +597,17 @@ * @throws SQLException if an error occurs */ private void prepareStatements() throws SQLException { - selectRevisionsStmt = con.prepareStatement( + selectRevisionsStmt = connection.prepareStatement( "select REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA " + "from " + schemaObjectPrefix + "JOURNAL " + "where REVISION_ID > ?"); - updateGlobalStmt = con.prepareStatement( + updateGlobalStmt = connection.prepareStatement( "update " + schemaObjectPrefix + "GLOBAL_REVISION " + "set revision_id = revision_id + 1"); - selectGlobalStmt = con.prepareStatement( + selectGlobalStmt = connection.prepareStatement( "select revision_id " + "from " + schemaObjectPrefix + "GLOBAL_REVISION"); - insertRevisionStmt = con.prepareStatement( + insertRevisionStmt = connection.prepareStatement( "insert into " + schemaObjectPrefix + "JOURNAL" + "(REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA) " + "values (?,?,?,?)"); @@ -504,6 +640,10 @@ return password; } + public long getReconnectDelayMs() { + return reconnectDelayMs; + } + /** * Bean setters */ @@ -529,5 +669,9 @@ public void setPassword(String password) { this.password = password; + } + + public void setReconnectDelayMs(long reconnectDelayMs) { + this.reconnectDelayMs = reconnectDelayMs; } }