To: commits@jackrabbit.apache.org
From: dpfister@apache.org
Reply-To: dev@jackrabbit.apache.org
Subject: svn commit: r495239 [1/2] - /jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/
Date: Thu, 11 Jan 2007 14:40:01 -0000
Message-Id: <20070111144002.27FCD1A981C@eris.apache.org>

Author: dpfister
Date: Thu Jan 11 06:40:00 2007
New Revision: 495239

URL: http://svn.apache.org/viewvc?view=rev&rev=495239
Log:
JCR-702: Allow database as backend for clustering
JCR-703: Add signature and major/minor version to the journal files used for clustering

Added:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/Record.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordInput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/RecordOutput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/default.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/derby.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/h2.ddl
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/oracle.ddl

Removed:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java

Modified:
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
    jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRevision.java Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java?view=auto&rev=495239 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/AbstractJournal.java Thu Jan 11 06:40:00 2007 @@ -0,0 +1,613 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.cluster; + +import org.apache.jackrabbit.core.NodeId; +import org.apache.jackrabbit.core.nodetype.NodeTypeDef; +import org.apache.jackrabbit.core.nodetype.compact.ParseException; +import org.apache.jackrabbit.core.state.ChangeLog; +import org.apache.jackrabbit.core.state.ItemState; +import org.apache.jackrabbit.core.state.NodeState; +import org.apache.jackrabbit.core.state.PropertyState; +import org.apache.jackrabbit.core.observation.EventState; +import org.apache.jackrabbit.core.observation.EventStateCollection; +import org.apache.jackrabbit.name.Path; +import org.apache.jackrabbit.name.QName; +import org.apache.jackrabbit.name.NamespaceResolver; +import org.apache.jackrabbit.name.NoPrefixDeclaredException; +import org.apache.jackrabbit.name.NameException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.io.FileOutputStream; +import java.io.DataOutputStream; +import java.util.Iterator; +import java.util.Set; +import java.util.HashSet; +import java.util.Collection; + +import EDU.oswego.cs.dl.util.concurrent.Mutex; + +import javax.jcr.observation.Event; +import javax.jcr.Session; + +/** + * Base journal implementation, providing common functionality. + *

+ * It manages the following bean properties:
+ * <ul>
+ * <li><code>revision</code>: the filename where the parent cluster node's revision
+ * file should be written to; this is a required property with no default value</li>
+ * </ul>
+ */ +public abstract class AbstractJournal implements Journal { + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(AbstractJournal.class); + + /** + * Journal id. + */ + protected String id; + + /** + * Namespace resolver used to map prefixes to URIs and vice-versa. + */ + protected NamespaceResolver resolver; + + /** + * Record processor. + */ + private RecordProcessor processor; + + /** + * Mutex used when writing journal. + */ + private final Mutex writeMutex = new Mutex(); + + /** + * Revision file name, bean property. + */ + private String revision; + + /** + * Current temporary journal log. + */ + private File tempLog; + + /** + * Current file record output. + */ + private RecordOutput out; + + /** + * Last used session for event sources. + */ + private Session lastSession; + + /** + * Next revision that will be available. + */ + private long nextRevision; + + /** + * Instance counter, file-based. + */ + private FileRevision instanceRevision; + + /** + * Bean getter for revision file. + * @return revision file + */ + public String getRevision() { + return revision; + } + + /** + * Bean setter for journal directory. + * @param revision directory used for journaling + */ + public void setRevision(String revision) { + this.revision = revision; + } + + /** + * {@inheritDoc} + */ + public void init(String id, RecordProcessor processor, NamespaceResolver resolver) + throws JournalException { + + this.id = id; + this.resolver = resolver; + this.processor = processor; + + if (revision == null) { + String msg = "Revision not specified."; + throw new JournalException(msg); + } + instanceRevision = new FileRevision(new File(revision)); + } + + /** + * Process a record. + * + * @param revision revision + * @param in record data + * @throws JournalException if an error occurs + */ + protected void process(long revision, RecordInput in) throws JournalException { + log.info("Processing revision: " + revision); + + String workspace = null; + + try { + workspace = in.readString(); + processor.start(workspace); + + for (;;) { + char c = in.readChar(); + if (c == '\0') { + break; + } + if (c == 'N') { + NodeOperation operation = NodeOperation.create(in.readByte()); + operation.setId(in.readNodeId()); + processor.process(operation); + } else if (c == 'P') { + PropertyOperation operation = PropertyOperation.create(in.readByte()); + operation.setId(in.readPropertyId()); + processor.process(operation); + } else if (c == 'E') { + int type = in.readByte(); + NodeId parentId = in.readNodeId(); + Path parentPath = in.readPath(); + NodeId childId = in.readNodeId(); + Path.PathElement childRelPath = in.readPathElement(); + QName ntName = in.readQName(); + + Set mixins = new HashSet(); + int mixinCount = in.readInt(); + for (int i = 0; i < mixinCount; i++) { + mixins.add(in.readQName()); + } + String userId = in.readString(); + processor.process(createEventState(type, parentId, parentPath, childId, + childRelPath, ntName, mixins, userId)); + } else if (c == 'L') { + NodeId nodeId = in.readNodeId(); + boolean isLock = in.readBoolean(); + if (isLock) { + boolean isDeep = in.readBoolean(); + String owner = in.readString(); + processor.process(nodeId, isDeep, owner); + } else { + processor.process(nodeId); + } + } else if (c == 'S') { + String oldPrefix = in.readString(); + String newPrefix = in.readString(); + String uri = in.readString(); + processor.process(oldPrefix, newPrefix, uri); + } else if (c == 'T') { + int size = in.readInt(); + HashSet ntDefs = new HashSet(); + for (int i = 
0; i < size; i++) { + ntDefs.add(in.readNodeTypeDef()); + } + processor.process(ntDefs); + } else { + throw new IllegalArgumentException("Unknown entry type: " + c); + } + } + processor.end(); + + } catch (NameException e) { + String msg = "Unable to read revision '" + revision + "'."; + throw new JournalException(msg, e); + } catch (ParseException e) { + String msg = "Unable to read revision '" + revision + "'."; + throw new JournalException(msg, e); + } catch (IOException e) { + String msg = "Unable to read revision '" + revision + "'."; + throw new JournalException(msg, e); + } catch (IllegalArgumentException e) { + String msg = "Error while processing revision " + + revision + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * {@inheritDoc} + */ + public void begin(String workspace) throws JournalException { + try { + writeMutex.acquire(); + } catch (InterruptedException e) { + String msg = "Interrupted while waiting for write lock."; + throw new JournalException(msg); + } + + boolean succeeded = false; + + try { + sync(); + + tempLog = File.createTempFile("journal", ".tmp"); + + out = new RecordOutput(new DataOutputStream( + new FileOutputStream(tempLog)), resolver); + out.writeString(workspace); + + succeeded = true; + } catch (IOException e) { + String msg = "Unable to create journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } finally { + if (!succeeded) { + writeMutex.release(); + } + } + } + + /** + * {@inheritDoc} + */ + public void log(ChangeLog changeLog, EventStateCollection esc) throws JournalException { + Iterator addedStates = changeLog.addedStates(); + while (addedStates.hasNext()) { + ItemState state = (ItemState) addedStates.next(); + if (state.isNode()) { + log(NodeAddedOperation.create((NodeState) state)); + } else { + log(PropertyAddedOperation.create((PropertyState) state)); + } + } + Iterator modifiedStates = changeLog.modifiedStates(); + while (modifiedStates.hasNext()) { + ItemState state = (ItemState) modifiedStates.next(); + if (state.isNode()) { + log(NodeModifiedOperation.create((NodeState) state)); + } else { + log(PropertyModifiedOperation.create((PropertyState) state)); + } + } + Iterator deletedStates = changeLog.deletedStates(); + while (deletedStates.hasNext()) { + ItemState state = (ItemState) deletedStates.next(); + if (state.isNode()) { + log(NodeDeletedOperation.create((NodeState) state)); + } else { + log(PropertyDeletedOperation.create((PropertyState) state)); + } + } + + Iterator events = esc.getEvents().iterator(); + while (events.hasNext()) { + EventState event = (EventState) events.next(); + log(event); + } + } + + /** + * {@inheritDoc} + */ + public void log(String oldPrefix, String newPrefix, String uri) throws JournalException { + try { + out.writeChar('S'); + out.writeString(oldPrefix); + out.writeString(newPrefix); + out.writeString(uri); + } catch (IOException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * {@inheritDoc} + */ + public void log(NodeId nodeId, boolean isDeep, String owner) throws JournalException { + log(nodeId, true, isDeep, owner); + } + + /** + * {@inheritDoc} + */ + public void log(NodeId nodeId) throws JournalException { + log(nodeId, false, false, null); + } + + /** + * {@inheritDoc} + */ + public void log(Collection ntDefs) throws JournalException { + try { + out.writeChar('T'); + out.writeInt(ntDefs.size()); + + Iterator iter = ntDefs.iterator(); + while 
(iter.hasNext()) { + out.writeNodeTypeDef((NodeTypeDef) iter.next()); + } + } catch (IOException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } + + } + + /** + * Log a property operation. + * + * @param operation property operation + */ + protected void log(PropertyOperation operation) throws JournalException { + try { + out.writeChar('P'); + out.writeByte(operation.getOperationType()); + out.writePropertyId(operation.getId()); + } catch (NoPrefixDeclaredException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } catch (IOException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * Log a node operation. + * + * @param operation node operation + */ + protected void log(NodeOperation operation) throws JournalException { + try { + out.writeChar('N'); + out.writeByte(operation.getOperationType()); + out.writeNodeId(operation.getId()); + } catch (IOException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * Log an event. Subclass responsibility. + * + * @param event event to log + */ + protected void log(EventState event) throws JournalException { + try { + out.writeChar('E'); + out.writeByte(event.getType()); + out.writeNodeId(event.getParentId()); + out.writePath(event.getParentPath()); + out.writeNodeId(event.getChildId()); + out.writePathElement(event.getChildRelPath()); + out.writeQName(event.getNodeType()); + + Set mixins = event.getMixinNames(); + out.writeInt(mixins.size()); + Iterator iter = mixins.iterator(); + while (iter.hasNext()) { + out.writeQName((QName) iter.next()); + } + out.writeString(event.getUserId()); + } catch (NoPrefixDeclaredException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } catch (IOException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * Log either a lock or an unlock operation. + * + * @param nodeId node id + * @param isLock true if this is a lock; + * false if this is an unlock + * @param isDeep flag indicating whether lock is deep + * @param owner lock owner + */ + protected void log(NodeId nodeId, boolean isLock, boolean isDeep, String owner) + throws JournalException { + + try { + out.writeChar('L'); + out.writeNodeId(nodeId); + out.writeBoolean(isLock); + if (isLock) { + out.writeBoolean(isDeep); + out.writeString(owner); + } + } catch (IOException e) { + String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * Lock the global revision, disallowing changes from other sources until + * {@link #unlockRevision} has been called. + * + * @return current global revision, passed to {@link #append} when changes + * are committed + * @throws JournalException + */ + protected abstract long lockRevision() throws JournalException; + + /** + * Unlock the global revision. An additional flag indicates whether the + * append operation was successful. 
+ * + * @param successful whether the append operation was successful + */ + protected abstract void unlockRevision(boolean successful); + + /** + * {@inheritDoc} + */ + public void prepare() throws JournalException { + nextRevision = lockRevision(); + + boolean succeeded = false; + + try { + sync(); + + succeeded = true; + } finally { + if (!succeeded) { + unlockRevision(false); + writeMutex.release(); + } + } + } + + /** + * Append the given record to the journal. On exit, the global and local revision + * should have been updated as well. + * + * @param revision record revision, as returned by {@link #lockRevision()} + * @param record record to append + * @throws JournalException if an error occurs + */ + protected abstract void append(long revision, File record) throws JournalException; + + /** + * Returns the current local revision. + * + * @return current local revision + * @throws JournalException if an error occurs + */ + protected long getLocalRevision() throws JournalException { + return instanceRevision.get(); + } + + /** + * Sets the current local revision. + * + * @param revision revision + * @throws JournalException if an error occurs + */ + protected void setLocalRevision(long revision) throws JournalException { + instanceRevision.set(revision); + } + + /** + * {@inheritDoc} + */ + public void commit() throws JournalException { + boolean succeeded = false; + + try { + out.writeChar('\0'); + out.close(); + + append(nextRevision, tempLog); + + succeeded = true; + + } catch (IOException e) { + String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage(); + throw new JournalException(msg); + } finally { + out = null; + tempLog.delete(); + unlockRevision(succeeded); + writeMutex.release(); + } + } + + /** + * {@inheritDoc} + */ + public void cancel() { + if (out != null) { + try { + out.close(); + tempLog.delete(); + } catch (IOException e) { + String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage(); + log.warn(msg); + } finally { + out = null; + unlockRevision(false); + writeMutex.release(); + } + } + } + + /** + * Create an event state. + * + * @param type event type + * @param parentId parent id + * @param parentPath parent path + * @param childId child id + * @param childRelPath child relative path + * @param ntName ndoe type name + * @param userId user id + * @return event + */ + protected EventState createEventState(int type, NodeId parentId, Path parentPath, + NodeId childId, Path.PathElement childRelPath, + QName ntName, Set mixins, String userId) { + switch (type) { + case Event.NODE_ADDED: + return EventState.childNodeAdded(parentId, parentPath, childId, childRelPath, + ntName, mixins, getOrCreateSession(userId)); + case Event.NODE_REMOVED: + return EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath, + ntName, mixins, getOrCreateSession(userId)); + case Event.PROPERTY_ADDED: + return EventState.propertyAdded(parentId, parentPath, childRelPath, + ntName, mixins, getOrCreateSession(userId)); + case Event.PROPERTY_CHANGED: + return EventState.propertyChanged(parentId, parentPath, childRelPath, + ntName, mixins, getOrCreateSession(userId)); + case Event.PROPERTY_REMOVED: + return EventState.propertyRemoved(parentId, parentPath, childRelPath, + ntName, mixins, getOrCreateSession(userId)); + default: + String msg = "Unexpected event type: " + type; + throw new IllegalArgumentException(msg); + } + } + + + /** + * Return a session matching a certain user id. 
+ * + * @param userId user id + * @return session + */ + protected Session getOrCreateSession(String userId) { + if (lastSession == null || !lastSession.getUserID().equals(userId)) { + lastSession = new ClusterSession(userId); + } + return lastSession; + } +} Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=495239&r1=495238&r2=495239 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Thu Jan 11 06:40:00 2007 @@ -320,10 +320,9 @@ /** * Return the instance id to be used for this node in the cluster. - * * @param id configured id, null to take random id */ - private String getClusterNodeId(String id) { + private String getClusterNodeId(String id) throws ClusterException { if (id == null) { id = System.getProperty(SYSTEM_PROPERTY_NODE_ID); if (id == null) { Added: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java?view=auto&rev=495239 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java (added) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/DatabaseJournal.java Thu Jan 11 06:40:00 2007 @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.core.cluster; + +import org.apache.jackrabbit.name.NamespaceResolver; +import org.apache.jackrabbit.util.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.DataInputStream; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.DatabaseMetaData; +import java.sql.Statement; + +/** + * Database-based journal implementation. Stores records inside a database. + *

+ * It is configured through the following properties:
+ * <ul>
+ * <li><code>revision</code>: the filename where the parent cluster node's revision
+ * file should be written to; this is a required property with no default value</li>
+ * <li><code>driver</code>: the JDBC driver class name to use; this is a required
+ * property with no default value</li>
+ * <li><code>url</code>: the JDBC connection url; this is a required property with
+ * no default value</li>
+ * <li><code>schema</code>: the schema to be used; if not specified, this is the
+ * second field inside the JDBC connection url, delimited by colons</li>
+ * <li><code>schemaObjectPrefix</code>: the schema object prefix to be used;
+ * defaults to an empty string</li>
+ * <li><code>user</code>: username to specify when connecting</li>
+ * <li><code>password</code>: password to specify when connecting</li>
+ * </ul>
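For orientation, here is a minimal sketch of wiring these bean properties up by hand. In a real deployment they would be populated from the repository's cluster configuration; the driver, URL, paths, credentials and journal id below are placeholders only, and the record processor and namespace resolver are assumed to be supplied by the cluster node:

    import org.apache.jackrabbit.core.cluster.DatabaseJournal;
    import org.apache.jackrabbit.core.cluster.JournalException;
    import org.apache.jackrabbit.core.cluster.RecordProcessor;
    import org.apache.jackrabbit.name.NamespaceResolver;

    // Illustrative sketch only; all values are placeholders, not part of this commit.
    class DatabaseJournalWiringSketch {

        static DatabaseJournal create(RecordProcessor processor, NamespaceResolver resolver)
                throws JournalException {
            DatabaseJournal journal = new DatabaseJournal();
            journal.setRevision("/var/jackrabbit/node1/revision.log");       // required, no default
            journal.setDriver("org.apache.derby.jdbc.EmbeddedDriver");       // required, no default
            journal.setUrl("jdbc:derby:/var/jackrabbit/journal;create=true"); // schema derived as "derby"
            journal.setUser("");
            journal.setPassword("");
            journal.init("node1", processor, resolver);                      // id chosen by the cluster node
            return journal;
        }
    }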
+ * This implementation maintains a database table, containing exactly one record with the + * last available revision. + */ +public class DatabaseJournal extends AbstractJournal { + + /** + * Schema object prefix. + */ + private static final String SCHEMA_OBJECT_PREFIX_VARIABLE = + "${schemaObjectPrefix}"; + + /** + * Default DDL script name. + */ + private static final String DEFAULT_DDL_NAME = "default.ddl"; + + /** + * Logger. + */ + private static Logger log = LoggerFactory.getLogger(DatabaseJournal.class); + + /** + * Driver name, bean property. + */ + private String driver; + + /** + * Connection URL, bean property. + */ + private String url; + + /** + * Schema name, bean property. + */ + private String schema; + + /** + * Schema object prefix, bean property. + */ + protected String schemaObjectPrefix; + + /** + * User name, bean property. + */ + private String user; + + /** + * Password, bean property. + */ + private String password; + + /** + * JDBC Connection used. + */ + private Connection con; + + /** + * Statement returning all revisions within a range. + */ + private PreparedStatement selectRevisionsStmt; + + /** + * Statement updating the global revision. + */ + private PreparedStatement updateGlobalStmt; + + /** + * Statement returning the global revision. + */ + private PreparedStatement selectGlobalStmt; + + /** + * Statement appending a new record. + */ + private PreparedStatement insertRevisionStmt; + + /** + * Bean getter for driver. + * @return driver + */ + public String getDriver() { + return driver; + } + + /** + * Bean setter for driver. + * @param driver driver + */ + public void setDriver(String driver) { + this.driver = driver; + } + + /** + * Bean getter for url. + * @return url + */ + public String getUrl() { + return url; + } + + /** + * Bean setter for url. + * @param url url + */ + public void setUrl(String url) { + this.url = url; + } + + /** + * Bean getter for schema. + * @return schema + */ + public String getSchema() { + return schema; + } + + /** + * Bean getter for schema. + * @param schema schema + */ + public void setSchema(String schema) { + this.schema = schema; + } + + /** + * Bean getter for schema object prefix. + * @return schema object prefix + */ + public String getSchemaObjectPrefix() { + return schemaObjectPrefix; + } + + /** + * Bean getter for schema object prefix. + * @param schemaObjectPrefix schema object prefix + */ + public void setSchemaObjectPrefix(String schemaObjectPrefix) { + this.schemaObjectPrefix = schemaObjectPrefix.toUpperCase(); + } + + /** + * Bean getter for user. + * @return user + */ + public String getUser() { + return user; + } + + /** + * Bean setter for user. + * @param user user + */ + public void setUser(String user) { + this.user = user; + } + + /** + * Bean getter for password. + * @return password + */ + public String getPassword() { + return password; + } + + /** + * Bean setter for password. 
+ * @param password password + */ + public void setPassword(String password) { + this.password = password; + } + + /** + * {@inheritDoc} + */ + public void init(String id, RecordProcessor processor, NamespaceResolver resolver) + throws JournalException { + + super.init(id, processor, resolver); + + if (driver == null) { + String msg = "Driver not specified."; + throw new JournalException(msg); + } + if (url == null) { + String msg = "Connection URL not specified."; + throw new JournalException(msg); + } + try { + if (schema == null) { + schema = getSchemaFromURL(url); + } + if (schemaObjectPrefix == null) { + schemaObjectPrefix = ""; + } + } catch (IllegalArgumentException e) { + String msg = "Unable to derive schema from URL: " + e.getMessage(); + throw new JournalException(msg); + } + try { + Class.forName(driver); + con = DriverManager.getConnection(url, user, password); + con.setAutoCommit(false); + + checkSchema(); + prepareStatements(); + } catch (Exception e) { + String msg = "Unable to initialize connection."; + throw new JournalException(msg, e); + } + log.info("DatabaseJournal initialized at URL: " + url); + } + + /** + * Derive a schema from a JDBC connection URL. This simply treats the given URL + * as delimeted by colons and takes the 2nd field. + * + * @param url JDBC connection URL + * @return schema + * @throws IllegalArgumentException if the JDBC connection URL is invalid + */ + private String getSchemaFromURL(String url) throws IllegalArgumentException { + int start = url.indexOf(':'); + if (start != -1) { + int end = url.indexOf(':', start + 1); + if (end != -1) { + return url.substring(start + 1, end); + } + } + throw new IllegalArgumentException(url); + } + + /** + * {@inheritDoc} + */ + public void sync() throws JournalException { + long oldRevision = getLocalRevision(); + ResultSet rs = null; + + try { + selectRevisionsStmt.clearParameters(); + selectRevisionsStmt.clearWarnings(); + selectRevisionsStmt.setLong(1, oldRevision); + selectRevisionsStmt.execute(); + + rs = selectRevisionsStmt.getResultSet(); + while (rs.next()) { + long revision = rs.getLong(1); + String creator = rs.getString(2); + if (!creator.equals(id)) { + DataInputStream in = new DataInputStream(rs.getBinaryStream(3)); + try { + process(revision, in); + } catch (IllegalArgumentException e) { + String msg = "Error while processing revision " + + revision + ": " + e.getMessage(); + throw new JournalException(msg); + } finally { + close(in); + } + } else { + log.info("Log entry matches journal id, skipped: " + revision); + } + setLocalRevision(revision); + } + } catch (SQLException e) { + String msg = "Unable to iterate over modified records."; + throw new JournalException(msg, e); + } finally { + close(rs); + } + + long currentRevision = getLocalRevision(); + if (oldRevision < currentRevision) { + log.info("Sync finished, instance revision is: " + currentRevision); + } + } + + /** + * Process a record. 
+ * + * @param revision revision + * @param dataIn data input + * @throws JournalException if an error occurs + */ + private void process(long revision, DataInputStream dataIn) throws JournalException { + RecordInput in = new RecordInput(dataIn, resolver); + + try { + process(revision, in); + } finally { + in.close(); + } + } + + /** + * {@inheritDoc} + */ + protected long lockRevision() throws JournalException { + ResultSet rs = null; + boolean succeeded = false; + + try { + updateGlobalStmt.clearParameters(); + updateGlobalStmt.clearWarnings(); + updateGlobalStmt.execute(); + + selectGlobalStmt.clearParameters(); + selectGlobalStmt.clearWarnings(); + selectGlobalStmt.execute(); + + rs = selectGlobalStmt.getResultSet(); + if (!rs.next()) { + throw new JournalException("No revision available."); + } + long globalRevision = rs.getLong(1); + succeeded = true; + return globalRevision; + + } catch (SQLException e) { + String msg = "Unable to lock global revision table: " + e.getMessage(); + throw new JournalException(msg); + } finally { + close(rs); + if (!succeeded) { + rollback(con); + } + } + } + + /** + * {@inheritDoc} + */ + protected void unlockRevision(boolean successful) { + if (!successful) { + rollback(con); + } + } + + /** + * {@inheritDoc} + */ + protected void append(long revision, File record) throws JournalException { + try { + InputStream in = new BufferedInputStream(new FileInputStream(record)); + + try { + insertRevisionStmt.clearParameters(); + insertRevisionStmt.clearWarnings(); + insertRevisionStmt.setLong(1, revision); + insertRevisionStmt.setString(2, id); + insertRevisionStmt.setBinaryStream(3, in, (int) record.length()); + insertRevisionStmt.execute(); + + con.commit(); + + setLocalRevision(revision); + } finally { + in.close(); + } + } catch (IOException e) { + String msg = "Unable to open journal log " + record + ": " + e.getMessage(); + throw new JournalException(msg); + } catch (SQLException e) { + String msg = "Unable to append revision: " + revision + ": " + e.getMessage(); + throw new JournalException(msg); + } + } + + /** + * {@inheritDoc} + */ + public void close() { + try { + con.close(); + } catch (SQLException e) { + String msg = "Error while closing connection: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Close some input stream. + * + * @param in input stream, may be null. + */ + private void close(InputStream in) { + try { + if (in != null) { + in.close(); + } + } catch (IOException e) { + String msg = "Error while closing input stream: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Close some statement. + * + * @param stmt statement, may be null. + */ + private void close(Statement stmt) { + try { + if (stmt != null) { + stmt.close(); + } + } catch (SQLException e) { + String msg = "Error while closing statement: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Close some resultset. + * + * @param rs resultset, may be null. + */ + private void close(ResultSet rs) { + try { + if (rs != null) { + rs.close(); + } + } catch (SQLException e) { + String msg = "Error while closing result set: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Rollback a connection. + * + * @param con connection. + */ + private void rollback(Connection con) { + try { + con.rollback(); + } catch (SQLException e) { + String msg = "Error while rolling back connection: " + e.getMessage(); + log.warn(msg); + } + } + + /** + * Checks if the required schema objects exist and creates them if they + * don't exist yet. 
+ * + * @throws Exception if an error occurs + */ + private void checkSchema() throws Exception { + DatabaseMetaData metaData = con.getMetaData(); + String tableName = schemaObjectPrefix + "JOURNAL"; + if (metaData.storesLowerCaseIdentifiers()) { + tableName = tableName.toLowerCase(); + } else if (metaData.storesUpperCaseIdentifiers()) { + tableName = tableName.toUpperCase(); + } + + ResultSet rs = metaData.getTables(null, null, tableName, null); + boolean schemaExists; + try { + schemaExists = rs.next(); + } finally { + rs.close(); + } + + if (!schemaExists) { + // read ddl from resources + InputStream in = DatabaseJournal.class.getResourceAsStream(schema + ".ddl"); + if (in == null) { + String msg = "No schema-specific DDL found: '" + schema + ".ddl" + + "', falling back to '" + DEFAULT_DDL_NAME + "'."; + log.info(msg); + in = DatabaseJournal.class.getResourceAsStream(DEFAULT_DDL_NAME); + if (in == null) { + msg = "Unable to load '" + DEFAULT_DDL_NAME + "'."; + throw new JournalException(msg); + } + } + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + Statement stmt = con.createStatement(); + try { + String sql = reader.readLine(); + while (sql != null) { + // Skip comments and empty lines + if (!sql.startsWith("#") && sql.length() > 0) { + // replace prefix variable + sql = Text.replace(sql, SCHEMA_OBJECT_PREFIX_VARIABLE, schemaObjectPrefix); + // execute sql stmt + stmt.executeUpdate(sql); + } + // read next sql stmt + sql = reader.readLine(); + } + // commit the changes + con.commit(); + } finally { + close(in); + close(stmt); + } + } + } + + /** + * Builds and prepares the SQL statements. + * + * @throws SQLException if an error occurs + */ + private void prepareStatements() throws SQLException { + selectRevisionsStmt = con.prepareStatement( + "select REVISION_ID, REVISION_CREATOR, REVISION_DATA " + + "from " + schemaObjectPrefix + "JOURNAL " + + "where REVISION_ID > ?"); + updateGlobalStmt = con.prepareStatement( + "update " + schemaObjectPrefix + "GLOBAL_REVISION " + + "set revision_id = revision_id + 1"); + selectGlobalStmt = con.prepareStatement( + "select revision_id " + + "from " + schemaObjectPrefix + "GLOBAL_REVISION"); + insertRevisionStmt = con.prepareStatement( + "insert into " + schemaObjectPrefix + "JOURNAL" + + "(REVISION_ID, REVISION_CREATOR, REVISION_DATA) " + + "values (?,?,?)"); + } +} Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java?view=diff&rev=495239&r1=495238&r2=495239 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java Thu Jan 11 06:40:00 2007 @@ -16,60 +16,45 @@ */ package org.apache.jackrabbit.core.cluster; -import org.apache.jackrabbit.core.NodeId; -import org.apache.jackrabbit.core.nodetype.NodeTypeDef; -import org.apache.jackrabbit.core.nodetype.compact.ParseException; -import org.apache.jackrabbit.core.state.ChangeLog; -import org.apache.jackrabbit.core.state.ItemState; -import org.apache.jackrabbit.core.state.NodeState; -import org.apache.jackrabbit.core.state.PropertyState; -import org.apache.jackrabbit.core.observation.EventState; -import org.apache.jackrabbit.core.observation.EventStateCollection; 
-import org.apache.jackrabbit.name.Path; -import org.apache.jackrabbit.name.QName; import org.apache.jackrabbit.name.NamespaceResolver; -import org.apache.jackrabbit.name.NoPrefixDeclaredException; -import org.apache.jackrabbit.name.NameException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.util.Arrays; import java.util.Comparator; -import java.util.Iterator; -import java.util.Set; -import java.util.HashSet; -import java.util.Collection; - -import EDU.oswego.cs.dl.util.concurrent.Mutex; - -import javax.jcr.observation.Event; -import javax.jcr.Session; /** * File-based journal implementation. A directory specified as directory * bean property will contain log files and a global revision file, containing the - * latest revision file. When the current log file's size exceeds maxSize + * next available revision. When the current log file's size exceeds maxSize * bytes, it gets renamed to its name appended by '1'. At the same time, all log files * already having a version counter, get their version counter incremented by 1. *

 * It is configured through the following properties:
 * <ul>
- * <li><code>directory</code>: the shared directory where journal logs are read from
- * and written to; this is a required property with no default value</li>
 * <li><code>revision</code>: the filename where the parent cluster node's revision
 * file should be written to; this is a required property with no default value</li>
+ * <li><code>directory</code>: the shared directory where journal logs are read from
+ * and written to; this is a required property with no default value</li>
 * <li><code>basename</code>: this is the basename of the journal logs created in
 * the shared directory; its default value is <code>journal</code></li>
 * <li><code>maximumSize</code>: this is the maximum size in bytes of a journal log
 * before a new log will be created; its default value is <code>1048576</code> (1MB)</li>
 * </ul>
+ * <p/>
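As with the database journal above, a minimal illustrative wiring of the file journal's bean properties; the paths and id are placeholders, and setBasename/setMaximumSize are assumed to be the usual bean setters for the basename and maximumSize properties listed above:

    import org.apache.jackrabbit.core.cluster.FileJournal;
    import org.apache.jackrabbit.core.cluster.JournalException;
    import org.apache.jackrabbit.core.cluster.RecordProcessor;
    import org.apache.jackrabbit.name.NamespaceResolver;

    // Illustrative sketch only; values are placeholders, not part of this commit.
    class FileJournalWiringSketch {

        static FileJournal create(RecordProcessor processor, NamespaceResolver resolver)
                throws JournalException {
            FileJournal journal = new FileJournal();
            journal.setDirectory("/net/share/jackrabbit/journal");      // shared between cluster nodes
            journal.setRevision("/var/jackrabbit/node1/revision.log");  // this node's local revision file
            journal.setBasename("journal");     // assumed setter; "journal" is the default anyway
            journal.setMaximumSize(1048576);    // assumed setter; 1 MB is the default anyway
            journal.init("node1", processor, resolver);
            return journal;
        }
    }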

+ * Technically, the global revision file contains the cumulated file position, i.e. if + * there are N journal files, with file lengths L[1]... + * L[N] (excluding the size of the file headers), then the global revision + * will be L[1]+...+L[N]. * * todo after some iterations, old files should be automatically compressed to save space */ -public class FileJournal implements Journal { +public class FileJournal extends AbstractJournal { /** * Global revision counter name, located in the journal directory. @@ -97,31 +82,11 @@ private static Logger log = LoggerFactory.getLogger(FileJournal.class); /** - * Journal id. - */ - private String id; - - /** - * Namespace resolver used to map prefixes to URIs and vice-versa. - */ - private NamespaceResolver resolver; - - /** - * Record processor. - */ - private RecordProcessor processor; - - /** * Directory name, bean property. */ private String directory; /** - * Revision file name, bean property. - */ - private String revision; - - /** * Journal file base name, bean property. */ private String basename; @@ -142,39 +107,14 @@ private File journal; /** - * Instance counter. - */ - private FileRevision instanceRevision; - - /** - * Global journal counter. + * Global revision counter. */ private FileRevision globalRevision; /** - * Mutex used when writing journal. - */ - private final Mutex writeMutex = new Mutex(); - - /** - * Current temporary journal log. + * Id as byte array. */ - private File tempLog; - - /** - * Current file record output. - */ - private FileRecordOutput out; - - /** - * Current file record. - */ - private FileRecord record; - - /** - * Last used session for event sources. - */ - private Session lastSession; + private byte[] rawId; /** * Bean getter for journal directory. @@ -193,22 +133,6 @@ } /** - * Bean getter for revision file. - * @return revision file - */ - public String getRevision() { - return revision; - } - - /** - * Bean setter for journal directory. - * @param revision directory used for journaling - */ - public void setRevision(String revision) { - this.revision = revision; - } - - /** * Bean getter for base name. * @return base name */ @@ -243,19 +167,15 @@ /** * {@inheritDoc} */ - public void init(String id, RecordProcessor processor, NamespaceResolver resolver) throws JournalException { - this.id = id; - this.resolver = resolver; - this.processor = processor; + public void init(String id, RecordProcessor processor, NamespaceResolver resolver) + throws JournalException { + + super.init(id, processor, resolver); if (directory == null) { String msg = "Directory not specified."; throw new JournalException(msg); } - if (revision == null) { - String msg = "Revision not specified."; - throw new JournalException(msg); - } if (basename == null) { basename = DEFAULT_BASENAME; } @@ -267,9 +187,14 @@ String msg = "Directory specified does either not exist or is not a directory: " + directory; throw new JournalException(msg); } - journal = new File(root, basename + "." + LOG_EXTENSION); + try { + rawId = toRawId(id); + } catch (IOException e) { + String msg = "Unable to convert '" + id + "' to its binary representation."; + throw new JournalException(msg, e); + } - instanceRevision = new FileRevision(new File(revision)); + journal = new File(root, basename + "." 
+ LOG_EXTENSION); globalRevision = new FileRevision(new File(root, REVISION_NAME)); log.info("FileJournal initialized at path: " + directory); @@ -292,35 +217,28 @@ } }); - long instanceValue = instanceRevision.get(); + long instanceValue = getLocalRevision(); long globalValue = globalRevision.get(); if (instanceValue < globalValue) { - FileRecordCursor cursor = new FileRecordCursor(logFiles, - instanceValue, globalValue); + FileRecordCursor cursor = new FileRecordCursor(logFiles, instanceValue, globalValue); try { while (cursor.hasNext()) { FileRecord record = cursor.next(); - if (!record.getCreator().equals(id)) { + if (!Arrays.equals(rawId, record.getCreator())) { process(record); } else { log.info("Log entry matches journal id, skipped: " + record.getRevision()); } - instanceRevision.set(record.getNextRevision()); + setLocalRevision(record.getNextRevision()); } } catch (IOException e) { - String msg = "Unable to iterate over modified records: " + e.getMessage(); + String msg = "Unable to iterate over modified records."; throw new JournalException(msg, e); - } finally { - try { - cursor.close(); - } catch (IOException e) { - String msg = "I/O error while closing record cursor: " + e.getMessage(); - log.warn(msg); - } + cursor.close(); } - log.info("Sync finished, instance revision is: " + instanceRevision.get()); + log.info("Sync finished, instance revision is: " + getLocalRevision()); } } @@ -330,89 +248,11 @@ * @param record record to process * @throws JournalException if an error occurs */ - void process(FileRecord record) throws JournalException { - log.info("Processing revision: " + record.getRevision()); - - FileRecordInput in = record.getInput(resolver); - String workspace = null; + private void process(FileRecord record) throws JournalException { + RecordInput in = record.getInput(resolver); try { - workspace = in.readString(); - processor.start(workspace); - - for (;;) { - char c = in.readChar(); - if (c == '\0') { - break; - } - if (c == 'N') { - NodeOperation operation = NodeOperation.create(in.readByte()); - operation.setId(in.readNodeId()); - processor.process(operation); - } else if (c == 'P') { - PropertyOperation operation = PropertyOperation.create(in.readByte()); - operation.setId(in.readPropertyId()); - processor.process(operation); - } else if (c == 'E') { - int type = in.readByte(); - NodeId parentId = in.readNodeId(); - Path parentPath = in.readPath(); - NodeId childId = in.readNodeId(); - Path.PathElement childRelPath = in.readPathElement(); - QName ntName = in.readQName(); - - Set mixins = new HashSet(); - int mixinCount = in.readInt(); - for (int i = 0; i < mixinCount; i++) { - mixins.add(in.readQName()); - } - String userId = in.readString(); - processor.process(createEventState(type, parentId, parentPath, childId, - childRelPath, ntName, mixins, userId)); - } else if (c == 'L') { - NodeId nodeId = in.readNodeId(); - boolean isLock = in.readBoolean(); - if (isLock) { - boolean isDeep = in.readBoolean(); - String owner = in.readString(); - processor.process(nodeId, isDeep, owner); - } else { - processor.process(nodeId); - } - } else if (c == 'S') { - String oldPrefix = in.readString(); - String newPrefix = in.readString(); - String uri = in.readString(); - processor.process(oldPrefix, newPrefix, uri); - } else if (c == 'T') { - int size = in.readInt(); - HashSet ntDefs = new HashSet(); - for (int i = 0; i < size; i++) { - ntDefs.add(in.readNodeTypeDef()); - } - processor.process(ntDefs); - } else { - throw new IllegalArgumentException("Unknown entry type: " + 
c); - } - } - processor.end(); - - } catch (NameException e) { - String msg = "Unable to read revision " + record.getRevision() + - ": " + e.getMessage(); - throw new JournalException(msg); - } catch (ParseException e) { - String msg = "Unable to read revision " + record.getRevision() + - ": " + e.getMessage(); - throw new JournalException(msg); - } catch (IOException e) { - String msg = "Unable to read revision " + record.getRevision() + - ": " + e.getMessage(); - throw new JournalException(msg); - } catch (IllegalArgumentException e) { - String msg = "Error while processing revision " + - record.getRevision() + ": " + e.getMessage(); - throw new JournalException(msg); + process(record.getRevision(), in); } finally { in.close(); } @@ -421,286 +261,37 @@ /** * {@inheritDoc} */ - public void begin(String workspace) throws JournalException { - try { - writeMutex.acquire(); - } catch (InterruptedException e) { - String msg = "Interrupted while waiting for write lock."; - throw new JournalException(msg); - } - - boolean succeeded = false; - - try { - sync(); - - tempLog = File.createTempFile("journal", ".tmp", root); - - record = new FileRecord(id, tempLog); - out = record.getOutput(resolver); - out.writeString(workspace); - - succeeded = true; - } catch (IOException e) { - String msg = "Unable to create journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } finally { - if (!succeeded) { - writeMutex.release(); - } - } - } - - /** - * {@inheritDoc} - */ - public void log(ChangeLog changeLog, EventStateCollection esc) throws JournalException { - Iterator addedStates = changeLog.addedStates(); - while (addedStates.hasNext()) { - ItemState state = (ItemState) addedStates.next(); - if (state.isNode()) { - log(NodeAddedOperation.create((NodeState) state)); - } else { - log(PropertyAddedOperation.create((PropertyState) state)); - } - } - Iterator modifiedStates = changeLog.modifiedStates(); - while (modifiedStates.hasNext()) { - ItemState state = (ItemState) modifiedStates.next(); - if (state.isNode()) { - log(NodeModifiedOperation.create((NodeState) state)); - } else { - log(PropertyModifiedOperation.create((PropertyState) state)); - } - } - Iterator deletedStates = changeLog.deletedStates(); - while (deletedStates.hasNext()) { - ItemState state = (ItemState) deletedStates.next(); - if (state.isNode()) { - log(NodeDeletedOperation.create((NodeState) state)); - } else { - log(PropertyDeletedOperation.create((PropertyState) state)); - } - } - - Iterator events = esc.getEvents().iterator(); - while (events.hasNext()) { - EventState event = (EventState) events.next(); - log(event); - } - } - - /** - * {@inheritDoc} - */ - public void log(String oldPrefix, String newPrefix, String uri) throws JournalException { - try { - out.writeChar('S'); - out.writeString(oldPrefix); - out.writeString(newPrefix); - out.writeString(uri); - } catch (IOException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } - } - - /** - * {@inheritDoc} - */ - public void log(NodeId nodeId, boolean isDeep, String owner) throws JournalException { - log(nodeId, true, isDeep, owner); - } - - /** - * {@inheritDoc} - */ - public void log(NodeId nodeId) throws JournalException { - log(nodeId, false, false, null); - } - - /** - * {@inheritDoc} - */ - public void log(Collection ntDefs) throws JournalException { - try { - out.writeChar('T'); - out.writeInt(ntDefs.size()); - - Iterator iter = ntDefs.iterator(); - while 
(iter.hasNext()) { - out.writeNodeTypeDef((NodeTypeDef) iter.next()); - } - } catch (IOException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } - - } - - /** - * Log a property operation. - * - * @param operation property operation - */ - protected void log(PropertyOperation operation) throws JournalException { - try { - out.writeChar('P'); - out.writeByte(operation.getOperationType()); - out.writePropertyId(operation.getId()); - } catch (NoPrefixDeclaredException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } catch (IOException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } - } - - /** - * Log a node operation. - * - * @param operation node operation - */ - protected void log(NodeOperation operation) throws JournalException { - try { - out.writeChar('N'); - out.writeByte(operation.getOperationType()); - out.writeNodeId(operation.getId()); - } catch (IOException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } - } - - /** - * Log an event. Subclass responsibility. - * - * @param event event to log - */ - protected void log(EventState event) throws JournalException { - try { - out.writeChar('E'); - out.writeByte(event.getType()); - out.writeNodeId(event.getParentId()); - out.writePath(event.getParentPath()); - out.writeNodeId(event.getChildId()); - out.writePathElement(event.getChildRelPath()); - out.writeQName(event.getNodeType()); - - Set mixins = event.getMixinNames(); - out.writeInt(mixins.size()); - Iterator iter = mixins.iterator(); - while (iter.hasNext()) { - out.writeQName((QName) iter.next()); - } - out.writeString(event.getUserId()); - } catch (NoPrefixDeclaredException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } catch (IOException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } - } - - /** - * Log either a lock or an unlock operation. 
- * - * @param nodeId node id - * @param isLock true if this is a lock; - * false if this is an unlock - * @param isDeep flag indicating whether lock is deep - * @param owner lock owner - */ - protected void log(NodeId nodeId, boolean isLock, boolean isDeep, String owner) - throws JournalException { - - try { - out.writeChar('L'); - out.writeNodeId(nodeId); - out.writeBoolean(isLock); - if (isLock) { - out.writeBoolean(isDeep); - out.writeString(owner); - } - } catch (IOException e) { - String msg = "Unable to write to journal log " + tempLog + ": " + e.getMessage(); - throw new JournalException(msg); - } + protected long lockRevision() throws JournalException { + globalRevision.lock(false); + return globalRevision.get(); } /** * {@inheritDoc} */ - public void prepare() throws JournalException { - globalRevision.lock(false); - - boolean prepared = false; - - try { - sync(); - - record.setRevision(globalRevision.get()); - - prepared = true; - } finally { - if (!prepared) { - globalRevision.unlock(); - writeMutex.release(); - } - } + protected void unlockRevision(boolean successful) { + globalRevision.unlock(); } /** * {@inheritDoc} */ - public void commit() throws JournalException { + protected void append(long revision, File record) throws JournalException { try { - out.writeChar('\0'); - out.close(); - - long nextRevision = record.getNextRevision(); - FileRecordLog recordLog = new FileRecordLog(journal); if (!recordLog.isNew()) { - if (nextRevision - recordLog.getFirstRevision() > maximumSize) { + if (revision - recordLog.getFirstRevision() > maximumSize) { switchLogs(); recordLog = new FileRecordLog(journal); } } - recordLog.append(record); - - tempLog.delete(); + long nextRevision = recordLog.append(revision, rawId, record); globalRevision.set(nextRevision); - instanceRevision.set(nextRevision); + setLocalRevision(nextRevision); } catch (IOException e) { - String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage(); + String msg = "Unable to append new record to journal " + journal + ": " + e.getMessage(); throw new JournalException(msg); - } finally { - out = null; - globalRevision.unlock(); - writeMutex.release(); - } - } - - /** - * {@inheritDoc} - */ - public void cancel() { - if (out != null) { - try { - out.close(); - tempLog.delete(); - } catch (IOException e) { - String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage(); - log.warn(msg); - } finally { - out = null; - globalRevision.unlock(); - writeMutex.release(); - } } } @@ -710,57 +301,6 @@ public void close() {} /** - * Create an event state. 
- * - * @param type event type - * @param parentId parent id - * @param parentPath parent path - * @param childId child id - * @param childRelPath child relative path - * @param ntName ndoe type name - * @param userId user id - * @return event - */ - protected EventState createEventState(int type, NodeId parentId, Path parentPath, - NodeId childId, Path.PathElement childRelPath, - QName ntName, Set mixins, String userId) { - switch (type) { - case Event.NODE_ADDED: - return EventState.childNodeAdded(parentId, parentPath, childId, childRelPath, - ntName, mixins, getOrCreateSession(userId)); - case Event.NODE_REMOVED: - return EventState.childNodeRemoved(parentId, parentPath, childId, childRelPath, - ntName, mixins, getOrCreateSession(userId)); - case Event.PROPERTY_ADDED: - return EventState.propertyAdded(parentId, parentPath, childRelPath, - ntName, mixins, getOrCreateSession(userId)); - case Event.PROPERTY_CHANGED: - return EventState.propertyChanged(parentId, parentPath, childRelPath, - ntName, mixins, getOrCreateSession(userId)); - case Event.PROPERTY_REMOVED: - return EventState.propertyRemoved(parentId, parentPath, childRelPath, - ntName, mixins, getOrCreateSession(userId)); - default: - String msg = "Unexpected event type: " + type; - throw new IllegalArgumentException(msg); - } - } - - - /** - * Return a session matching a certain user id. - * - * @param userId user id - * @return session - */ - protected Session getOrCreateSession(String userId) { - if (lastSession == null || !lastSession.getUserID().equals(userId)) { - lastSession = new ClusterSession(userId); - } - return lastSession; - } - - /** * Move away current journal file (and all other files), incrementing their * version counter. A file named journal.N.log gets renamed to * journal.(N+1).log, whereas the main journal file gets renamed @@ -800,5 +340,23 @@ } } } + } + + /** + * Convert an id given as string, to its raw form, i.e. to its binary + * representation, encoded as UTF-8. + * + * @throws IOException if an I/O error occurs, which is very unlikely. + */ + private static byte[] toRawId(String id) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + dos.writeUTF(id); + dos.close(); + + byte[] b = bos.toByteArray(); + byte[] rawId = new byte[b.length - 2]; + System.arraycopy(b, 2, rawId, 0, rawId.length); + return rawId; } } Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java?view=diff&rev=495239&r1=495238&r2=495239 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java Thu Jan 11 06:40:00 2007 @@ -19,108 +19,52 @@ import org.apache.jackrabbit.name.NamespaceResolver; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.DataInput; -import java.io.File; -import java.io.FileOutputStream; -import java.io.BufferedInputStream; -import java.io.FileInputStream; /** - * Represents a file-based record. 
Physically, a file record contains its length in the - first 4 bytes, immediately followed by its creator in a length-prefixed, UTF-encoded - string. All further fields are record-specific. + * Represents a file-based record. Physically, a file record starts with its creator + in a length-prefixed, UTF-encoded string, followed by 4 bytes indicating the + length of the data. All further fields are record-specific. */ class FileRecord { /** - * Indicator for a literal UUID. + * Record creator. */ - static final byte UUID_LITERAL = 'L'; - - /** - * Indicator for a UUID index. - */ - static final byte UUID_INDEX = 'I'; - - /** - * Revision. - */ - private long revision; - - /** - * Underlying input stream. - */ - private DataInputStream in; - - /** - * File used when creating a new record. - */ - private File file; - - /** - * Underlying output stream. - */ - private DataOutputStream out; + //private final String creator; + private final byte[] creator; /** * Record length. */ - private int length; + private final int length; /** - * Creator of a record. + * Input stream associated with record data. */ - private String creator; + private final DataInputStream dataIn; /** - * Bytes used by creator when written in UTF encoding and length-prefixed. + * Revision. */ - private int creatorLength; + private long revision; /** - * Flag indicating whether bytes need to be skipped at the end. + * Flag indicating whether the data associated with this record has been consumed. */ private boolean consumed; /** - * Creates a new file record. Used when opening an existing record. - * - * @param revision revision this record represents - * @param in underlying input stream - * @throws IOException if reading the creator fails - */ - public FileRecord(long revision, InputStream in) - throws IOException { - - this.revision = revision; - if (in instanceof DataInputStream) { - this.in = (DataInputStream) in; - } else { - this.in = new DataInputStream(in); - } - this.length = this.in.readInt(); - - readCreator(); - } - - /** - * Creates a new file record. Used when creating a new record. + * Creates a new instance of this class. Used when opening an existing record. * * @param creator creator of this record - * @param file underlying (temporary) file - * @throws IOException if writing the creator fails + * @param length record length + * @param dataIn input stream containing record data */ - public FileRecord(String creator, File file) throws IOException { - + public FileRecord(byte[] creator, int length, DataInputStream dataIn) { this.creator = creator; - this.file = file; - - this.out = new DataOutputStream(new FileOutputStream(file)); - - writeCreator(); + this.length = length; + this.dataIn = dataIn; } /** @@ -133,21 +77,24 @@ } /** - * Set the journal revision associated with this record. + * Set the journal revision associated with this record. Called after creation + * of the file record. * - * @param revision journal revision + * @param revision revision */ - public void setRevision(long revision) { + void setRevision(long revision) { this.revision = revision; } /** - * Return the journal counter associated with the next record. + * Return the journal counter associated with the next record. A file record's + * size is the size of the length-prefixed creator string plus the size of + * the length-prefixed data.
* * @return next revision */ public long getNextRevision() { - return revision + length + 4; + return revision + FileRecordLog.getRecordSize(creator, length); } /** @@ -155,7 +102,7 @@ * * @return creator */ - public String getCreator() { + public byte[] getCreator() { return creator; } @@ -165,41 +112,9 @@ * @param resolver resolver to use when mapping prefixes to full names * @return record input */ - public FileRecordInput getInput(NamespaceResolver resolver) { + public RecordInput getInput(NamespaceResolver resolver) { consumed = true; - return new FileRecordInput(in, resolver); - } - - /** - * Return an output on this record. - * - * @param resolver resolver to use when mapping full names to prefixes - * @return record output - */ - public FileRecordOutput getOutput(NamespaceResolver resolver) { - return new FileRecordOutput(this, out, resolver); - } - - /** - * Append this record to some output stream. - * - * @param out outputstream to append to - */ - void append(DataOutputStream out) throws IOException { - out.writeInt(length); - - byte[] buffer = new byte[8192]; - int len; - - InputStream in = new BufferedInputStream(new FileInputStream(file)); - try { - while ((len = in.read(buffer)) > 0) { - out.write(buffer, 0, len); - } - out.flush(); - } finally { - in.close(); - } + return new RecordInput(dataIn, resolver); } /** @@ -208,11 +123,11 @@ * * @throws IOException if an I/O error occurs */ - void skip() throws IOException { + public void skip() throws IOException { if (!consumed) { - long skiplen = length - creatorLength; + long skiplen = length; while (skiplen > 0) { - long skipped = in.skip(skiplen); + long skipped = dataIn.skip(skiplen); if (skipped <= 0) { break; } @@ -222,158 +137,6 @@ String msg = "Unable to skip remaining bytes."; throw new IOException(msg); } - } - } - - /** - * Invoked when output has been closed. - */ - void closed() { - length = (int) file.length(); - } - - /** - * Read creator from the underlying data input stream. - * - * @throws IOException if an I/O error occurs - */ - private void readCreator() throws IOException { - UTFByteCounter counter = new UTFByteCounter(in); - creator = DataInputStream.readUTF(counter); - creatorLength = counter.getBytes(); - } - - /** - * Write creator to the underlying data output stream. - * - * @throws IOException if an I/O error occurs - */ - private void writeCreator() throws IOException { - out.writeUTF(creator); - } - - /** - * UTF byte counter. Counts the bytes actually read from a given - * DataInputStream that make up a UTF-encoded string. - */ - static class UTFByteCounter implements DataInput { - - /** - * Underlying input stream. - */ - private final DataInputStream in; - - /** - * UTF length. - */ - private int bytes; - - /** - * Create a new instance of this class. - * - * @param in underlying data input stream - */ - public UTFByteCounter(DataInputStream in) { - this.in = in; - } - - /** - * Return the number of bytes read from the underlying input stream. - * - * @return number of bytes - */ - public int getBytes() { - return bytes; - } - - /** - * @see java.io.DataInputStream#readUnsignedShort() - * - * Remember number of bytes read. - */ - public int readUnsignedShort() throws IOException { - try { - return in.readUnsignedShort(); - } finally { - bytes += 2; - } - } - - /** - * @see java.io.DataInputStream#readUnsignedShort() - * - * Remember number of bytes read. 
- */ - public void readFully(byte b[]) throws IOException { - try { - in.readFully(b); - } finally { - bytes += b.length; - } - } - - /** - * @see java.io.DataInputStream#readUnsignedShort() - * - * Remember number of bytes read. - */ - public void readFully(byte b[], int off, int len) throws IOException { - try { - in.readFully(b, off, len); - } finally { - bytes += b.length; - } - } - - /** - * Methods not implemented. - */ - public byte readByte() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public char readChar() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public double readDouble() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public float readFloat() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public int readInt() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public int readUnsignedByte() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public long readLong() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public short readShort() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public boolean readBoolean() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public int skipBytes(int n) throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public String readLine() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); - } - - public String readUTF() throws IOException { - throw new IllegalStateException("Unexpected call, deliberately not implemented."); } } } Modified: jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java?view=diff&rev=495239&r1=495238&r2=495239 ============================================================================== --- jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java (original) +++ jackrabbit/trunk/jackrabbit-core/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java Thu Jan 11 06:40:00 2007 @@ -96,9 +96,9 @@ } if (recordLog == null) { recordLog = getRecordLog(nextRevision); - recordLog.seek(nextRevision); } - record = new FileRecord(nextRevision, recordLog.getInputStream()); + record = recordLog.read(); + record.setRevision(nextRevision); nextRevision = record.getNextRevision(); return record; } @@ -114,6 +114,7 @@ for (int i = 0; i < logFiles.length; i++) { FileRecordLog recordLog = new FileRecordLog(logFiles[i]); if (recordLog.contains(revision)) { + recordLog.seek(revision); return recordLog; } } @@ -123,10 +124,8 @@ /** * Close this cursor, releasing its resources. - * - * @throws IOException if an I/O error occurs */ - public void close() throws IOException { + public void close() { if (recordLog != null) { recordLog.close(); }
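
A minimal, self-contained sketch of the new record header layout described in FileRecord's javadoc above: the creator written as a length-prefixed, UTF-encoded string, followed by a 4-byte data length, then the record-specific data. The class and method names (RecordHeaderSketch, write, read) and the sample values are invented for illustration only and are not part of this commit or of the Jackrabbit sources; only the byte layout follows the javadoc.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RecordHeaderSketch {

    // Write a record header as the new FileRecord javadoc describes it:
    // creator as a length-prefixed, UTF-encoded string, then a 4-byte data
    // length, then the raw data bytes.
    static byte[] write(String creator, byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(creator);      // 2-byte length prefix + modified UTF-8 bytes
        out.writeInt(data.length);  // 4-byte data length
        out.write(data);            // record-specific payload
        out.close();
        return bos.toByteArray();
    }

    // Read the header back and return the payload, analogous to what a record
    // log reader does before handing the remaining stream to RecordInput.
    static byte[] read(byte[] record) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
        String creator = in.readUTF();   // consumes the length-prefixed creator
        int length = in.readInt();       // 4-byte data length
        byte[] data = new byte[length];
        in.readFully(data);
        System.out.println("creator=" + creator + ", data length=" + length);
        return data;
    }

    public static void main(String[] args) throws IOException {
        byte[] record = write("cluster-node-1", "some record data".getBytes("UTF-8"));
        System.out.println(new String(read(record), "UTF-8"));
    }
}

Under the old layout the 4-byte length came first and the creator followed it; with the creator first, a record's total size can be computed from the creator bytes and the data length (see FileRecordLog.getRecordSize(creator, length) used in getNextRevision() above). The same writeUTF() trick appears in AbstractJournal.toRawId() above, which strips the 2-byte length prefix to obtain the raw, UTF-8-encoded bytes of an id.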