Return-Path: Delivered-To: apmail-jackrabbit-commits-archive@www.apache.org Received: (qmail 31798 invoked from network); 20 Feb 2007 16:07:16 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 20 Feb 2007 16:07:16 -0000 Received: (qmail 6128 invoked by uid 500); 20 Feb 2007 16:07:24 -0000 Delivered-To: apmail-jackrabbit-commits-archive@jackrabbit.apache.org Received: (qmail 6088 invoked by uid 500); 20 Feb 2007 16:07:24 -0000 Mailing-List: contact commits-help@jackrabbit.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@jackrabbit.apache.org Delivered-To: mailing list commits@jackrabbit.apache.org Received: (qmail 6079 invoked by uid 99); 20 Feb 2007 16:07:23 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2007 08:07:23 -0800 X-ASF-Spam-Status: No, hits=-98.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Feb 2007 08:07:12 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id E37041A9819; Tue, 20 Feb 2007 08:06:51 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r509624 [2/3] - in /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core: ./ cluster/ config/ journal/ state/ Date: Tue, 20 Feb 2007 16:06:45 -0000 To: commits@jackrabbit.apache.org From: dpfister@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070220160651.E37041A9819@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java URL: 
http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseJournal.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,528 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.name.NamespaceResolver; +import org.apache.jackrabbit.util.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.DatabaseMetaData; +import java.sql.Statement; + +/** + * Database-based journal implementation. 
Stores records inside a database table named + * JOURNAL, while the table GLOBAL_REVISION contains the + * highest available revision number. The names of both tables are prefixed with + * the value of schemaObjectPrefix. + *

+ * It is configured through the following properties: + *

    + *
  • revision: the filename where the parent cluster node's revision + * file should be written to; this is a required property with no default value
  • + *
  • driver: the JDBC driver class name to use; this is a required + * property with no default value
  • + *
  • url: the JDBC connection url; this is a required property with + * no default value
  • + *
  • schema: the schema to be used; if not specified, this is the + * second field inside the JDBC connection url, delimited by colons
  • + *
  • schemaObjectPrefix: the schema object prefix to be used; + * defaults to an empty string
  • + *
  • user: username to specify when connecting
  • + *
  • password: password to specify when connecting
  • + *
+ */ +public class DatabaseJournal extends AbstractJournal { + + /** + * Schema object prefix. + */ + private static final String SCHEMA_OBJECT_PREFIX_VARIABLE = + "${schemaObjectPrefix}"; + + /** + * Default DDL script name. + */ + private static final String DEFAULT_DDL_NAME = "default.ddl"; + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(DatabaseJournal.class); + + /** + * Driver name, bean property. + */ + private String driver; + + /** + * Connection URL, bean property. + */ + private String url; + + /** + * Schema name, bean property. + */ + private String schema; + + /** + * Schema object prefix, bean property. + */ + protected String schemaObjectPrefix; + + /** + * User name, bean property. + */ + private String user; + + /** + * Password, bean property. + */ + private String password; + + /** + * JDBC Connection used. + */ + private Connection con; + + /** + * Statement returning all revisions within a range. + */ + private PreparedStatement selectRevisionsStmt; + + /** + * Statement updating the global revision. + */ + private PreparedStatement updateGlobalStmt; + + /** + * Statement returning the global revision. + */ + private PreparedStatement selectGlobalStmt; + + /** + * Statement appending a new record. + */ + private PreparedStatement insertRevisionStmt; + + /** + * Locked revision. 
+ */ + private long lockedRevision; + + /** + * {@inheritDoc} + */ + public void init(String id, NamespaceResolver resolver) + throws JournalException { + + super.init(id, resolver); + + if (driver == null) { + String msg = "Driver not specified."; + throw new JournalException(msg); + } + if (url == null) { + String msg = "Connection URL not specified."; + throw new JournalException(msg); + } + try { + if (schema == null) { + schema = getSchemaFromURL(url); + } + if (schemaObjectPrefix == null) { + schemaObjectPrefix = ""; + } + } catch (IllegalArgumentException e) { + String msg = "Unable to derive schema from URL: " + e.getMessage(); + throw new JournalException(msg); + } + try { + Class.forName(driver); + con = DriverManager.getConnection(url, user, password); + con.setAutoCommit(true); + + checkSchema(); + prepareStatements(); + } catch (Exception e) { + String msg = "Unable to initialize connection."; + throw new JournalException(msg, e); + } + log.info("DatabaseJournal initialized at URL: " + url); + } + + /** + * Derive a schema from a JDBC connection URL. This simply treats the given URL + * as delimeted by colons and takes the 2nd field. 
+ * + * @param url JDBC connection URL + * @return schema + * @throws IllegalArgumentException if the JDBC connection URL is invalid + */ + private String getSchemaFromURL(String url) throws IllegalArgumentException { + int start = url.indexOf(':'); + if (start != -1) { + int end = url.indexOf(':', start + 1); + if (end != -1) { + return url.substring(start + 1, end); + } + } + throw new IllegalArgumentException(url); + } + + /** + * {@inheritDoc} + */ + protected RecordIterator getRecords(long startRevision) + throws JournalException { + + try { + selectRevisionsStmt.clearParameters(); + selectRevisionsStmt.clearWarnings(); + selectRevisionsStmt.setLong(1, startRevision); + selectRevisionsStmt.execute(); + + return new DatabaseRecordIterator( + selectRevisionsStmt.getResultSet(), getResolver()); + } catch (SQLException e) { + String msg = "Unable to return record iterater."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + protected void doLock() throws JournalException { + ResultSet rs = null; + boolean succeeded = false; + + try { + con.setAutoCommit(false); + } catch (SQLException e) { + String msg = "Unable to set autocommit to false."; + throw new JournalException(msg, e); + } + + try { + updateGlobalStmt.clearParameters(); + updateGlobalStmt.clearWarnings(); + updateGlobalStmt.execute(); + + selectGlobalStmt.clearParameters(); + selectGlobalStmt.clearWarnings(); + selectGlobalStmt.execute(); + + rs = selectGlobalStmt.getResultSet(); + if (!rs.next()) { + throw new JournalException("No revision available."); + } + lockedRevision = rs.getLong(1); + succeeded = true; + + } catch (SQLException e) { + String msg = "Unable to lock global revision table."; + throw new JournalException(msg, e); + } finally { + close(rs); + if (!succeeded) { + rollback(con); + + try { + con.setAutoCommit(true); + } catch (SQLException e) { + String msg = "Unable to set autocommit to true."; + log.warn(msg, e); + } + } + } + } + + /** + * {@inheritDoc} + 
*/ + protected void doUnlock(boolean successful) { + if (!successful) { + rollback(con); + } + try { + con.setAutoCommit(true); + } catch (SQLException e) { + String msg = "Unable to set autocommit to true."; + log.warn(msg, e); + } + } + + /** + * {@inheritDoc} + */ + protected long append(String producerId, File file) throws JournalException { + try { + InputStream in = new BufferedInputStream(new FileInputStream(file)); + + try { + insertRevisionStmt.clearParameters(); + insertRevisionStmt.clearWarnings(); + insertRevisionStmt.setLong(1, lockedRevision); + insertRevisionStmt.setString(2, getId()); + insertRevisionStmt.setString(3, producerId); + insertRevisionStmt.setBinaryStream(4, in, (int) file.length()); + insertRevisionStmt.execute(); + + con.commit(); + return lockedRevision; + } finally { + close(in); + + try { + con.setAutoCommit(true); + } catch (SQLException e) { + String msg = "Unable to set autocommit to true."; + log.warn(msg, e); + } + } + } catch (IOException e) { + String msg = "Unable to open journal log '" + file + "'."; + throw new JournalException(msg, e); + } catch (SQLException e) { + String msg = "Unable to append revision " + lockedRevision + "."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public void close() { + try { + con.close(); + } catch (SQLException e) { + String msg = "Error while closing connection: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Close some input stream. + * + * @param in input stream, may be null. + */ + private void close(InputStream in) { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + String msg = "Error while closing input stream: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Close some statement. + * + * @param stmt statement, may be null. 
+ */ + private void close(Statement stmt) { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + String msg = "Error while closing statement: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Close some resultset. + * + * @param rs resultset, may be null. + */ + private void close(ResultSet rs) { + try { + if (rs != null) { + rs.close(); + } + } catch (SQLException e) { + String msg = "Error while closing result set: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Rollback a connection. + * + * @param con connection. + */ + private void rollback(Connection con) { + try { + con.rollback(); + } catch (SQLException e) { + String msg = "Error while rolling back connection: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Checks if the required schema objects exist and creates them if they + * don't exist yet. + * + * @throws Exception if an error occurs + */ + private void checkSchema() throws Exception { + DatabaseMetaData metaData = con.getMetaData(); + String tableName = schemaObjectPrefix + "JOURNAL"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); + } + + ResultSet rs = metaData.getTables(null, null, tableName, null); + boolean schemaExists; + try { + schemaExists = rs.next(); + } finally { + rs.close(); + } + + if (!schemaExists) { + // read ddl from resources + InputStream in = DatabaseJournal.class.getResourceAsStream(schema + ".ddl"); + if (in == null) { + String msg = "No schema-specific DDL found: '" + schema + ".ddl" + + "', falling back to '" + DEFAULT_DDL_NAME + "'."; + log.info(msg); + in = DatabaseJournal.class.getResourceAsStream(DEFAULT_DDL_NAME); + if (in == null) { + msg = "Unable to load '" + DEFAULT_DDL_NAME + "'."; + throw new JournalException(msg); + } + } + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + Statement stmt = 
con.createStatement(); + try { + String sql = reader.readLine(); + while (sql != null) { + // Skip comments and empty lines + if (!sql.startsWith("#") && sql.length() > 0) { + // replace prefix variable + sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix); + // execute sql stmt + stmt.executeUpdate(sql); + } + // read next sql stmt + sql = reader.readLine(); + } + // commit the changes + con.commit(); + } finally { + close(in); + close(stmt); + } + } + } + + /** + * Builds and prepares the SQL statements. + * + * @throws SQLException if an error occurs + */ + private void prepareStatements() throws SQLException { + selectRevisionsStmt = con.prepareStatement( + "select REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA " + + "from " + schemaObjectPrefix + "JOURNAL " + + "where REVISION_ID > ?"); + updateGlobalStmt = con.prepareStatement( + "update " + schemaObjectPrefix + "GLOBAL_REVISION " + + "set revision_id = revision_id + 1"); + selectGlobalStmt = con.prepareStatement( + "select revision_id " + + "from " + schemaObjectPrefix + "GLOBAL_REVISION"); + insertRevisionStmt = con.prepareStatement( + "insert into " + schemaObjectPrefix + "JOURNAL" + + "(REVISION_ID, JOURNAL_ID, PRODUCER_ID, REVISION_DATA) " + + "values (?,?,?,?)"); + } + + /** + * Bean getters + */ + public String getDriver() { + return driver; + } + + public String getUrl() { + return url; + } + + public String getSchema() { + return schema; + } + + public String getSchemaObjectPrefix() { + return schemaObjectPrefix; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } + + /** + * Bean setters + */ + public void setDriver(String driver) { + this.driver = driver; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public void setSchemaObjectPrefix(String schemaObjectPrefix) { + this.schemaObjectPrefix = schemaObjectPrefix.toUpperCase(); 
+ } + + public void setUser(String user) { + this.user = user; + } + + public void setPassword(String password) { + this.password = password; + } +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseRecordIterator.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseRecordIterator.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseRecordIterator.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DatabaseRecordIterator.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.jackrabbit.name.NamespaceResolver; + +import java.util.NoSuchElementException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * RecordIterator interface. 
+ */ +class DatabaseRecordIterator implements RecordIterator { + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(DatabaseJournal.class); + + /** + * Underlying result set. + */ + private final ResultSet rs; + + /** + * Namespace resolver. + */ + private final NamespaceResolver resolver; + + /** + * Current record. + */ + private ReadRecord record; + + /** + * Last record returned. + */ + private ReadRecord lastRecord; + + /** + * Flag indicating whether EOF was reached. + */ + private boolean isEOF; + + /** + * Create a new instance of this class. + */ + public DatabaseRecordIterator(ResultSet rs, NamespaceResolver resolver) { + this.rs = rs; + this.resolver = resolver; + } + + /** + * {@inheritDoc} + */ + public boolean hasNext() { + try { + if (!isEOF && record == null) { + fetchRecord(); + } + return !isEOF; + } catch (SQLException e) { + String msg = "Error while moving to next record."; + log.error(msg, e); + return false; + } + } + + /** + * Return the next record. If there are no more recors, throws + * a NoSuchElementException. If an error occurs, + * throws a JournalException. + * + * @return next record + * @throws java.util.NoSuchElementException if there are no more records + * @throws JournalException if another error occurs + */ + public Record nextRecord() throws NoSuchElementException, JournalException { + if (!hasNext()) { + String msg = "No current record."; + throw new NoSuchElementException(msg); + } + close(lastRecord); + lastRecord = record; + record = null; + + return lastRecord; + } + + /** + * {@inheritDoc} + */ + public void close() { + if (lastRecord != null) { + close(lastRecord); + lastRecord = null; + } + try { + rs.close(); + } catch (SQLException e) { + String msg = "Error while closing result set: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Fetch the next record. 
+ */ + private void fetchRecord() throws SQLException { + if (rs.next()) { + long revision = rs.getLong(1); + String journalId = rs.getString(2); + String producerId = rs.getString(3); + DataInputStream dataIn = new DataInputStream(rs.getBinaryStream(4)); + record = new ReadRecord(journalId, producerId, revision, dataIn, 0, resolver); + } else { + isEOF = true; + } + } + + /** + * Close a record. + * + * @param record record + */ + private static void close(ReadRecord record) { + if (record != null) { + try { + record.close(); + } catch (IOException e) { + String msg = "Error while closing record."; + log.warn(msg, e); + } + } + } +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DefaultRecordProducer.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DefaultRecordProducer.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DefaultRecordProducer.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/DefaultRecordProducer.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +/** + * Produces new records that can be appended to the journal. + */ +public class DefaultRecordProducer implements RecordProducer { + + /** + * Journal. + */ + private final AbstractJournal journal; + + /** + * Producer identifier. + */ + private String id; + + /** + * Create a new instance of this class. + * + * @param journal journal + * @param id producer id + */ + public DefaultRecordProducer(AbstractJournal journal, String id) { + this.journal = journal; + this.id = id; + } + + /** + * {@inheritDoc} + */ + public Record append() throws JournalException { + Record record = null; + + journal.lockAndSync(); + + try { + record = createRecord(); + return record; + } finally { + if (record == null) { + journal.unlock(false); + } + } + } + + /** + * Create a new record. May be overridden by subclasses. 
+ * + * @throws JournalException if an error occurs + */ + protected AppendRecord createRecord() throws JournalException { + return new AppendRecord(journal, id); + } +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileJournal.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.jackrabbit.name.NamespaceResolver; + +import java.util.Arrays; +import java.util.Comparator; +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; + +/** + * Default file-based journal implementation. 
+ */ +public class FileJournal extends AbstractJournal { + + /** + * Global revision counter name, located in the journal directory. + */ + private static final String REVISION_NAME = "revision"; + + /** + * Log extension. + */ + private static final String LOG_EXTENSION = "log"; + + /** + * Default base name for journal files. + */ + private static final String DEFAULT_BASENAME = "journal"; + + /** + * Default max size of a journal file (1MB). + */ + private static final int DEFAULT_MAXSIZE = 1048576; + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(FileJournal.class); + + /** + * Directory name, bean property. + */ + private String directory; + + /** + * Journal file base name, bean property. + */ + private String basename; + + /** + * Maximum size of a journal file before a rotation takes place, bean property. + */ + private int maximumSize; + + /** + * Journal root directory. + */ + private File rootDirectory; + + /** + * Journal file. + */ + private File journalFile; + + /** + * Global revision counter. + */ + private LockableFileRevision globalRevision; + + /** + * {@inheritDoc} + */ + public void init(String id, NamespaceResolver resolver) throws JournalException { + super.init(id, resolver); + + if (directory == null) { + String msg = "Directory not specified."; + throw new JournalException(msg); + } + if (basename == null) { + basename = DEFAULT_BASENAME; + } + if (maximumSize == 0) { + maximumSize = DEFAULT_MAXSIZE; + } + rootDirectory = new File(directory); + if (!rootDirectory.exists() || !rootDirectory.isDirectory()) { + String msg = "Directory specified does either not exist " + + "or is not a directory: " + directory; + throw new JournalException(msg); + } + + journalFile = new File(rootDirectory, basename + "." 
+ LOG_EXTENSION); + globalRevision = new LockableFileRevision(new File(rootDirectory, REVISION_NAME)); + + log.info("FileJournal initialized at path: " + directory); + } + + /** + * {@inheritDoc} + */ + protected long getRevision() throws JournalException { + return globalRevision.get(); + } + + /** + * {@inheritDoc} + */ + protected RecordIterator getRecords(long startRevision) + throws JournalException { + + long stopRevision = getRevision(); + + File[] logFiles = null; + if (startRevision < stopRevision) { + logFiles = rootDirectory.listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.startsWith(basename + "."); + } + }); + Arrays.sort(logFiles, new Comparator() { + public int compare(Object o1, Object o2) { + File f1 = (File) o1; + File f2 = (File) o2; + return f1.compareTo(f2); + } + }); + } + return new FileRecordIterator(logFiles, startRevision, stopRevision, getResolver()); + } + + /** + * {@inheritDoc} + */ + protected void doLock() throws JournalException { + globalRevision.lock(false); + } + + /** + * {@inheritDoc} + */ + protected long append(String producerId, File file) throws JournalException { + try { + FileRecordLog recordLog = new FileRecordLog(journalFile); + if (recordLog.exceeds(maximumSize)) { + switchLogs(); + recordLog = new FileRecordLog(journalFile); + } + if (recordLog.isNew()) { + recordLog.init(globalRevision.get()); + } + long revision = recordLog.append(getId(), producerId, file); + globalRevision.set(revision); + return revision; + + } catch (IOException e) { + String msg = "Unable to append new record to journal '" + journalFile + "'."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + protected void doUnlock(boolean successful) { + globalRevision.unlock(); + } + + /** + * {@inheritDoc} + */ + public void close() {} + + /** + * Bean getters + */ + public String getDirectory() { + return directory; + } + + public String getBasename() { + return basename; + } + + 
public int getMaximumSize() { + return maximumSize; + } + + /** + * Bean setters + */ + public void setDirectory(String directory) { + this.directory = directory; + } + + public void setBasename(String basename) { + this.basename = basename; + } + + public void setMaximumSize(int maximumSize) { + this.maximumSize = maximumSize; + } + + /** + * Move away current journal file (and all other files), incrementing their + * version counter. A file named journal.N.log gets renamed to + * journal.(N+1).log, whereas the main journal file gets renamed + * to journal.1.log. + */ + private void switchLogs() { + FilenameFilter filter = new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.startsWith(basename + "."); + } + }; + File[] files = rootDirectory.listFiles(filter); + Arrays.sort(files, new Comparator() { + public int compare(Object o1, Object o2) { + File f1 = (File) o1; + File f2 = (File) o2; + return f2.compareTo(f1); + } + }); + for (int i = 0; i < files.length; i++) { + File file = files[i]; + String name = file.getName(); + int sep = name.lastIndexOf('.'); + if (sep != -1) { + String ext = name.substring(sep + 1); + if (ext.equals(LOG_EXTENSION)) { + file.renameTo(new File(rootDirectory, name + ".1")); + } else { + try { + int version = Integer.parseInt(ext); + String newName = name.substring(0, sep + 1) + + String.valueOf(version + 1); + file.renameTo(new File(rootDirectory, newName)); + } catch (NumberFormatException e) { + log.warn("Bogusly named journal file, skipped: " + file); + } + } + } + } + } +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordIterator.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordIterator.java?view=auto&rev=509624 ============================================================================== --- 
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordIterator.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordIterator.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.name.NamespaceResolver; + +import java.io.File; +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Record cursor that returns unseen revisions in ascending order on every + * iteration. + */ +class FileRecordIterator implements RecordIterator { + + /** + * Log files to scan for revisions. + */ + private File[] logFiles; + + /** + * Current revision being visited. + */ + private long revision; + + /** + * Last revision to visit. + */ + private long stopRevision; + + /** + * Namespace resolver. + */ + private NamespaceResolver resolver; + + /** + * Current record log, containing file records. + */ + private FileRecordLog recordLog; + + /** + * Current record. + */ + private ReadRecord record; + + /** + * Creates a new instance of this class. 
+ * + * @param logFiles available log files, sorted ascending by age + * @param startRevision start point (exclusive) + * @param stopRevision stop point (inclusive) + */ + public FileRecordIterator(File[] logFiles, long startRevision, long stopRevision, + NamespaceResolver resolver) { + this.logFiles = logFiles; + this.revision = startRevision; + this.stopRevision = stopRevision; + this.resolver = resolver; + } + + + /** + * Return a flag indicating whether there are next records. + */ + public boolean hasNext() { + return revision < stopRevision; + } + + /** + * {@inheritDoc} + */ + public Record nextRecord() throws NoSuchElementException, JournalException { + if (!hasNext()) { + String msg = "No next revision."; + throw new NoSuchElementException(msg); + } + try { + if (record != null) { + record.close(); + record = null; + } + } catch (IOException e) { + close(); + String msg = "Unable to skip over record."; + throw new JournalException(msg, e); + } + + if (recordLog != null) { + if (!recordLog.contains(revision)) { + recordLog.close(); + recordLog = null; + } + } + + try { + if (recordLog == null) { + recordLog = getRecordLog(revision); + } + } catch (IOException e) { + String msg = "Unable to open record log with revision: " + revision; + throw new JournalException(msg, e); + } + + try { + record = recordLog.read(resolver); + revision = record.getRevision(); + return record; + } catch (IOException e) { + String msg = "Unable to read record with revision: " + revision; + throw new JournalException(msg, e); + } + } + + /** + * Close this cursor, releasing its resources. + */ + public void close() { + if (recordLog != null) { + recordLog.close(); + } + } + + /** + * Return record log containing a given revision. 
+ * + * @param revision revision to locate + * @return record log containing that revision + * @throws IOException if an I/O error occurs + */ + private FileRecordLog getRecordLog(long revision) throws IOException { + for (int i = 0; i < logFiles.length; i++) { + FileRecordLog recordLog = new FileRecordLog(logFiles[i]); + if (recordLog.contains(revision)) { + recordLog.seek(revision); + return recordLog; + } + } + String msg = "No log file found containing revision: " + revision; + throw new IOException(msg); + } +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRecordLog.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.core.journal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.jackrabbit.name.NamespaceResolver; + +import java.io.File; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.FileInputStream; +import java.io.BufferedInputStream; +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * A file record log is a file containing {@link Record}s. Physically, + * the first 4 bytes contain a signature, followed by a major and minor version + * (2 bytes each). The next 8 bytes contain the revision this log starts with. + * After this, zero or more ReadRecords follow. + */ +class FileRecordLog { + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(FileRecordLog.class); + + /** + * Record log signature. + */ + private static final byte[] SIGNATURE = { 'J', 'L', 'O', 'G' }; + + /** + * Known major version. + */ + private static final short MAJOR_VERSION = 2; + + /** + * Known minor version. + */ + private static final short MINOR_VERSION = 0; + + /** + * Header size. This is the size of {@link #SIGNATURE}, {@link #MAJOR_VERSION}, + * {@link #MINOR_VERSION} and first revision (8 bytes). + */ + private static final int HEADER_SIZE = 4 + 2 + 2 + 8; + + /** + * Underlying file. + */ + private File logFile; + + /** + * Flag indicating whether this is a new log. + */ + private boolean isNew; + + /** + * Input stream used when seeking a specific record. + */ + private DataInputStream in; + + /** + * Last revision that is not in this log. + */ + private long previousRevision; + + /** + * Relative position inside this log. + */ + private long position; + + /** + * Last revision that is available in this log. + */ + private long lastRevision; + + /** + * Major version found in record log. + */ + private short major; + + /** + * Minor version found in record log. 
+ */ + private short minor; + + /** + * Create a new instance of this class. Opens a record log in read-only mode. + * + * @param logFile file containing record log + * @throws java.io.IOException if an I/O error occurs + */ + public FileRecordLog(File logFile) throws IOException { + this.logFile = logFile; + + if (logFile.exists()) { + DataInputStream in = new DataInputStream(new FileInputStream(logFile)); + + try { + readHeader(in); + previousRevision = in.readLong(); + lastRevision = previousRevision + logFile.length() - HEADER_SIZE; + } finally { + close(in); + } + } else { + isNew = true; + } + } + + /** + * Initialize this record log by writing a header containing the + * previous revision. + */ + public void init(long previousRevision) throws IOException { + if (isNew) { + DataOutputStream out = new DataOutputStream(new FileOutputStream(logFile)); + + try { + writeHeader(out); + out.writeLong(previousRevision); + } finally { + close(out); + } + + this.previousRevision = lastRevision = previousRevision; + isNew = false; + } + } + + /** + * Return a flag indicating whether this record log contains a certain revision. + * + * @param revision revision to look for + * @return true if this record log contain a certain revision; + * false otherwise + */ + public boolean contains(long revision) { + return (revision >= previousRevision && revision < lastRevision); + } + + /** + * Return a flag indicating whether this record log is new. + * + * @return true if this record log is new; + * false otherwise + */ + public boolean isNew() { + return isNew; + } + + /** + * Return a flag indicating whether this record log exceeds a given size. + */ + public boolean exceeds(long size) { + return (lastRevision - previousRevision) > size; + } + + /** + * Seek an entry. This is an operation that allows the unterlying input stream + * to be sequentially scanned and must therefore not be called twice. 
+ * + * @param revision revision to seek + * @throws java.io.IOException if an I/O error occurs + */ + public void seek(long revision) throws IOException { + if (in != null) { + String msg = "Stream already open: seek() only allowed once."; + throw new IllegalStateException(msg); + } + in = new DataInputStream(new BufferedInputStream( + new FileInputStream(logFile))); + skip(revision - previousRevision + HEADER_SIZE); + position = revision - previousRevision; + } + + /** + * Skip exactly n bytes. Throws if less bytes are skipped. + * + * @param n bytes to skip + * @throws java.io.IOException if an I/O error occurs, or less that n bytes + * were skipped. + */ + private void skip(long n) throws IOException { + long skiplen = n; + while (skiplen > 0) { + long skipped = in.skip(skiplen); + if (skipped <= 0) { + break; + } + skiplen -= skipped; + } + if (skiplen != 0) { + String msg = "Unable to skip remaining bytes."; + throw new IOException(msg); + } + } + + /** + * Read the file record at the current seek position. + * + * @param resolver namespace resolver + * @return file record + * @throws java.io.IOException if an I/O error occurs + */ + public ReadRecord read(NamespaceResolver resolver) throws IOException { + String journalId = in.readUTF(); + String producerId = in.readUTF(); + int length = in.readInt(); + + position += 2 + utfLength(journalId) + + 2 + utfLength(producerId) + + 4 + length; + + long revision = previousRevision + position; + return new ReadRecord(journalId, producerId, revision, in, length, resolver); + } + + /** + * Append a record backed by a file to this log. Returns the revision + * following this record. 
+ * + * @param journalId journal identifier + * @param producerId producer identifier + * @param file record to add + * @throws java.io.IOException if an I/O error occurs + */ + public long append(String journalId, String producerId, File file) + throws IOException { + + DataOutputStream out = new DataOutputStream(new FileOutputStream(logFile, true)); + + try { + int recordLength = (int) file.length(); + out.writeUTF(journalId); + out.writeUTF(producerId); + out.writeInt(recordLength); + append(file, out); + lastRevision += 2 + utfLength(journalId) + + 2 + utfLength(producerId) + + 4 + file.length(); + return lastRevision; + } finally { + close(out); + } + } + + /** + * Close this log. + */ + public void close() { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + String msg = "Error while closing record log: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Read signature and major/minor version of file and verify. + * + * @param in input stream + * @throws java.io.IOException if an I/O error occurs or the file does + * not have a valid header. + */ + private void readHeader(DataInputStream in) throws IOException { + byte[] signature = new byte[SIGNATURE.length]; + in.readFully(signature); + + for (int i = 0; i < SIGNATURE.length; i++) { + if (signature[i] != SIGNATURE[i]) { + String msg = "Record log '" + logFile.getPath() + + "' has wrong signature: " + toHexString(signature); + throw new IOException(msg); + } + } + + major = in.readShort(); + if (major != MAJOR_VERSION) { + String msg = "Record log '" + logFile.getPath() + + "' has incompatible major version: " + major; + throw new IOException(msg); + } + minor = in.readShort(); + } + + /** + * Write signature and major/minor. + * + * @param out input stream + * @throws java.io.IOException if an I/O error occurs. 
+ */ + private void writeHeader(DataOutputStream out) throws IOException { + out.write(SIGNATURE); + out.writeShort(MAJOR_VERSION); + out.writeShort(MINOR_VERSION); + } + + /** + * Close an input stream, logging a warning if an error occurs. + */ + private static void close(InputStream in) { + try { + in.close(); + } catch (IOException e) { + String msg = "I/O error while closing input stream."; + log.warn(msg, e); + } + } + + /** + * Close an output stream, logging a warning if an error occurs. + */ + private static void close(OutputStream out) { + try { + out.close(); + } catch (IOException e) { + String msg = "I/O error while closing input stream."; + log.warn(msg, e); + } + } + + /** + * Append a file to this log's output stream. + * + * @param file file to append + * @param out where to append to + */ + private static void append(File file, DataOutputStream out) throws IOException { + byte[] buffer = new byte[8192]; + int len; + + InputStream in = new BufferedInputStream(new FileInputStream(file)); + try { + while ((len = in.read(buffer)) > 0) { + out.write(buffer, 0, len); + } + out.flush(); + } finally { + close(in); + } + } + + /** + * Convert a byte array to its hexadecimal string representation. + */ + private static String toHexString(byte[] b) { + StringBuffer buf = new StringBuffer(); + for (int i = 0; i < b.length; i++) { + String s = Integer.toHexString(b[i] & 0xff).toUpperCase(); + if (s.length() == 1) { + buf.append('0'); + } + buf.append(s); + } + return buf.toString(); + } + + /** + * Return the length of a string when converted to its Java modified + * UTF-8 encoding, as used by DataInput.readUTF and + * DataOutput.writeUTF. 
+ */ + private static int utfLength(String s) { + char[] ac = s.toCharArray(); + int utflen = 0; + + for (int i = 0; i < ac.length; i++) { + char c = ac[i]; + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + return utflen; + } +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/FileRevision.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.core.journal; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +/** + * Maintains a file-based revision counter with locking, assuring uniqueness. + */ +public class FileRevision { + + /** + * Logger. + */ + private static final Logger log = LoggerFactory.getLogger(FileRevision.class); + + /** + * Underlying random access file. + */ + private final RandomAccessFile raf; + + /** + * Cached value. + */ + private long value; + + /** + * Creates a new file based revision counter. + * + * @param file holding global counter + * @throws JournalException if some error occurs + */ + public FileRevision(File file) throws JournalException { + try { + if (!file.exists()) { + file.createNewFile(); + } + raf = new RandomAccessFile(file, "rw"); + if (raf.length() == 0) { + set(0); + } + } catch (IOException e) { + String msg = "I/O error while attempting to create new file '" + file + "'."; + throw new JournalException(msg, e); + } + } + + /** + * Return current counter value. + * + * @return counter value + * @throws JournalException if some error occurs + */ + public synchronized long get() throws JournalException { + try { + raf.seek(0L); + value = raf.readLong(); + return value; + } catch (IOException e) { + throw new JournalException("I/O error occurred.", e); + } + } + + /** + * Set current counter value. 
+ * + * @param value new counter value + * @throws JournalException if some error occurs + */ + public synchronized void set(long value) throws JournalException { + try { + raf.seek(0L); + raf.writeLong(value); + raf.getFD().sync(); + this.value = value; + } catch (IOException e) { + throw new JournalException("I/O error occurred.", e); + } + } +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Journal.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.name.NamespaceResolver; + +/** + * Generic journal interface. + */ +public interface Journal { + + /** + * Initialize journal. 
+ * + * @param id id this journal should use to write its own records + * @param resolver resolver used when reading/writing records + * @throws JournalException if an error occurs + */ + public void init(String id, NamespaceResolver resolver) throws JournalException; + + /** + * Register a record consumer. + * + * @param consumer record consumer + * @throws JournalException if an error occurs + */ + public void register(RecordConsumer consumer) throws JournalException; + + /** + * Unregister a record consumer. + * + * @param consumer record consumer to unregister + * @return true if the consumer was previously registered; + * false otherwise + */ + public boolean unregister(RecordConsumer consumer); + + /** + * Synchronize contents from journal. This will compare the journal's + * revision with the revisions of all registered consumers and invoke + * their {@link RecordConsumer#consume} method when their identifier + * matches the one found in the records. + * + * @throws JournalException if an error occurs + */ + public void sync() throws JournalException; + + /** + * Return the record producer for a given identifier. + * + * @param identifier identifier + * @return record producer for the given identifier + * @throws JournalException if an error occurs + */ + public RecordProducer getProducer(String identifier) throws JournalException; + + /** + * Close this journal. This should release any resources still held by this journal.
+ */ + public void close(); +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/JournalException.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/JournalException.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/JournalException.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/JournalException.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.BaseException; + +/** + * The JournalException signals an error within a journal operation. + */ +public class JournalException extends BaseException { + + /** + * Constructs a new instance of this class with the specified detail + * message. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method.
+ */ + public JournalException(String message) { + super(message); + } + + /** + * Constructs a new instance of this class with the specified detail + * message and root cause. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + * @param rootCause root failure cause, passed on to the superclass constructor + */ + public JournalException(String message, Throwable rootCause) { + super(message, rootCause); + } +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/LockableFileRevision.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/LockableFileRevision.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/LockableFileRevision.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/LockableFileRevision.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.jackrabbit.core.journal; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileLock; + +/** + * Maintains a file-based revision counter with locking, assuring uniqueness. + */ +class LockableFileRevision { + + /** + * Logger. + */ + private static final Logger log = LoggerFactory.getLogger(LockableFileRevision.class); + + /** + * Underlying file. + */ + private final File file; + + /** + * Underlying random access file. + */ + private RandomAccessFile raf; + + /** + * File lock. + */ + private FileLock lock; + + /** + * Current lock count. + */ + private int locks; + + /** + * Creates a new file based revision counter. + * + * @param file holding global counter + */ + public LockableFileRevision(File file) { + this.file = file; + + try { + if (!file.exists()) { + file.createNewFile(); + } + } catch (IOException e) { + String msg = "I/O error while attempting to create new file '" + file + "': " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Lock underlying file. + * + * @param shared whether to allow other readers or not + */ + public void lock(boolean shared) throws JournalException { + if (lock == null) { + try { + raf = new RandomAccessFile(file, shared ? "r" : "rw"); + lock = raf.getChannel().lock(0L, Long.MAX_VALUE, shared); + } catch (IOException e) { + String msg = "I/O error occurred."; + throw new JournalException(msg, e); + } finally { + if (lock == null && raf != null) { + try { + raf.close(); + } catch (IOException e) { + String msg = "I/O error while closing file " + file.getPath() + ": " + e.getMessage(); + log.warn(msg); + } + raf = null; + } + } + } + locks++; + } + + /** + * Unlock underlying file. 
+ */ + public void unlock() { + if (lock != null && --locks == 0) { + try { + lock.release(); + } catch (IOException e) { + String msg = "I/O error while releasing lock: " + e.getMessage(); + log.warn(msg); + } + lock = null; + + if (raf != null) { + try { + raf.close(); + } catch (IOException e) { + String msg = "I/O error while closing file: " + e.getMessage(); + log.warn(msg); + } + } + raf = null; + } + } + + /** + * Return current counter value. + * + * @return counter value + * @throws JournalException if some error occurs + */ + public long get() throws JournalException { + lock(true); + + try { + long value = 0L; + if (raf.length() > 0) { + raf.seek(0L); + value = raf.readLong(); + } + return value; + + } catch (IOException e) { + throw new JournalException("I/O error occurred: ", e); + } finally { + unlock(); + } + } + + /** + * Set current counter value. + * + * @param value new counter value + * @throws JournalException if some error occurs + */ + public void set(long value) throws JournalException { + lock(false); + + try { + raf.seek(0L); + raf.writeLong(value); + } catch (IOException e) { + throw new JournalException("I/O error occurred.", e); + } finally { + unlock(); + } + } +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/ReadRecord.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.name.QName; +import org.apache.jackrabbit.name.NamespaceResolver; + +import java.io.IOException; +import java.io.DataInputStream; + +/** + * Record used for reading. + */ +class ReadRecord extends AbstractRecord { + + /** + * This record's journal id. + */ + private final String journalId; + + /** + * This record's producer id. + */ + private final String producerId; + + /** + * This record's revision. + */ + private final long revision; + + /** + * Underlying data input. + */ + private final DataInputStream dataIn; + + /** + * This record's length. + */ + private final int length; + + /** + * Flag indicating whether this record was consumed. + */ + private boolean consumed; + + /** + * Create a new instance of this class. + * + * @param journalId this record's journal id + * @param producerId this record's producer id + * @param revision this record's revision + * @param dataIn underlying data input + * @param length this record's length + * @param resolver namespace resolver passed to the superclass
+ */ + public ReadRecord(String journalId, String producerId, + long revision, DataInputStream dataIn, int length, + NamespaceResolver resolver) { + + super(resolver); + + this.journalId = journalId; + this.producerId = producerId; + this.revision = revision; + this.dataIn = dataIn; + this.length = length; + } + + /** + * {@inheritDoc} + */ + public String getJournalId() { + return journalId; + } + + /** + * {@inheritDoc} + */ + public String getProducerId() { + return producerId; + } + + /** + * {@inheritDoc} + */ + public long getRevision() { + return revision; + } + + /** + * {@inheritDoc} + */ + public byte readByte() throws JournalException { + consumed = true; + + try { + return dataIn.readByte(); + } catch (IOException e) { + String msg = "I/O error while reading byte."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public char readChar() throws JournalException { + consumed = true; + + try { + return dataIn.readChar(); + } catch (IOException e) { + String msg = "I/O error while reading character."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public boolean readBoolean() throws JournalException { + consumed = true; + + try { + return dataIn.readBoolean(); + } catch (IOException e) { + String msg = "I/O error while reading boolean."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public int readInt() throws JournalException { + consumed = true; + + try { + return dataIn.readInt(); + } catch (IOException e) { + String msg = "I/O error while reading integer."; + throw new JournalException(msg, e); + } + } + + /** + * {@inheritDoc} + */ + public String readString() throws JournalException { + consumed = true; + + try { + boolean isNull = dataIn.readBoolean(); + if (isNull) { + return null; + } else { + return dataIn.readUTF(); + } + } catch (IOException e) { + String msg = "I/O error while reading string."; + throw new JournalException(msg, e); + } + } + + /** + * 
{@inheritDoc} + */ + public void readFully(byte[] b) throws JournalException { + consumed = true; + + try { + dataIn.readFully(b); + } catch (IOException e) { + String msg = "I/O error while reading byte array."; + throw new JournalException(msg, e); + } + } + + /** + * Close this record. If this record's length is non-zero and the record + * has not been marked as consumed, its bytes are skipped on the underlying + * input. If its length is zero, the underlying input is closed instead. + * + * @throws IOException if an I/O error occurs + */ + public void close() throws IOException { + if (length != 0) { + if (!consumed) { + skip(length); + } + } else { + dataIn.close(); + } + } + + /** + * Skip exactly n bytes. Throws if fewer bytes are skipped. + * + * @param n bytes to skip + * @throws IOException if an I/O error occurs, or fewer than + * n bytes were skipped. + */ + private void skip(long n) throws IOException { + long skiplen = n; + while (skiplen > 0) { + long skipped = dataIn.skip(skiplen); + if (skipped <= 0) { + break; + } + skiplen -= skipped; + } + if (skiplen != 0) { + String msg = "Should have skipped " + n + + " bytes, only " + (n - skiplen) + " skipped."; + throw new IOException(msg); + } + } + + /** + * Methods used when appending are not supported by a read record.
+ */ + public void writeByte(int n) throws JournalException { + throw unsupported(); + } + + public void writeChar(char c) throws JournalException { + throw unsupported(); + } + + public void writeBoolean(boolean b) throws JournalException { + throw unsupported(); + } + + public void writeInt(int n) throws JournalException { + throw unsupported(); + } + + public void writeString(String s) throws JournalException { + throw unsupported(); + } + + public void writeQName(QName name) throws JournalException { + throw unsupported(); + } + + public void write(byte[] b) throws JournalException { + throw unsupported(); + } + + public void update() throws JournalException { + throw unsupported(); + } + + public void cancelUpdate() { + } + + private JournalException unsupported() { + String msg = "Record has been opened read-only."; + return new JournalException(msg); + } +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Record.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Record.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Record.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/Record.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import org.apache.jackrabbit.core.NodeId; +import org.apache.jackrabbit.core.PropertyId; +import org.apache.jackrabbit.core.nodetype.NodeTypeDef; +import org.apache.jackrabbit.name.QName; +import org.apache.jackrabbit.name.Path; + +/** + * Record interface. + */ +public interface Record { + + /** + * Returns the revision this record represents. + * + * @return revision + */ + public long getRevision(); + + /** + * Return this record's journal identifier. + * + * @return journal identifier + */ + public String getJournalId(); + + /** + * Return this record's producer identifier. + * + * @return producer identifier + */ + public String getProducerId(); + + /** + * Read a byte from the underlying stream. + * + * @return byte + * @throws JournalException if an error occurs + */ + public byte readByte() throws JournalException; + + /** + * Read a character from the underlying stream. + * + * @return character + * @throws JournalException if an error occurs + */ + public char readChar() throws JournalException; + + /** + * Read a boolean from the underlying stream. + * + * @return boolean + * @throws JournalException if an error occurs + */ + public boolean readBoolean() throws JournalException; + + /** + * Read an integer from the underlying stream. + * + * @return integer + * @throws JournalException if an error occurs + */ + public int readInt() throws JournalException; + + /** + * Read a string from the underlying stream.
+ * + * @return string or null + * @throws JournalException if an error occurs + */ + public String readString() throws JournalException; + + /** + * Fully read an array of bytes from the underlying stream. + * + * @param b byte array + * @throws JournalException if an error occurs + */ + public void readFully(byte[] b) throws JournalException; + + /** + * Read a QName from the underlying stream. + * + * @return name + * @throws JournalException if an error occurs + */ + public QName readQName() throws JournalException; + + /** + * Read a Path.PathElement from the underlying stream. + * + * @return path element + * @throws JournalException if an error occurs + */ + public Path.PathElement readPathElement() throws JournalException; + + /** + * Read a Path from the underlying stream. + * + * @return path + * @throws JournalException if an error occurs + */ + public Path readPath() throws JournalException; + + /** + * Read a NodeId from the underlying stream. + * + * @return node id + * @throws JournalException if an error occurs + */ + public NodeId readNodeId() throws JournalException; + + /** + * Read a PropertyId from the underlying stream. + * + * @return property id + * @throws JournalException if an error occurs + */ + public PropertyId readPropertyId() throws JournalException; + + /** + * Read a NodeTypeDef from the underlying stream. + * + * @return node type definition + * @throws JournalException if an error occurs + */ + public NodeTypeDef readNodeTypeDef() throws JournalException; + + /** + * Write a byte to the underlying stream. + * + * @param n byte + * @throws JournalException if an error occurs + */ + public void writeByte(int n) throws JournalException; + + /** + * Write a character to the underlying stream. + * + * @param c character + * @throws JournalException if an error occurs + */ + public void writeChar(char c) throws JournalException; + + /** + * Write a boolean to the underlying stream.
+ * + * @param b boolean + * @throws JournalException if an error occurs + */ + public void writeBoolean(boolean b) throws JournalException; + + /** + * Write an integer to the underlying stream. + * + * @param n integer + * @throws JournalException if an error occurs + */ + public void writeInt(int n) throws JournalException; + + /** + * Write a string to the underlying stream. + * + * @param s string, may be null + * @throws JournalException if an error occurs + */ + public void writeString(String s) throws JournalException; + + /** + * Write an array of bytes to the underlying stream. + * + * @param b byte array + * @throws JournalException if an error occurs + */ + public void write(byte[] b) throws JournalException; + + /** + * Write a QName to the underlying stream. + * + * @param name name + * @throws JournalException if an error occurs + */ + public void writeQName(QName name) throws JournalException; + + /** + * Write a Path.PathElement to the underlying stream. + * + * @param element path element + * @throws JournalException if an error occurs + */ + public void writePathElement(Path.PathElement element) throws JournalException; + + /** + * Write a Path to the underlying stream. + * + * @param path path + * @throws JournalException if an error occurs + */ + public void writePath(Path path) throws JournalException; + + /** + * Write a NodeId to the underlying stream. + * + * @param nodeId node id + * @throws JournalException if an error occurs + */ + public void writeNodeId(NodeId nodeId) throws JournalException; + + /** + * Write a PropertyId to the underlying stream. + * + * @param propertyId property id + * @throws JournalException if an error occurs + */ + public void writePropertyId(PropertyId propertyId) throws JournalException; + + /** + * Write a NodeTypeDef to the underlying stream.
+ * + * @param ntd node type definition + * @throws JournalException if an error occurs + */ + public void writeNodeTypeDef(NodeTypeDef ntd) throws JournalException; + + /** + * Update the changes made to an appended record. This will also update + * this record's revision. + * + * @throws JournalException if this record has not been appended, + * or if another error occurs + */ + public void update() throws JournalException; + + /** + * Cancel the changes made to an appended record. + */ + public void cancelUpdate(); +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordConsumer.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordConsumer.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordConsumer.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordConsumer.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.jackrabbit.core.journal; + +/** + * Listener interface on a journal. Implementations get called back for + * records that should be consumed. + */ +public interface RecordConsumer { + + /** + * Return the unique identifier of the records this consumer + * will be able to handle. + * + * @return unique identifier + */ + public String getId(); + + /** + * Return the revision this consumer has last seen. + * + * @return revision + */ + public long getRevision(); + + /** + * Consume a record. + * + * @param record record to consume + */ + public void consume(Record record); + + /** + * Set the revision this consumer has last seen. + * + * @param revision revision + */ + public void setRevision(long revision); +} Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordIterator.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordIterator.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordIterator.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordIterator.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +import java.util.NoSuchElementException; + +/** + * RecordIterator interface. + */ +public interface RecordIterator { + + /** + * Return a flag indicating whether there are more records. + * + * @return true if there are more records; + * false otherwise + */ + public boolean hasNext(); + + /** + * Return the next record. If there are no more records, throws + * a NoSuchElementException. If an error occurs, + * throws a JournalException. + * + * @return next record + * @throws NoSuchElementException if there are no more records + * @throws JournalException if another error occurs + */ + public Record nextRecord() throws NoSuchElementException, JournalException; + + /** + * Close this iterator. Releases all associated resources. + */ + public void close(); +} \ No newline at end of file Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordProducer.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordProducer.java?view=auto&rev=509624 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordProducer.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/journal/RecordProducer.java Tue Feb 20 08:06:42 2007 @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.journal; + +/** + * Produces new records that can be appended to the journal. + */ +public interface RecordProducer { + + /** + * Append a record. This operation implicitly locks the journal revision + * and must be followed by either {@link Record#update} or {@link Record#cancelUpdate} + * on the record returned. + * + * @return appended record + * @throws JournalException if an error occurs + */ + public Record append() throws JournalException; +}