Return-Path: X-Original-To: apmail-directory-commits-archive@www.apache.org Delivered-To: apmail-directory-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DC7D27479 for ; Mon, 10 Oct 2011 19:40:08 +0000 (UTC) Received: (qmail 85935 invoked by uid 500); 10 Oct 2011 19:40:08 -0000 Delivered-To: apmail-directory-commits-archive@directory.apache.org Received: (qmail 85881 invoked by uid 500); 10 Oct 2011 19:40:08 -0000 Mailing-List: contact commits-help@directory.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@directory.apache.org Delivered-To: mailing list commits@directory.apache.org Received: (qmail 85874 invoked by uid 99); 10 Oct 2011 19:40:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2011 19:40:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2011 19:40:01 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1D4F12388ACC for ; Mon, 10 Oct 2011 19:39:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1181167 [2/2] - in /directory/apacheds/branches/apacheds-txns: core-api/src/main/java/org/apache/directory/server/core/log/ core-api/src/main/java/org/apache/directory/server/core/txn/ core-api/src/main/java/org/apache/directory/server/cor... 
Date: Mon, 10 Oct 2011 19:39:37 -0000 To: commits@directory.apache.org From: saya@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111010193939.1D4F12388ACC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,497 @@ + +package org.apache.directory.server.core.log; + +import java.nio.ByteBuffer; + +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Lock; + +import java.io.IOException; +import java.io.FileNotFoundException; +import java.io.EOFException; + +import org.apache.directory.server.i18n.I18n; + +class LogManager +{ + + /** Controlfile record size */ + private final static int CONTROLFILE_RECORD_SIZE = 44; + + /** Controlfile file magic number */ + private final static int CONTROLFILE_MAGIC_NUMBER = 0xFF11FF11; + + /** Controlfile log file number */ + private final static long CONTROLFILE_LOG_FILE_NUMBER = -1; + + /** Shadow Controlfile log file number */ + private final static long CONTROLFILE_SHADOW_LOG_FILE_NUMBER = -2; + + /** buffer used to do IO on controlfile */ + byte controlFileBuffer[] = new byte[CONTROLFILE_RECORD_SIZE]; + + /** ByteBuffer used to to IO on checkpoint file */ + ByteBuffer controlFileMarker = ByteBuffer.wrap( controlFileBuffer ); + + /** Current checkpoint record in memory */ + ControlFileRecord controlFileRecord = new ControlFileRecord(); 
+
+    /** Minimum needed point in the log */
+    LogAnchor minLogAnchor = new LogAnchor();
+
+    /** Protects minLogAnchor */
+    Lock minLogAnchorLock = new ReentrantLock();
+
+    /** Log file manager */
+    LogFileManager logFileManager;
+
+    /** Log Anchor comparator */
+    LogAnchorComparator anchorComparator = new LogAnchorComparator();
+
+    /** Current log file */
+    private long currentLogFileNumber;
+
+    /** Buffer used to read log file markers */
+    byte markerBuffer[] = new byte[LogFileRecords.LOG_FILE_HEADER_SIZE];
+
+    /** ByteBuffer wrapper for the marker buffer */
+    ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
+
+
+    public LogManager( LogFileManager logFileManager )
+    {
+        this.logFileManager = logFileManager;
+    }
+
+    /**
+     * Initializes the log management:
+     * 1) Checks if control file exists and creates it if necessary. If it exists, it reads it and loads the latest checkpoint.
+     * 2) Starts from the latest checkpoint and scans forward through the logs to check for corrupted logs and determine the end of the log.
+     * This scan ends either when a properly ended log file is found or a partially written log record is found.
+ * + * @throws IOException + * @throws InvalidLogException + */ + public void initLogManager() throws IOException, InvalidLogException + { + LogAnchor scanPoint = new LogAnchor(); + LogScannerInternal scanner; + UserLogRecord logRecord; + LogFileManager.LogFileReader reader; + + + + // Read and verify control file + boolean controlFileExists = true; + try + { + this.readControlFile(); + } + catch( FileNotFoundException e ) + { + controlFileExists = false; + } + + if ( controlFileExists ) + { + boolean invalidLog = false; + + // Set the min log anchor from the control file + minLogAnchor.resetLogAnchor( controlFileRecord.minNeededLogFile, + controlFileRecord.minNeededLogFileOffset, controlFileRecord.minNeededLSN ); + + scanPoint.resetLogAnchor( minLogAnchor ); + + logRecord = new UserLogRecord(); + scanner = new DefaultLogScanner(); + scanner.init( scanPoint, logFileManager ); + + try + { + while ( scanner.getNextRecord( logRecord ) ) + { + // No need to do anything with the log record + } + } + catch( InvalidLogException e ) + { + invalidLog = true; + } + finally + { + scanner.close(); + } + + long lastGoodLogFileNumber = scanner.getLastGoodFileNumber(); + long lastGoodLogFileOffset = scanner.getLastGoodOffset(); + currentLogFileNumber = lastGoodLogFileNumber; + + if ( ( lastGoodLogFileNumber < LogAnchor.MIN_LOG_NUMBER ) || + ( lastGoodLogFileOffset < LogAnchor.MIN_LOG_OFFSET )) + { + throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); + } + + scanPoint.resetLogAnchor( lastGoodLogFileNumber, lastGoodLogFileOffset, + LogAnchor.UNKNOWN_LSN ); + + if ( anchorComparator.compare( scanPoint, minLogAnchor ) < 0 ) + { + throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); + } + + /* + * If invalid content at the end of file: + * if we are past the header of file, then + * truncate the file to the end of the last + * read log record, otherwise we read a partially + * written log file header, in this case reformat the log file. 
+             * Also check next for the existence of next file to make
+             * sure we really read the last log file.
+             */
+            if ( invalidLog )
+            {
+                // Check if next log file exists
+                reader = null;
+                try
+                {
+                    reader = logFileManager.getReaderForLogFile( ( lastGoodLogFileNumber + 1 ) );
+
+                }
+                catch ( FileNotFoundException e )
+                {
+                    // Fine, this is what we want
+                }
+                finally
+                {
+                    if ( reader != null )
+                    {
+                        reader.close();
+                    }
+                }
+
+                // The reference is only used as an "a following file exists" flag here:
+                // invalid content followed by another log file is not a torn tail.
+                if ( reader != null )
+                {
+                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+                }
+
+                if ( lastGoodLogFileOffset >= LogFileRecords.LOG_FILE_HEADER_SIZE )
+                {
+                    logFileManager.truncateLogFile( lastGoodLogFileNumber, lastGoodLogFileOffset );
+                }
+                else
+                {
+                    // Reformat the existing log file
+                    this.createNextLogFile( true);
+                }
+            }
+
+        }
+        else
+        {
+            /*
+             * Control file does not exist. Either we are at the very beginning or
+             * maybe we crashed in the middle of creating the first log file.
+             * We should have the min log file at most with the file header formatted.
+             *
+             * This branch must only run when the control file is missing: it rejects
+             * a first log file that is longer than its header and otherwise
+             * (re)formats that file, which would be wrong for an existing log.
+             */
+            reader = null;
+            boolean fileExists = false;
+            currentLogFileNumber = LogAnchor.MIN_LOG_NUMBER - 1;
+            try
+            {
+                reader = logFileManager.getReaderForLogFile( LogAnchor.MIN_LOG_NUMBER );
+
+                if ( reader.getLength() > LogFileRecords.LOG_FILE_HEADER_SIZE )
+                {
+                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+                }
+                fileExists = true;
+                currentLogFileNumber++;
+            }
+            catch ( FileNotFoundException e )
+            {
+                // Fine, we will create the file
+            }
+            finally
+            {
+                if ( reader != null )
+                {
+                    reader.close();
+                }
+            }
+
+
+
+            this.createNextLogFile( fileExists );
+
+            // Prepare the min log anchor and control file and write the control file
+            minLogAnchor.resetLogAnchor( LogAnchor.MIN_LOG_NUMBER, LogFileRecords.LOG_FILE_HEADER_SIZE, LogAnchor.UNKNOWN_LSN );
+
+            this.writeControlFile();
+        }
+    }
+
+    /**
+     * Called by LogFlushManager to switch to the next file.
+     *
+     * Note:Currently we do a checkpoint and delete unnecessary log files when we switch to a new file.
Some
+     * of these tasks can be delegated to a background thread later.
+     *
+     * @param currentWriter current log file used by the flush manager. Null if the flush manager is just starting up.
+     * @return new log file to be used.
+     * @throws IOException
+     * @throws InvalidLogException
+     */
+    public LogFileManager.LogFileWriter switchToNextLogFile( LogFileManager.LogFileWriter currentWriter ) throws IOException, InvalidLogException
+    {
+        if ( currentWriter != null )
+        {
+            // Finish the current file, write the control file, then create the next log file
+            currentWriter.close();
+            this.writeControlFile();
+            this.createNextLogFile( false );
+        }
+
+        LogFileManager.LogFileWriter writer = logFileManager.getWriterForLogFile( this.currentLogFileNumber );
+        long currentOffset = writer.getLength();
+        if ( currentOffset > 0 )
+        {
+            // Position the writer at the end of the content already in the file
+            writer.seek( currentOffset );
+        }
+        return writer;
+    }
+
+    /**
+     * Called when the logging subsystem is notified about the minimum position
+     * in the log files that is needed. Log manager uses this information to advance
+     * its checkpoint and delete unnecessary log files.
+     *
+     * The anchor only moves forward: a new anchor that does not compare greater
+     * than the current one is ignored, as is a null anchor.
+     *
+     * @param newLogAnchor min needed log anchor
+     */
+    public void advanceMinLogAnchor( LogAnchor newLogAnchor )
+    {
+        if ( newLogAnchor == null )
+        {
+            return;
+        }
+
+        minLogAnchorLock.lock();
+
+        try
+        {
+            if ( anchorComparator.compare( minLogAnchor, newLogAnchor ) < 0 )
+            {
+                minLogAnchor.resetLogAnchor( newLogAnchor );
+            }
+        }
+        finally
+        {
+            // Release the lock even if the comparison or the reset throws
+            minLogAnchorLock.unlock();
+        }
+    }
+
+    /**
+     * Writes the control file. To make partially written control files unlikely,
+     * data is first written to a shadow file and then moved(renamed) to the controlfile.
+     * Move of a file is atomic in POSIX systems, in GFS like file systems(in HDFS for example).
+     * On windows, it is not always atomic but atomic versions of rename operations started to
+     * appear in their recent file systems.
+ * + * @throws IOException + */ + private void writeControlFile() throws IOException + { + // Copy the min log file anchor + minLogAnchorLock.lock(); + + controlFileRecord.minNeededLogFile = minLogAnchor.getLogFileNumber(); + controlFileRecord.minNeededLogFileOffset = minLogAnchor.getLogFileOffset(); + controlFileRecord.minNeededLSN = minLogAnchor.getLogLSN(); + + minLogAnchorLock.unlock(); + + if ( controlFileRecord.minNeededLogFile > controlFileRecord.minExistingLogFile ) + { + this.deleteUnnecessaryLogFiles( controlFileRecord.minExistingLogFile,controlFileRecord.minNeededLogFile ); + controlFileRecord.minExistingLogFile = controlFileRecord.minNeededLogFile; + + } + + // TODO compute checksum for log record here + + + controlFileMarker.rewind(); + controlFileMarker.putLong( controlFileRecord.minExistingLogFile ); + controlFileMarker.putLong( controlFileRecord.minNeededLogFile ); + controlFileMarker.putLong( controlFileRecord.minNeededLogFileOffset ); + controlFileMarker.putLong( controlFileRecord.minNeededLSN ); + controlFileMarker.putLong( controlFileRecord.checksum ); + controlFileMarker.putInt( CONTROLFILE_MAGIC_NUMBER ); + + + boolean shadowFileExists = logFileManager.createLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER ); + + if ( shadowFileExists ) + { + logFileManager.truncateLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER, 0 ); + } + + LogFileManager.LogFileWriter controlFileWriter = logFileManager.getWriterForLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER ); + + try + { + controlFileWriter.append( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE); + controlFileWriter.sync(); + } + finally + { + controlFileWriter.close(); + } + + // Do the move now + logFileManager.rename( CONTROLFILE_SHADOW_LOG_FILE_NUMBER , CONTROLFILE_LOG_FILE_NUMBER ); + + + } + + /** + * Read and verifies the control file. 
+ * + * @throws IOException + * @throws InvalidLogException + * @throws FileNotFoundException + */ + private void readControlFile() throws IOException, InvalidLogException, FileNotFoundException + { + boolean invalidControlFile = false; + LogFileManager.LogFileReader controlFileReader = logFileManager.getReaderForLogFile( CONTROLFILE_LOG_FILE_NUMBER ); + + try + { + controlFileReader.read( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE ); + } + catch( EOFException e ) + { + throw new InvalidLogException( I18n.err( I18n.ERR_750 ) , e); + } + finally + { + controlFileReader.close(); + } + + controlFileMarker.rewind(); + controlFileRecord.minExistingLogFile = controlFileMarker.getLong(); + controlFileRecord.minNeededLogFile = controlFileMarker.getLong(); + controlFileRecord.minNeededLogFileOffset = controlFileMarker.getLong(); + controlFileRecord.minNeededLSN = controlFileMarker.getLong(); + controlFileRecord.checksum = controlFileMarker.getLong(); + int magicNumber = controlFileMarker.getInt(); + + + if ( controlFileRecord.minExistingLogFile < LogAnchor.MIN_LOG_NUMBER ) + { + invalidControlFile = true; + } + + if ( (controlFileRecord.minNeededLogFile < LogAnchor.MIN_LOG_NUMBER ) || + ( controlFileRecord.minNeededLogFileOffset < LogAnchor.MIN_LOG_OFFSET ) ) + { + invalidControlFile = true; + } + + if ( controlFileRecord.minExistingLogFile > controlFileRecord.minNeededLogFile ) + { + invalidControlFile = true; + } + + if ( magicNumber != this.CONTROLFILE_MAGIC_NUMBER ) + { + invalidControlFile = true; + } + + // TODO compute and compare checksum + + if ( invalidControlFile == true ) + { + throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); + } + + } + + /** + * Creates the next log file. If the log file already exists, then it is reformatted, that is, + * its size is truncated to zero and file header is writtten again. + * + * @param reformatExistingFile log file already exists and should be formatted. If false, log file should not exist. 
+ * @throws IOException + * @throws InvalidLogException + */ + private void createNextLogFile( boolean reformatExistingFile ) throws IOException, InvalidLogException + { + LogFileManager.LogFileWriter writer = null; + + long logFileNumber = this.currentLogFileNumber; + + if ( reformatExistingFile == false ) + { + logFileNumber++; + } + + // Try to create the file. + boolean fileAlreadyExists = logFileManager.createLogFile( logFileNumber ); + + if ( ( reformatExistingFile == false ) && ( fileAlreadyExists == true ) ) + { + // Didnt expect the file to be around + throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); + } + + if ( ( reformatExistingFile == true ) && ( fileAlreadyExists == false ) ) + { + // Didnt expect the file to be around + throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); + } + + if ( reformatExistingFile ) + { + logFileManager.truncateLogFile( logFileNumber, LogAnchor.MIN_LOG_OFFSET ); + + } + + writer = logFileManager.getWriterForLogFile( logFileNumber ); + + try + { + markerHead.rewind(); + markerHead.putLong( logFileNumber ); + markerHead.putInt( LogFileRecords.LOG_FILE_HEADER_MAGIC_NUMBER ); + writer.append( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE ); + writer.sync(); + } + finally + { + writer.close(); + } + + this.currentLogFileNumber = logFileNumber; + + } + + private void deleteUnnecessaryLogFiles( long startingLogFileNumber, long endingLogFileNumber ) + { + for ( long logFileNumber = startingLogFileNumber; logFileNumber < endingLogFileNumber; + logFileNumber++ ) + { + // Do a best effort delete + logFileManager.deleteLogFile( logFileNumber ); + } + } + + /** + * Checkpoint record + */ + private class ControlFileRecord + { + long minExistingLogFile; + long minNeededLogFile; + long minNeededLogFileOffset; + long minNeededLSN; + long checksum; + } + + + +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java URL: 
http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,14 @@ + +package org.apache.directory.server.core.log; + + +public interface LogScannerInternal extends LogScanner +{ + /** + * Initializes the scanner + * + * @param startingPoint + * @param logFileManager log file manager to use + */ + public void init( LogAnchor startingPoint, LogFileManager logFileManager ); +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,95 @@ + +package org.apache.directory.server.core.txn; + +import java.util.List; +import java.util.ArrayList; + +abstract class AbstractTransaction implements Transaction +{ + /** Logical time(LSN in the wal) when the txn began */ + long startTime; + + /** logical commit time, set when txn commits */ + long commitTime; + + /** State of the transaction */ + State txnState; + + /** List of txns that this txn depends */ + List txnsToCheck = new 
ArrayList(); + + + public AbstractTransaction( ) + { + txnState = State.INITIAL; + } + + /** + * {@inheritDoc} + */ + public void startTxn( long startTime ) + { + this.startTime = startTime; + this.setState( State.READ ); + } + + /** + * {@inheritDoc} + */ + public long getStartTime() + { + return this.startTime; + } + + /** + * {@inheritDoc} + */ + public void commitTxn( long commitTime ) + { + this.commitTime = commitTime; + this.setState( State.COMMIT ); + } + + /** + * {@inheritDoc} + */ + public long getCommitTime() + { + return commitTime; + } + + /** + * {@inheritDoc} + */ + public void abortTxn() + { + this.setState( State.ABORT ); + } + + + /** + * {@inheritDoc} + */ + public List getTxnsToCheck() + { + return this.txnsToCheck; + } + + /** + * {@inheritDoc} + */ + public State getState() + { + return this.txnState; + } + + /** + * {@inheritDoc} + */ + public void setState( State newState ) + { + this.txnState = newState; + } + + +} \ No newline at end of file Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,95 @@ + +package org.apache.directory.server.core.txn; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +import org.apache.directory.server.core.log.UserLogRecord; +import org.apache.directory.server.core.log.Log; +import 
org.apache.directory.server.core.log.InvalidLogException; + + + +import org.apache.directory.server.core.txn.logedit.LogEdit; + + +public class DefaultTxnLogManager implements TxnLogManager +{ + /** Write ahea log */ + Log wal; + + /** Txn Manager */ + TxnManagerInternal txnManager; + + public void init( Log logger, TxnManagerInternal txnManager ) + { + this.wal = logger; + this.txnManager = txnManager; + } + /** + * {@inheritDoc} + */ + public void log( LogEdit logEdit, boolean sync ) throws IOException + { + Transaction curTxn = txnManager.getCurTxn(); + + if ( ( curTxn == null ) || ( ! ( curTxn instanceof ReadWriteTxn ) ) ) + { + throw new IllegalStateException( "Trying to log logedit without ReadWriteTxn" ); + } + + ReadWriteTxn txn = (ReadWriteTxn)curTxn; + UserLogRecord logRecord = txn.getUserLogRecord(); + + + ObjectOutputStream out = null; + ByteArrayOutputStream bout = null; + byte[] data; + + try + { + bout = new ByteArrayOutputStream(); + out = new ObjectOutputStream( bout ); + out.writeObject( logEdit ); + out.flush(); + data = bout.toByteArray(); + } + finally + { + if ( bout != null ) + { + bout.close(); + } + + if ( out != null ) + { + out.close(); + } + } + + logRecord.setData( data, data.length ); + + this.log( logRecord, sync ); + + logEdit.getLogAnchor().resetLogAnchor( logRecord.getLogAnchor() ); + txn.getEdits().add( logEdit ); + } + + /** + * {@inheritDoc} + */ + public void log( UserLogRecord logRecord, boolean sync ) throws IOException + { + try + { + wal.log( logRecord, sync ); + } + catch ( InvalidLogException e ) + { + throw new IOException(e); + } + } + + +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java?rev=1181167&view=auto ============================================================================== 
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,413 @@ + +package org.apache.directory.server.core.txn; + +import org.apache.directory.server.core.txn.logedit.TxnStateChange; +import org.apache.directory.server.core.log.LogAnchor; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import java.util.Iterator; +import java.util.List; + + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.directory.server.core.log.UserLogRecord; +import org.apache.directory.server.core.log.LogAnchor; + +import java.io.IOException; + + +public class DefaultTxnManager implements TxnManager, TxnManagerInternal +{ + /** wal log manager */ + TxnLogManager txnLogManager; + + /** List of committed txns in commit LSN order */ + ConcurrentLinkedQueue committedQueue = new ConcurrentLinkedQueue(); + + /** Verify lock under which txn verification is done */ + Lock verifyLock = new ReentrantLock(); + + /** Used to assign start and commit version numbers to writeTxns */ + Lock writeTxnsLock = new ReentrantLock(); + + /** Latest committed txn on which read only txns can depend */ + AtomicReference latestCommittedTxn = new AtomicReference(); + + /** Latest verified write txn */ + AtomicReference latestVerifiedTxn = new AtomicReference(); + + /** Latest flushed txn's logical commit time */ + AtomicLong latestFlushedTxnLSN = new AtomicLong( 0 ); + + /** Per thread txn context */ + static final ThreadLocal < Transaction > txnVar = + new ThreadLocal < 
Transaction > ()
+        {
+             @Override
+             protected Transaction initialValue()
+             {
+                 return null;
+             }
+        };
+
+    public void init( TxnLogManager txnLogManager )
+    {
+        this.txnLogManager = txnLogManager;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void beginTransaction( boolean readOnly ) throws IOException
+    {
+        Transaction curTxn = txnVar.get();
+
+        if ( curTxn != null )
+        {
+            throw new IllegalStateException("Cannot begin a txn when txn is already running: " +
+                curTxn);
+        }
+
+        if ( readOnly )
+        {
+            this.beginReadOnlyTxn();
+        }
+        else
+        {
+            this.beginReadWriteTxn();
+        }
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void commitTransaction() throws IOException
+    {
+        Transaction txn = txnVar.get();
+
+        if ( txn == null )
+        {
+            throw new IllegalStateException(" trying to commit non existent txn ");
+        }
+
+        this.prepareForEndingTxn( txn );
+
+        if ( txn instanceof ReadOnlyTxn )
+        {
+            txn.commitTxn( txn.getStartTime() );
+        }
+        else
+        {
+            this.commitReadWriteTxn( (ReadWriteTxn)txn );
+        }
+
+        txnVar.set( null );
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void abortTransaction() throws IOException
+    {
+        Transaction txn = txnVar.get();
+
+        if ( txn == null )
+        {
+            // this is acceptable
+            return;
+        }
+
+        this.prepareForEndingTxn( txn );
+
+        if ( txn instanceof ReadWriteTxn )
+        {
+            this.abortReadWriteTxn( (ReadWriteTxn)txn );
+        }
+
+        txn.abortTxn();
+        txnVar.set( null );
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Transaction getCurTxn()
+    {
+       return txnVar.get();
+    }
+
+    /**
+     * Begins a read only txn for the calling thread. Takes a reference on the
+     * latest committed txn (if there is one), retrying until the txn that was
+     * referenced is still the latest committed one.
+     */
+    private void beginReadOnlyTxn()
+    {
+        ReadOnlyTxn txn = new ReadOnlyTxn();
+        ReadWriteTxn lastTxnToCheck = null;
+
+        do
+        {
+            // Drop the reference taken in the previous iteration: the latest committed txn changed meanwhile
+            if ( lastTxnToCheck != null )
+            {
+                lastTxnToCheck.getRefCount().decrementAndGet();
+            }
+
+            lastTxnToCheck = latestCommittedTxn.get();
+
+            // latestCommittedTxn holds null until a read-write txn has committed
+            if ( lastTxnToCheck != null )
+            {
+                lastTxnToCheck.getRefCount().getAndIncrement();
+            }
+        } while ( lastTxnToCheck != latestCommittedTxn.get() );
+
+        // Determine start time
+        long startTime;
+
+        if ( lastTxnToCheck != null )
+        {
+            startTime = lastTxnToCheck.getCommitTime();
+        }
+        else
+        {
+
startTime = LogAnchor.UNKNOWN_LSN;
+        }
+
+        // Record the start time on the txn: prepareForEndingTxn compares it with the
+        // commit time of the last txn in the check list
+        txn.startTxn( startTime );
+
+        this.buildCheckList( txn, lastTxnToCheck );
+        txnVar.set( txn );
+    }
+
+    /**
+     * Begins a read-write txn for the calling thread. A TXN_BEGIN state change is
+     * written to the log and the LSN of that record becomes the start time of the txn.
+     * Under the write txns lock a reference is taken on the latest verified txn
+     * (if there is one); the check list is then built from it.
+     *
+     * @throws IOException if serializing or logging the begin record fails
+     */
+    private void beginReadWriteTxn() throws IOException
+    {
+        ReadWriteTxn txn = new ReadWriteTxn();
+        UserLogRecord logRecord = txn.getUserLogRecord();
+
+        TxnStateChange txnRecord = new TxnStateChange( LogAnchor.UNKNOWN_LSN,
+            TxnStateChange.State.TXN_BEGIN );
+        ObjectOutputStream out = null;
+        ByteArrayOutputStream bout = null;
+        byte[] data;
+
+        try
+        {
+            bout = new ByteArrayOutputStream();
+            out = new ObjectOutputStream( bout );
+            out.writeObject( txnRecord );
+            out.flush();
+            data = bout.toByteArray();
+        }
+        finally
+        {
+            if ( bout != null )
+            {
+                bout.close();
+            }
+
+            if ( out != null )
+            {
+                out.close();
+            }
+        }
+
+
+        logRecord.setData( data, data.length );
+
+        ReadWriteTxn lastTxnToCheck = null;
+        writeTxnsLock.lock();
+
+        try
+        {
+            txnLogManager.log( logRecord, false );
+            txn.startTxn( logRecord.getLogAnchor().getLogLSN() );
+
+            do
+            {
+                // Drop the reference taken in the previous iteration
+                if ( lastTxnToCheck != null )
+                {
+                    lastTxnToCheck.getRefCount().decrementAndGet();
+                }
+
+                lastTxnToCheck = latestVerifiedTxn.get();
+
+                // latestVerifiedTxn holds null until a read-write txn has committed
+                if ( lastTxnToCheck != null )
+                {
+                    lastTxnToCheck.getRefCount().incrementAndGet();
+                }
+            } while ( lastTxnToCheck != latestVerifiedTxn.get() );
+
+        }
+        finally
+        {
+            writeTxnsLock.unlock();
+        }
+
+        // Finally build the check list
+        this.buildCheckList( txn, lastTxnToCheck );
+
+        txnVar.set( txn );
+    }
+
+
+
+    private void buildCheckList( Transaction txn, ReadWriteTxn lastTxnToCheck )
+    {
+        if ( lastTxnToCheck != null )
+        {
+            long lastLSN = lastTxnToCheck.getCommitTime();
+            ReadWriteTxn toAdd;
+
+            List<ReadWriteTxn> toCheckList = txn.getTxnsToCheck();
+            Iterator<ReadWriteTxn> it = committedQueue.iterator();
+            while ( it.hasNext() )
+            {
+                toAdd = it.next();
+
+                // Stop at the first queued txn whose commit time is past lastTxnToCheck's
+                if ( toAdd.getCommitTime() > lastLSN )
+                {
+                    break;
+                }
+
+                toCheckList.add( toAdd );
+            }
+
+            /*
+             * Get latest flushed lsn and eliminate already flushed txn from the check list.
+ */ + long flushedLSN = latestFlushedTxnLSN.get(); + + it = toCheckList.iterator(); + ReadWriteTxn toCheck; + while ( it.hasNext() ) + { + toCheck = it.next(); + if ( toCheck.commitTime <= flushedLSN ) + { + it.remove(); + } + } + + } + + } + + + private void prepareForEndingTxn( Transaction txn ) + { + List toCheck = txn.getTxnsToCheck(); + + if ( toCheck.size() > 0 ) + { + ReadWriteTxn lastTxnToCheck = toCheck.get( toCheck.size() - 1 ); + + if ( lastTxnToCheck.commitTime != txn.getStartTime() ) + { + throw new IllegalStateException( " prepareForEndingTxn: txn has unpexptected start time " + + txn + " expected: " + lastTxnToCheck ); + } + + if ( lastTxnToCheck.getRefCount().get() <= 0 ) + { + throw new IllegalStateException( " prepareForEndingTxn: lastTxnToCheck has unexpected ref cnt " + + txn + " expected: " + lastTxnToCheck ); + } + + lastTxnToCheck.getRefCount().decrementAndGet(); + } + } + + private void commitReadWriteTxn( ReadWriteTxn txn ) throws IOException + { + UserLogRecord logRecord = txn.getUserLogRecord(); + + TxnStateChange txnRecord = new TxnStateChange( txn.getStartTime(), + TxnStateChange.State.TXN_COMMIT ); + ObjectOutputStream out = null; + ByteArrayOutputStream bout = null; + byte[] data; + + try + { + bout = new ByteArrayOutputStream(); + out = new ObjectOutputStream( bout ); + out.writeObject( txnRecord ); + out.flush(); + data = bout.toByteArray(); + } + finally + { + if ( bout != null ) + { + bout.close(); + } + + if ( out != null ) + { + out.close(); + } + } + + logRecord.setData( data, data.length ); + + verifyLock.lock(); + + // TODO verify txn here throw conflict exception if necessary + + + writeTxnsLock.lock(); + try + { + // TODO sync of log can be done outside the locks. 
+ txnLogManager.log( logRecord, true ); + txn.commitTxn( logRecord.getLogAnchor().getLogLSN() ); + + latestVerifiedTxn.set( txn ); + + // TODO when sync is done outside the locks, advance latest commit outside the locks + latestCommittedTxn.set( txn ); + } + finally + { + writeTxnsLock.unlock(); + verifyLock.unlock(); + } + } + + + private void abortReadWriteTxn( ReadWriteTxn txn ) throws IOException + { + UserLogRecord logRecord = txn.getUserLogRecord(); + + TxnStateChange txnRecord = new TxnStateChange( txn.getStartTime(), + TxnStateChange.State.TXN_ABORT ); + ObjectOutputStream out = null; + ByteArrayOutputStream bout = null; + byte[] data; + + try + { + bout = new ByteArrayOutputStream(); + out = new ObjectOutputStream( bout ); + out.writeObject( txnRecord ); + out.flush(); + data = bout.toByteArray(); + } + finally + { + if ( bout != null ) + { + bout.close(); + } + + if ( out != null ) + { + out.close(); + } + } + + logRecord.setData( data, data.length ); + txnLogManager.log( logRecord, false ); + + } + +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,7 @@ + +package org.apache.directory.server.core.txn; + +public class ReadOnlyTxn extends AbstractTransaction +{ + +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java URL: 
http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,50 @@ + +package org.apache.directory.server.core.txn; + +import java.util.List; +import java.util.LinkedList; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.directory.server.core.txn.logedit.LogEdit; + +import org.apache.directory.server.core.log.UserLogRecord; + +public class ReadWriteTxn extends AbstractTransaction +{ + /** list of log edits by the txn */ + List logEdits = new LinkedList(); + + /* + * Number of txns that depend on this txn and previous committed + * txns. This number is bumped up only after the txn is committed. + * A txn can be flushed to partitions only after the txn itself is + * committed and ref count becomes zero for all the previously + * committed txns. 
+ */ + AtomicInteger txnRefCount = new AtomicInteger( 0 ); + + /** User record used to communicate data with log manager */ + UserLogRecord logRecord = new UserLogRecord(); + + // TODO add a map of index changes + + + + public AtomicInteger getRefCount() + { + return this.txnRefCount; + } + + public UserLogRecord getUserLogRecord() + { + return this.getUserLogRecord(); + } + + public List getEdits() + { + return logEdits; + } + + +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,32 @@ + +package org.apache.directory.server.core.txn; + +import java.util.List; + + +interface Transaction +{ + public List getTxnsToCheck(); + + public long getStartTime(); + + public void startTxn( long startTime ); + + public void commitTxn( long commitTime ); + + public long getCommitTime(); + + public void abortTxn(); + + public State getState(); + + + enum State + { + INITIAL, + READ, + COMMIT, + ABORT + } + +} Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java?rev=1181167&view=auto ============================================================================== --- 
directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,7 @@ + +package org.apache.directory.server.core.txn; + +public interface TxnManagerInternal extends TxnManager +{ + public Transaction getCurTxn(); +} Added: directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java?rev=1181167&view=auto ============================================================================== --- directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java (added) +++ directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java Mon Oct 10 19:39:36 2011 @@ -0,0 +1,356 @@ + +package org.apache.directory.server.core.log; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class LogFlushScanTest +{ + /** Logger */ + private Log log; + + /** Log buffer size */ + private int logBufferSize = 1 << 12; + + /** Log File Size */ + private long logFileSize = 1 << 13; + + /** log suffix */ + private String logSuffix = "log"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + + private String getLogFoler( ) throws IOException + { + String file = folder.newFolder( "log" ).getAbsolutePath(); + return file; + } + + + @Before + public void setup() throws IOException, InvalidLogException + { + log = new 
DefaultLog(); + log.init( this.getLogFoler(), logSuffix, logBufferSize, logFileSize ); + } + + + @After + public void teardown() throws IOException + { + + } + + @Test + public void testAppendScan() + { + int idx; + int dataLength = 1024; + UserLogRecord logRecord = new UserLogRecord(); + byte recordData[] = new byte[dataLength]; + byte userRecord[]; + boolean failed = false; + + byte writtenCounter = 0; + byte readCounter = 0; + + LogAnchor startingPoint = new LogAnchor(); + + for ( idx = 0; idx < dataLength; idx++ ) + { + recordData[idx] = writtenCounter; + } + writtenCounter++; + + try + { + logRecord.setData( recordData, dataLength ); + log.log( logRecord, false ); + + // Record the starting point + startingPoint.resetLogAnchor( logRecord.getLogAnchor() ); + + for ( idx = 0; idx < dataLength; idx++ ) + { + recordData[idx] = writtenCounter; + } + writtenCounter++; + + logRecord.setData( recordData, dataLength ); + log.log( logRecord, true ); //Sync what we logged so far + + + LogScanner logScanner = log.beginScan( startingPoint ); + while ( logScanner.getNextRecord( logRecord ) ) + { + userRecord = logRecord.getDataBuffer(); + assertTrue( logRecord.getDataLength() == dataLength ); + + for ( idx = 0; idx < dataLength; idx++ ) + { + assertTrue( userRecord[idx] == readCounter ); + } + + readCounter++; + } + + assertTrue( writtenCounter == readCounter ); + + } + catch( IOException e ) + { + e.printStackTrace(); + failed = true; + } + catch( InvalidLogException e ) + { + e.printStackTrace(); + failed = true; + } + + assertTrue( failed == false ); + } + + @Test + public void testLogSwitchScan() + { + int idx; + int dataLength = 1024; + UserLogRecord logRecord = new UserLogRecord(); + byte recordData[] = new byte[dataLength]; + byte userRecord[]; + boolean failed = false; + + byte writtenCounter = 1; + byte readCounter = 1; + byte maxCounter = 127; + boolean firstRecord = true; + + LogAnchor startingPoint = new LogAnchor(); + LogAnchor endPoint = new LogAnchor(); + + 
try + { + while ( writtenCounter < maxCounter ) + { + for ( idx = 0; idx < dataLength; idx++ ) + { + recordData[idx] = writtenCounter; + } + + logRecord.setData( recordData, dataLength ); + boolean sync = ( ( writtenCounter % 11 ) == 0 ) || ( writtenCounter == ( maxCounter - 1 ) ); + log.log( logRecord, sync ); + + if ( firstRecord ) + { + // Record the starting point + startingPoint.resetLogAnchor( logRecord.getLogAnchor() ); + firstRecord = false; + } + + if ( ( writtenCounter == ( maxCounter - 1 ) ) ) + { + endPoint.resetLogAnchor( logRecord.getLogAnchor() ); + } + + writtenCounter++; + } + + assertTrue( endPoint.getLogFileNumber() > startingPoint.getLogFileNumber() ); + + LogScanner logScanner = log.beginScan( startingPoint ); + while ( logScanner.getNextRecord( logRecord ) ) + { + userRecord = logRecord.getDataBuffer(); + assertTrue( logRecord.getDataLength() == dataLength ); + + for ( idx = 0; idx < dataLength; idx++ ) + { + assertTrue( userRecord[idx] == readCounter ); + } + + readCounter++; + } + + assertTrue( writtenCounter == readCounter ); + + + } + catch( IOException e ) + { + e.printStackTrace(); + failed = true; + } + catch( InvalidLogException e ) + { + e.printStackTrace(); + failed = true; + } + + } + + @Test + public void testMultiThreadedAppend() throws InterruptedException + { + int idx; + int dataLength = 1024; + UserLogRecord logRecord = new UserLogRecord(); + byte recordData[] = new byte[dataLength]; + byte userRecord[]; + boolean failed = false; + + + + LogAnchor startingPoint = new LogAnchor(); + + for ( idx = 0; idx < dataLength; idx++ ) + { + recordData[idx] = 0; + } + + logRecord.setData( recordData, dataLength ); + + try + { + log.log( logRecord, false ); + } + catch( IOException e ) + { + e.printStackTrace(); + failed = true; + } + catch( InvalidLogException e ) + { + e.printStackTrace(); + failed = true; + } + + assertTrue( failed == false ); + + startingPoint.resetLogAnchor( logRecord.getLogAnchor() ); + + + byte key = 1; + int 
numThreads = 4; + int numAppends = 64; + int expectedSum = 0; + int sum = 0; + MultiThreadedAppend threads[] = new MultiThreadedAppend[numThreads]; + + for ( idx = 0; idx < numThreads; idx++ ) + { + threads[idx] = new MultiThreadedAppend( key , dataLength, numAppends); + expectedSum += key * numAppends; + } + + for ( idx = 0; idx < numThreads; idx++ ) + { + threads[idx].start(); + } + + for ( idx = 0; idx < numThreads; idx++ ) + { + threads[idx].join(); + } + + + LogScanner logScanner = log.beginScan( startingPoint ); + + try + { + while ( logScanner.getNextRecord( logRecord ) ) + { + userRecord = logRecord.getDataBuffer(); + assertTrue( logRecord.getDataLength() == dataLength ); + key = userRecord[0]; + + for ( idx = 0; idx < dataLength; idx++ ) + { + assertTrue( userRecord[idx] == key ); + } + + sum += key; + } + } + catch( IOException e ) + { + e.printStackTrace(); + failed = true; + } + catch( InvalidLogException e ) + { + e.printStackTrace(); + failed = true; + } + + + assertTrue( sum == expectedSum ); + } + + class MultiThreadedAppend extends Thread + { + byte key; + int dataLength; + int numAppends; + + public MultiThreadedAppend( byte key, int dataLength, int numAppends ) + { + this.key = key; + this.dataLength = dataLength; + this.numAppends = numAppends; + } + + public void run() + { + UserLogRecord logRecord = new UserLogRecord(); + byte recordData[] = new byte[dataLength]; + int idx; + boolean failed = false; + + for ( idx = 0; idx < dataLength; idx++ ) + { + recordData[idx] = key; + } + + logRecord.setData( recordData, dataLength ); + + try + { + for ( idx = 0; idx < numAppends; idx++ ) + { + boolean sync = false; + if ( ( ( idx % 3 ) == 0 ) || ( idx == numAppends - 1 ) ) + { + sync = true; + } + + log.log( logRecord, sync ); + } + } + catch( IOException e ) + { + e.printStackTrace(); + failed = true; + } + catch( InvalidLogException e ) + { + e.printStackTrace(); + failed = true; + } + + assertTrue( failed == false ); + } + } + +}