directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject svn commit: r1181167 [2/2] - in /directory/apacheds/branches/apacheds-txns: core-api/src/main/java/org/apache/directory/server/core/log/ core-api/src/main/java/org/apache/directory/server/core/txn/ core-api/src/main/java/org/apache/directory/server/cor...
Date Mon, 10 Oct 2011 19:39:37 GMT
Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogManager.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,497 @@
+
+package org.apache.directory.server.core.log;
+
+import java.nio.ByteBuffer;
+
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.EOFException;
+
+import org.apache.directory.server.i18n.I18n;
+
+class LogManager
+{
+   
+    /**  Controlfile record size */
+    private final static int CONTROLFILE_RECORD_SIZE = 44;
+    
+    /** Controlfile file magic number */
+    private final static int CONTROLFILE_MAGIC_NUMBER = 0xFF11FF11;
+    
+    /** Controlfile log file number */
+    private final static long CONTROLFILE_LOG_FILE_NUMBER = -1;
+    
+    /** Shadow Controlfile log file number */
+    private final static long CONTROLFILE_SHADOW_LOG_FILE_NUMBER = -2;
+    
+    /** buffer used to do IO on controlfile */
+    byte controlFileBuffer[] = new byte[CONTROLFILE_RECORD_SIZE];
+    
+    /** ByteBuffer used to to IO on checkpoint file */
+    ByteBuffer controlFileMarker = ByteBuffer.wrap( controlFileBuffer );
+    
+    /** Current checkpoint record in memory */
+    ControlFileRecord controlFileRecord = new ControlFileRecord();
+    
+    /** Min neeeded point in the log */
+    LogAnchor minLogAnchor = new LogAnchor();
+    
+    /** Protects minLogAchor */
+    Lock minLogAnchorLock = new ReentrantLock();
+    
+    /** Log file manager */
+    LogFileManager logFileManager;
+        
+    /** Log Anchor comparator */
+    LogAnchorComparator anchorComparator = new LogAnchorComparator();
+    
+    /** Current log file */
+    private long currentLogFileNumber;
+    
+    /** Buffer used to read log file markers */
+    byte markerBuffer[] = new byte[LogFileRecords.LOG_FILE_HEADER_SIZE];
+    
+    /** ByteBuffer wrapper for the marker buffer */
+    ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
+    
+    
+    public LogManager( LogFileManager logFileManager )
+    {
+        this.logFileManager = logFileManager;
+    }
+    
+    /**
+     *Initializes the log management:
+     * 1) Checks if control file exists and creates it if necesssary. If it exists, it reads it and loads the latest checkpoint.
+     * 2) Starts from the lates checkpoint ans scans forwards the logs to check for corrupted logs and determine the end of the log.
+     * This scan ends either when a properly ended log file is found or a partially written log record is found. 
+     *
+     * @throws IOException
+     * @throws InvalidLogException
+     */
+    public void initLogManager() throws IOException, InvalidLogException
+    {
+        LogAnchor scanPoint = new LogAnchor();
+        LogScannerInternal scanner;
+        UserLogRecord logRecord;        
+        LogFileManager.LogFileReader reader;
+
+        
+        
+        // Read and verify control file
+        boolean controlFileExists = true;
+        try
+        {
+            this.readControlFile();
+        }
+        catch( FileNotFoundException e )
+        {
+            controlFileExists = false;
+        }
+        
+        if ( controlFileExists )
+        {
+            boolean invalidLog = false;
+            
+            // Set the min log anchor from the control file
+            minLogAnchor.resetLogAnchor( controlFileRecord.minNeededLogFile, 
+                    controlFileRecord.minNeededLogFileOffset, controlFileRecord.minNeededLSN );
+            
+            scanPoint.resetLogAnchor( minLogAnchor );
+            
+            logRecord = new UserLogRecord();
+            scanner = new DefaultLogScanner();
+            scanner.init( scanPoint, logFileManager );
+            
+            try
+            {
+                while ( scanner.getNextRecord( logRecord ) )
+                {
+                    // No need to do anything with the log record
+                }
+            }
+            catch( InvalidLogException e )
+            {
+                invalidLog = true;
+            }
+            finally
+            {
+                scanner.close();
+            }
+            
+            long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
+            long lastGoodLogFileOffset = scanner.getLastGoodOffset();
+            currentLogFileNumber = lastGoodLogFileNumber;
+            
+            if ( ( lastGoodLogFileNumber < LogAnchor.MIN_LOG_NUMBER ) || 
+                ( lastGoodLogFileOffset < LogAnchor.MIN_LOG_OFFSET ))
+            {
+                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+            }
+            
+            scanPoint.resetLogAnchor( lastGoodLogFileNumber, lastGoodLogFileOffset, 
+                    LogAnchor.UNKNOWN_LSN );
+            
+            if ( anchorComparator.compare( scanPoint, minLogAnchor ) < 0 )
+            {
+                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+            }
+            
+            /*
+             * If invalid content at the end of file:
+             * if we are past the header of file, then
+             * truncate the file to the end of the last
+             * read log record, otherwise we read a partially
+             * written log file header, in this case reformat the log file.
+             * Also check next for the existence of next file to make
+             * sure we really read the last log file.
+             */
+            if ( invalidLog )
+            {
+                // Check if next log file exists
+                reader = null;
+                try
+                {
+                    reader = logFileManager.getReaderForLogFile( ( lastGoodLogFileNumber + 1 ) );
+                    
+                }
+                catch ( FileNotFoundException e )
+                {
+                    // Fine, this is what we want
+                }
+                finally
+                {
+                    if ( reader != null )
+                    {
+                        reader.close();
+                    }
+                }
+                
+                if ( reader != null )
+                {
+                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+                }
+                
+                if  ( lastGoodLogFileOffset >= LogFileRecords.LOG_FILE_HEADER_SIZE  )
+                {
+                    logFileManager.truncateLogFile( lastGoodLogFileNumber, lastGoodLogFileOffset );
+                }
+                else
+                {
+                    // Reformat the existing log file
+                    this.createNextLogFile( true);
+                }           
+            }
+            
+        }
+        {
+            /*
+             * Control file does not exist. Either we are at the very beginning or 
+             * maybe we crashed in the middle of creating the first log file. 
+             * We  should have the min log file at most with the file header formatted. 
+             */
+           reader = null;
+           boolean fileExists = false;
+           currentLogFileNumber = LogAnchor.MIN_LOG_NUMBER - 1;
+           try
+           {
+               reader = logFileManager.getReaderForLogFile( LogAnchor.MIN_LOG_NUMBER );
+               
+               if ( reader.getLength() > LogFileRecords.LOG_FILE_HEADER_SIZE )
+               {
+                   throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+               }
+               fileExists = true;
+               currentLogFileNumber++;
+           }
+           catch ( FileNotFoundException e )
+           {
+               // Fine, we will create the file
+           }
+           finally
+           {
+               if ( reader != null )
+               {
+                   reader.close();
+               }
+           }
+            
+           
+           
+           this.createNextLogFile( fileExists );
+            
+            // Prepare the min log anchor and control file and write the control file
+           minLogAnchor.resetLogAnchor( LogAnchor.MIN_LOG_NUMBER, LogFileRecords.LOG_FILE_HEADER_SIZE, LogAnchor.UNKNOWN_LSN );
+           
+           this.writeControlFile();
+        }
+    }
+    
+    /**
+     * Called by LogFlushManager to switch to the next file.
+     *
+     * Note:Currently we do a checkpoint and delete unnecessary log files when we switch to a new file. Some
+     * of this tasks can be delegated to a background thread later. 
+     *
+     * @param currentWriter current log file used by the flush manager. Null if the flush manager is just starting up.
+     * @return new lgo file to be used.
+     * @throws IOException
+     * @throws InvalidLogException
+     */
+    public LogFileManager.LogFileWriter switchToNextLogFile( LogFileManager.LogFileWriter currentWriter ) throws IOException, InvalidLogException
+    {
+        if ( currentWriter != null )
+        {
+            currentWriter.close();
+            this.writeControlFile();
+            this.createNextLogFile( false );
+        }
+        
+        LogFileManager.LogFileWriter writer =  logFileManager.getWriterForLogFile( this.currentLogFileNumber );
+        long currentOffset = writer.getLength();
+        if ( currentOffset > 0 )
+        {
+            writer.seek( currentOffset );
+        }
+        return writer;    
+    }
+    
+    /**
+     * Called when the logging subsystem is notified about the minimum position 
+     * in the log files that is needed. Log manager uses this information to advance
+     * its checkpoint and delete unnecessary log files.
+     *
+     * @param newLogAnchor min needed log anchor
+     */
+    public void advanceMinLogAnchor( LogAnchor newLogAnchor )
+    {
+        if ( newLogAnchor == null )
+        {
+            return;
+        }
+        
+        minLogAnchorLock.lock();
+        
+        if ( anchorComparator.compare( minLogAnchor, newLogAnchor ) < 0 )
+        {
+            minLogAnchor.resetLogAnchor( newLogAnchor );
+        }
+        
+        minLogAnchorLock.unlock();
+    }
+    
+    /**
+     * Writes the control file. To make paritally written control files unlikely,
+     * data is first written to a shadow file and then moved(renamed) to the controlfile.
+     * Move of a file is atomic in POSIX systems, in GFS like file systems(in HDFS for example).
+     * On windows, it is not always atomic but atomic versions of rename operations started to
+     * appear in their recent file systems. 
+     *
+     * @throws IOException
+     */
+    private void writeControlFile() throws IOException
+    {
+        // Copy the min log file anchor
+        minLogAnchorLock.lock();
+        
+        controlFileRecord.minNeededLogFile = minLogAnchor.getLogFileNumber();
+        controlFileRecord.minNeededLogFileOffset = minLogAnchor.getLogFileOffset();
+        controlFileRecord.minNeededLSN = minLogAnchor.getLogLSN();
+        
+        minLogAnchorLock.unlock();
+        
+        if ( controlFileRecord.minNeededLogFile > controlFileRecord.minExistingLogFile  )
+        {
+            this.deleteUnnecessaryLogFiles( controlFileRecord.minExistingLogFile,controlFileRecord.minNeededLogFile );
+            controlFileRecord.minExistingLogFile = controlFileRecord.minNeededLogFile;
+            
+        }
+        
+        // TODO compute checksum for log record here
+        
+        
+        controlFileMarker.rewind();
+        controlFileMarker.putLong( controlFileRecord.minExistingLogFile );
+        controlFileMarker.putLong( controlFileRecord.minNeededLogFile );
+        controlFileMarker.putLong( controlFileRecord.minNeededLogFileOffset );
+        controlFileMarker.putLong( controlFileRecord.minNeededLSN );
+        controlFileMarker.putLong( controlFileRecord.checksum );
+        controlFileMarker.putInt( CONTROLFILE_MAGIC_NUMBER );
+        
+        
+        boolean shadowFileExists = logFileManager.createLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER  );
+        
+        if ( shadowFileExists )
+        {
+            logFileManager.truncateLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER, 0 );
+        }
+        
+        LogFileManager.LogFileWriter controlFileWriter = logFileManager.getWriterForLogFile( CONTROLFILE_SHADOW_LOG_FILE_NUMBER );
+        
+        try
+        {
+            controlFileWriter.append( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE);
+            controlFileWriter.sync();
+        }
+        finally
+        {
+            controlFileWriter.close();
+        }
+        
+        // Do the move now
+        logFileManager.rename( CONTROLFILE_SHADOW_LOG_FILE_NUMBER , CONTROLFILE_LOG_FILE_NUMBER );
+        
+        
+    }
+    
+    /**
+     * Read and verifies the control file.
+     *
+     * @throws IOException
+     * @throws InvalidLogException
+     * @throws FileNotFoundException
+     */
+    private void readControlFile() throws IOException, InvalidLogException, FileNotFoundException
+    {
+        boolean invalidControlFile = false;
+        LogFileManager.LogFileReader controlFileReader = logFileManager.getReaderForLogFile( CONTROLFILE_LOG_FILE_NUMBER );
+        
+        try
+        {
+            controlFileReader.read( controlFileBuffer, 0, CONTROLFILE_RECORD_SIZE );
+        }
+        catch( EOFException e )
+        {
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) , e);
+        }
+        finally
+        {
+            controlFileReader.close();
+        }
+        
+        controlFileMarker.rewind();
+        controlFileRecord.minExistingLogFile = controlFileMarker.getLong();
+        controlFileRecord.minNeededLogFile = controlFileMarker.getLong();
+        controlFileRecord.minNeededLogFileOffset = controlFileMarker.getLong();
+        controlFileRecord.minNeededLSN = controlFileMarker.getLong();
+        controlFileRecord.checksum = controlFileMarker.getLong();
+        int magicNumber = controlFileMarker.getInt();
+        
+        
+        if ( controlFileRecord.minExistingLogFile < LogAnchor.MIN_LOG_NUMBER )
+        {
+            invalidControlFile = true;
+        }
+        
+        if ( (controlFileRecord.minNeededLogFile < LogAnchor.MIN_LOG_NUMBER ) ||
+              ( controlFileRecord.minNeededLogFileOffset < LogAnchor.MIN_LOG_OFFSET ) )
+        {
+            invalidControlFile = true;
+        }
+        
+        if ( controlFileRecord.minExistingLogFile > controlFileRecord.minNeededLogFile )
+        {
+            invalidControlFile = true;
+        }
+        
+        if ( magicNumber != this.CONTROLFILE_MAGIC_NUMBER )
+        {
+            invalidControlFile = true;
+        }
+        
+        // TODO compute and compare checksum
+        
+        if ( invalidControlFile == true )
+        {
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        }
+        
+    }
+    
+    /**
+     * Creates the next log file. If the log file already exists, then it is reformatted, that is,
+     * its size is truncated to zero and file header is writtten again.
+     *
+     * @param reformatExistingFile log file already exists and should be formatted. If false, log file should not exist.
+     * @throws IOException
+     * @throws InvalidLogException
+     */
+    private void createNextLogFile( boolean reformatExistingFile ) throws IOException, InvalidLogException
+    {
+        LogFileManager.LogFileWriter writer = null;
+            
+        long logFileNumber = this.currentLogFileNumber;
+        
+        if ( reformatExistingFile == false )
+        {
+            logFileNumber++;
+        }
+        
+        // Try to create the file.
+        boolean fileAlreadyExists = logFileManager.createLogFile( logFileNumber );
+        
+        if ( ( reformatExistingFile == false ) && ( fileAlreadyExists == true ) )
+        {
+            // Didnt expect the file to be around
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        }
+        
+        if ( ( reformatExistingFile == true ) && ( fileAlreadyExists == false ) )
+        {
+            // Didnt expect the file to be around
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        }
+        
+        if ( reformatExistingFile )
+        {
+            logFileManager.truncateLogFile( logFileNumber, LogAnchor.MIN_LOG_OFFSET );
+           
+        }
+        
+        writer = logFileManager.getWriterForLogFile( logFileNumber );
+        
+        try
+        {
+            markerHead.rewind();
+            markerHead.putLong( logFileNumber );
+            markerHead.putInt( LogFileRecords.LOG_FILE_HEADER_MAGIC_NUMBER );
+            writer.append( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
+            writer.sync();            
+        }
+        finally
+        {
+            writer.close();
+        }
+        
+        this.currentLogFileNumber = logFileNumber;
+        
+    }
+    
+    private void deleteUnnecessaryLogFiles( long startingLogFileNumber, long endingLogFileNumber )
+    {
+        for ( long logFileNumber = startingLogFileNumber; logFileNumber < endingLogFileNumber; 
+                logFileNumber++ )
+        {
+            // Do a best effort delete
+            logFileManager.deleteLogFile( logFileNumber );
+        }
+    }
+    
+    /**
+     * Checkpoint record
+     */
+     private class ControlFileRecord
+     {
+         long minExistingLogFile;
+         long minNeededLogFile;
+         long minNeededLogFileOffset;
+         long minNeededLSN;
+         long checksum;
+     }
+     
+    
+      
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/log/LogScannerInternal.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,14 @@
+
+package org.apache.directory.server.core.log;
+
+
+public interface LogScannerInternal extends LogScanner
+{
+    /**
+     * Initializes the scanner
+     *
+     * @param startingPoint
+     * @param logFileManager log file manager to use 
+     */
+    public void init( LogAnchor startingPoint, LogFileManager logFileManager );
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/AbstractTransaction.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,95 @@
+
+package org.apache.directory.server.core.txn;
+
+import java.util.List;
+import java.util.ArrayList;
+
+abstract class AbstractTransaction implements Transaction
+{
+    /** Logical time(LSN in the wal) when the txn began */ 
+    long startTime;
+    
+    /** logical commit time, set when txn commits */
+    long commitTime;
+    
+    /** State of the transaction */
+    State txnState;
+    
+    /** List of txns that this txn depends */
+    List<ReadWriteTxn> txnsToCheck = new ArrayList<ReadWriteTxn>();
+ 
+    
+    public AbstractTransaction( )
+    {
+        txnState = State.INITIAL;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void startTxn( long startTime )
+    {
+        this.startTime = startTime;
+        this.setState( State.READ );
+    }
+    
+    /**
+     * {@inheritDoc}
+     */  
+    public long getStartTime()
+    {
+        return this.startTime;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void commitTxn( long commitTime )
+    {
+        this.commitTime = commitTime;
+        this.setState( State.COMMIT );
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public long getCommitTime()
+    {
+        return commitTime;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void abortTxn()
+    {
+       this.setState( State.ABORT ); 
+    }
+
+    
+    /**
+     * {@inheritDoc}
+     */  
+    public List<ReadWriteTxn> getTxnsToCheck()
+    {
+        return this.txnsToCheck;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */  
+    public State getState()
+    {
+        return this.txnState;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */  
+    public void setState( State newState )
+    {
+        this.txnState = newState;
+    }
+    
+    
+}
\ No newline at end of file

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnLogManager.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,95 @@
+
+package org.apache.directory.server.core.txn;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+import org.apache.directory.server.core.log.UserLogRecord;
+import org.apache.directory.server.core.log.Log;
+import org.apache.directory.server.core.log.InvalidLogException;
+
+
+
+import org.apache.directory.server.core.txn.logedit.LogEdit;
+
+
+public class DefaultTxnLogManager implements TxnLogManager
+{
+    /** Write ahea log */
+    Log wal;
+    
+    /** Txn Manager */
+    TxnManagerInternal txnManager;
+    
+    public void init( Log logger, TxnManagerInternal txnManager )
+    {
+        this.wal = logger;
+        this.txnManager = txnManager;
+    }
+    /**
+     * {@inheritDoc}
+     */
+   public void log( LogEdit logEdit, boolean sync ) throws IOException
+   {
+       Transaction curTxn = txnManager.getCurTxn();
+       
+       if ( ( curTxn == null ) || ( ! ( curTxn instanceof ReadWriteTxn ) ) )
+       {
+           throw new IllegalStateException( "Trying to log logedit without ReadWriteTxn" );
+       }
+       
+       ReadWriteTxn txn = (ReadWriteTxn)curTxn;
+       UserLogRecord logRecord = txn.getUserLogRecord();
+       
+       
+       ObjectOutputStream out = null;
+       ByteArrayOutputStream bout = null;
+       byte[] data;
+
+       try
+       {
+           bout = new ByteArrayOutputStream();
+           out = new ObjectOutputStream( bout );
+           out.writeObject( logEdit );
+           out.flush();
+           data = bout.toByteArray();
+       }
+       finally
+       {
+           if ( bout != null )
+           {
+               bout.close();
+           }
+           
+           if ( out != null )
+           {
+               out.close();
+           }
+       }
+       
+       logRecord.setData( data, data.length );
+       
+       this.log( logRecord, sync );
+       
+       logEdit.getLogAnchor().resetLogAnchor( logRecord.getLogAnchor() );
+       txn.getEdits().add( logEdit );
+   }
+    
+   /**
+    * {@inheritDoc}
+    */
+   public void log( UserLogRecord logRecord, boolean sync ) throws IOException
+   {
+       try
+       {
+           wal.log( logRecord, sync );
+       }
+       catch ( InvalidLogException e )
+       {
+           throw new IOException(e);
+       }
+   }
+   
+   
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,413 @@
+
+package org.apache.directory.server.core.txn;
+
+import org.apache.directory.server.core.txn.logedit.TxnStateChange;
+import org.apache.directory.server.core.log.LogAnchor;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.directory.server.core.log.UserLogRecord;
+import org.apache.directory.server.core.log.LogAnchor;
+
+import java.io.IOException;
+
+
+public class DefaultTxnManager implements TxnManager, TxnManagerInternal
+{
+    /** wal log manager */
+    TxnLogManager txnLogManager;
+    
+    /** List of committed txns in commit LSN order */
+    ConcurrentLinkedQueue<ReadWriteTxn> committedQueue = new ConcurrentLinkedQueue<ReadWriteTxn>();
+    
+    /** Verify lock under which txn verification is done */
+    Lock verifyLock = new ReentrantLock();
+    
+    /** Used to assign start and commit version numbers to writeTxns */
+    Lock writeTxnsLock = new ReentrantLock();
+    
+    /** Latest committed txn on which read only txns can depend */
+    AtomicReference<ReadWriteTxn> latestCommittedTxn = new AtomicReference<ReadWriteTxn>();
+    
+    /** Latest verified write txn */
+    AtomicReference<ReadWriteTxn> latestVerifiedTxn = new AtomicReference<ReadWriteTxn>();
+    
+    /** Latest flushed txn's logical commit time */
+    AtomicLong latestFlushedTxnLSN = new AtomicLong( 0 );
+    
+    /** Per thread txn context */
+    static final ThreadLocal < Transaction > txnVar = 
+         new ThreadLocal < Transaction > () 
+         {
+             @Override 
+             protected Transaction initialValue()
+             {
+                 return null;
+             }
+        };
+    
+    public void init( TxnLogManager txnLogManager )
+    {
+        this.txnLogManager = txnLogManager;
+    }
+    
+    /**
+     * {@inheritDoc}
+     */  
+    public void beginTransaction( boolean readOnly ) throws IOException
+    {
+        Transaction curTxn = txnVar.get();
+        
+        if ( curTxn != null )
+        {
+            throw new IllegalStateException("Cannot begin a txn when txn is already running: " + 
+                curTxn);
+        }
+        
+        if ( readOnly )
+        {
+            this.beginReadOnlyTxn();
+        }
+        else
+        {
+            this.beginReadWriteTxn();
+        }
+        
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void commitTransaction() throws IOException
+    {
+        Transaction txn = txnVar.get();
+        
+        if ( txn == null )
+        {
+            throw new IllegalStateException(" trying to commit non existent txn ");
+        }
+        
+        this.prepareForEndingTxn( txn );
+        
+        if ( txn instanceof ReadOnlyTxn )
+        {
+            txn.commitTxn( txn.getStartTime() );
+        }
+        else
+        {
+            this.commitReadWriteTxn( (ReadWriteTxn)txn );
+        }
+        
+        txnVar.set( null );
+            
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void abortTransaction() throws IOException
+    {
+        Transaction txn = txnVar.get();
+        
+        if ( txn == null )
+        {
+            // this is acceptable
+            return;
+        }
+        
+        this.prepareForEndingTxn( txn );
+        
+        if ( txn instanceof ReadWriteTxn )
+        {
+            this.abortReadWriteTxn( (ReadWriteTxn)txn );
+        }
+        
+        txn.abortTxn();
+        txnVar.set( null );
+    }
+    
+    /**
+     * {@inheritDoc}
+     */
+    public Transaction getCurTxn()
+    {
+       return txnVar.get(); 
+    }
+    
+    private void beginReadOnlyTxn()
+    {
+        ReadOnlyTxn txn = new ReadOnlyTxn();
+        ReadWriteTxn lastTxnToCheck = null;
+        
+        do
+        {
+            if ( lastTxnToCheck != null )
+            {
+                lastTxnToCheck.getRefCount().decrementAndGet();
+            }
+            
+            lastTxnToCheck = latestCommittedTxn.get();
+            lastTxnToCheck.getRefCount().getAndIncrement();
+        }while ( lastTxnToCheck != latestCommittedTxn.get()  );
+        
+        // Determine start time
+        long startTime;
+        
+        if ( lastTxnToCheck != null )
+        {
+            startTime = lastTxnToCheck.getCommitTime();
+        }
+        else
+        {
+            startTime = LogAnchor.UNKNOWN_LSN;
+        }
+        
+        this.buildCheckList( txn, lastTxnToCheck );
+        txnVar.set( txn );
+    }
+    
+    private void beginReadWriteTxn() throws IOException
+    {
+        long txnID;
+        
+        ReadWriteTxn txn = new ReadWriteTxn();
+        UserLogRecord logRecord = txn.getUserLogRecord();
+        
+        TxnStateChange txnRecord = new TxnStateChange( LogAnchor.UNKNOWN_LSN, 
+                TxnStateChange.State.TXN_BEGIN );
+        ObjectOutputStream out = null;
+        ByteArrayOutputStream bout = null;
+        byte[] data;
+
+        try
+        {
+            bout = new ByteArrayOutputStream();
+            out = new ObjectOutputStream( bout );
+            out.writeObject( txnRecord );
+            out.flush();
+            data = bout.toByteArray();
+        }
+        finally
+        {
+            if ( bout != null )
+            {
+                bout.close();
+            }
+            
+            if ( out != null )
+            {
+                out.close();
+            }
+        }
+        
+        
+        logRecord.setData(  data, data.length );
+        
+        ReadWriteTxn lastTxnToCheck = null; 
+        writeTxnsLock.lock();
+        
+        try
+        {
+            txnLogManager.log( logRecord, false );
+            txn.startTxn( logRecord.getLogAnchor().getLogLSN() );
+            
+            do
+            {
+                if ( lastTxnToCheck != null )
+                {
+                    lastTxnToCheck.getRefCount().decrementAndGet();
+                }
+                
+                lastTxnToCheck = latestVerifiedTxn.get();
+                lastTxnToCheck.getRefCount().incrementAndGet();
+            } while ( lastTxnToCheck != latestVerifiedTxn.get() );
+            
+        }
+        finally
+        {
+            writeTxnsLock.unlock();
+        }
+        
+        // Finally build the check list
+        this.buildCheckList( txn, lastTxnToCheck );
+        
+        txnVar.set( txn );
+    }
+    
+    
+    
+    private void buildCheckList( Transaction txn, ReadWriteTxn lastTxnToCheck )
+    {
+        if ( lastTxnToCheck != null )
+        {
+            long lastLSN = lastTxnToCheck.getCommitTime();
+            ReadWriteTxn toAdd;
+
+            List<ReadWriteTxn> toCheckList = txn.getTxnsToCheck();
+            Iterator<ReadWriteTxn> it = committedQueue.iterator();
+            while ( it.hasNext() )
+            {
+                toAdd = it.next();
+
+                if ( toAdd.getCommitTime() > lastLSN )
+                {
+                    break;
+                }
+
+                toCheckList.add( toAdd );
+            }
+
+            /*
+             * Get latest flushed lsn and eliminate already flushed txn from the check list.
+             */
+            long flushedLSN = latestFlushedTxnLSN.get();
+
+            it = toCheckList.iterator();
+            ReadWriteTxn toCheck;
+            while ( it.hasNext() )
+            {
+                toCheck = it.next();
+                if ( toCheck.commitTime <= flushedLSN )
+                {
+                    it.remove();
+                }
+            }
+
+        }
+
+    }
+    
+    
+    private void prepareForEndingTxn( Transaction txn )
+    {
+        List<ReadWriteTxn> toCheck = txn.getTxnsToCheck();
+        
+        if ( toCheck.size() > 0 )
+        {
+            ReadWriteTxn lastTxnToCheck = toCheck.get( toCheck.size() - 1 );
+            
+            if ( lastTxnToCheck.commitTime != txn.getStartTime() )
+            {
+                throw new IllegalStateException( " prepareForEndingTxn: txn has unpexptected start time " + 
+                    txn + " expected: " + lastTxnToCheck );
+            }
+            
+            if ( lastTxnToCheck.getRefCount().get() <= 0 )
+            {
+                throw new IllegalStateException( " prepareForEndingTxn: lastTxnToCheck has unexpected ref cnt " + 
+                    txn + " expected: " + lastTxnToCheck );
+            }
+            
+            lastTxnToCheck.getRefCount().decrementAndGet();
+        }
+    }
+    
+    private void commitReadWriteTxn( ReadWriteTxn txn ) throws IOException
+    {
+        UserLogRecord logRecord = txn.getUserLogRecord();
+
+        TxnStateChange txnRecord = new TxnStateChange( txn.getStartTime(),
+            TxnStateChange.State.TXN_COMMIT );
+        ObjectOutputStream out = null;
+        ByteArrayOutputStream bout = null;
+        byte[] data;
+
+        try
+        {
+            bout = new ByteArrayOutputStream();
+            out = new ObjectOutputStream( bout );
+            out.writeObject( txnRecord );
+            out.flush();
+            data = bout.toByteArray();
+        }
+        finally
+        {
+            if ( bout != null )
+            {
+                bout.close();
+            }
+
+            if ( out != null )
+            {
+                out.close();
+            }
+        }
+
+        logRecord.setData( data, data.length );
+        
+        verifyLock.lock();
+       
+        // TODO verify txn here throw conflict exception if necessary
+
+        
+        writeTxnsLock.lock();
+        try
+        {
+           // TODO sync of log can be done outside the locks. 
+           txnLogManager.log( logRecord, true );
+           txn.commitTxn( logRecord.getLogAnchor().getLogLSN() );
+           
+           latestVerifiedTxn.set( txn );
+           
+           // TODO when sync is done outside the locks, advance latest commit outside the locks
+           latestCommittedTxn.set( txn );
+        }
+        finally
+        {
+            writeTxnsLock.unlock();
+            verifyLock.unlock();
+        }
+    }
+    
+    
+    private void abortReadWriteTxn( ReadWriteTxn txn ) throws IOException
+    {
+        UserLogRecord logRecord = txn.getUserLogRecord();
+
+        TxnStateChange txnRecord = new TxnStateChange( txn.getStartTime(),
+            TxnStateChange.State.TXN_ABORT );
+        ObjectOutputStream out = null;
+        ByteArrayOutputStream bout = null;
+        byte[] data;
+
+        try
+        {
+            bout = new ByteArrayOutputStream();
+            out = new ObjectOutputStream( bout );
+            out.writeObject( txnRecord );
+            out.flush();
+            data = bout.toByteArray();
+        }
+        finally
+        {
+            if ( bout != null )
+            {
+                bout.close();
+            }
+
+            if ( out != null )
+            {
+                out.close();
+            }
+        }
+
+        logRecord.setData( data, data.length );
+        txnLogManager.log( logRecord, false );
+        
+    }
+
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,7 @@
+
+package org.apache.directory.server.core.txn;
+
+public class ReadOnlyTxn extends AbstractTransaction
+{
+   
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,50 @@
+
+package org.apache.directory.server.core.txn;
+
+import java.util.List;
+import java.util.LinkedList;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.server.core.txn.logedit.LogEdit;
+
+import org.apache.directory.server.core.log.UserLogRecord;
+
+public class ReadWriteTxn extends AbstractTransaction
+{  
+    /** list of log edits by the txn */
+    List<LogEdit> logEdits = new LinkedList<LogEdit>();
+    
+    /*
+     * Number of txns that depend on this txn and previous committed
+     * txns. This number is bumped up only after the txn is committed.
+     * A txn can be flushed to partitions only after the txn itself is
+     * committed and ref count becomes zero for all the previously
+     * committed txns.
+     */
+    AtomicInteger txnRefCount = new AtomicInteger( 0 );
+    
+    /** User record used to communicate data with log manager */
+    UserLogRecord logRecord = new UserLogRecord();
+    
+    // TODO add a map of index changes 
+   
+    
+      
+    public AtomicInteger getRefCount()
+    {
+        return this.txnRefCount;
+    }
+    
+    public UserLogRecord getUserLogRecord()
+    {
+        return this.getUserLogRecord();
+    }
+    
+    public List<LogEdit> getEdits()
+    {
+        return logEdits;
+    }
+    
+    
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/Transaction.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,32 @@
+
+package org.apache.directory.server.core.txn;
+
+import java.util.List;
+
+
+interface Transaction
+{
+    public List<ReadWriteTxn> getTxnsToCheck();
+    
+    public long getStartTime();
+    
+    public void startTxn( long startTime );
+    
+    public void commitTxn( long commitTime );
+    
+    public long getCommitTime();
+    
+    public void abortTxn();
+    
+    public State getState();    
+    
+    
+    enum State
+    {
+        INITIAL,
+        READ,
+        COMMIT,
+        ABORT   
+    }
+
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,7 @@
+
+package org.apache.directory.server.core.txn;
+
+public interface TxnManagerInternal extends TxnManager
+{
+    public Transaction getCurTxn();
+}

Added: directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java?rev=1181167&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java (added)
+++ directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/log/LogFlushScanTest.java Mon Oct 10 19:39:36 2011
@@ -0,0 +1,356 @@
+
+package org.apache.directory.server.core.log;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LogFlushScanTest
+{
+    /** Logger */
+    private Log log;
+    
+    /** Log buffer size */
+    private int logBufferSize = 1 << 12;
+    
+    /** Log File Size */
+    private long logFileSize = 1 << 13;
+    
+    /** log suffix */
+    private String logSuffix = "log";
+    
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+
+    private String getLogFoler( ) throws IOException
+    {
+        String file = folder.newFolder( "log" ).getAbsolutePath();
+        return file;
+    }
+
+
+    @Before
+    public void setup() throws IOException, InvalidLogException
+    {
+        log = new DefaultLog();
+        log.init( this.getLogFoler(), logSuffix, logBufferSize, logFileSize );
+    }
+
+
+    @After
+    public void teardown() throws IOException
+    {
+  
+    }
+    
+    @Test
+    public void testAppendScan()
+    {
+        int idx;
+        int dataLength = 1024;
+        UserLogRecord logRecord = new UserLogRecord();
+        byte recordData[] = new byte[dataLength];
+        byte userRecord[];
+        boolean failed = false;
+        
+        byte writtenCounter = 0;
+        byte readCounter = 0;
+        
+        LogAnchor startingPoint = new LogAnchor();
+        
+        for ( idx = 0; idx < dataLength; idx++ )
+        {
+            recordData[idx] = writtenCounter;
+        }
+        writtenCounter++;
+        
+        try
+        {
+            logRecord.setData( recordData, dataLength );
+            log.log( logRecord, false );
+            
+            // Record the starting point
+            startingPoint.resetLogAnchor( logRecord.getLogAnchor() );
+            
+            for ( idx = 0; idx < dataLength; idx++ )
+            {
+                recordData[idx] = writtenCounter;
+            }
+            writtenCounter++;
+            
+            logRecord.setData( recordData, dataLength );
+            log.log( logRecord, true ); //Sync what we logged so far
+       
+            
+            LogScanner logScanner = log.beginScan( startingPoint );
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                assertTrue( logRecord.getDataLength() == dataLength );
+                
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    assertTrue( userRecord[idx] == readCounter );
+                }
+                
+                readCounter++;
+            }
+            
+            assertTrue( writtenCounter == readCounter );
+        
+        }
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        
+        assertTrue( failed == false );
+    }
+    
+    @Test
+    public void testLogSwitchScan()
+    {
+        int idx;
+        int dataLength = 1024;
+        UserLogRecord logRecord = new UserLogRecord();
+        byte recordData[] = new byte[dataLength];
+        byte userRecord[];
+        boolean failed = false;
+        
+        byte writtenCounter = 1;
+        byte readCounter = 1;
+        byte maxCounter = 127; 
+        boolean firstRecord = true;
+        
+        LogAnchor startingPoint = new LogAnchor();
+        LogAnchor endPoint = new LogAnchor();
+        
+       try
+       {
+            while ( writtenCounter < maxCounter )
+            {
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    recordData[idx] = writtenCounter;
+                }
+                
+                logRecord.setData( recordData, dataLength );
+                boolean sync = ( ( writtenCounter % 11 ) == 0 ) || ( writtenCounter == ( maxCounter - 1 ) );
+                log.log( logRecord, sync );
+                
+                if ( firstRecord )
+                {
+                 // Record the starting point
+                    startingPoint.resetLogAnchor( logRecord.getLogAnchor() );
+                    firstRecord = false;
+                }
+                
+                if ( ( writtenCounter == ( maxCounter - 1 ) ) )
+                {
+                    endPoint.resetLogAnchor( logRecord.getLogAnchor() );
+                }
+                
+                writtenCounter++;
+            }
+            
+            assertTrue( endPoint.getLogFileNumber() > startingPoint.getLogFileNumber() ); 
+            
+            LogScanner logScanner = log.beginScan( startingPoint );
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                assertTrue( logRecord.getDataLength() == dataLength );
+                
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    assertTrue( userRecord[idx] == readCounter );
+                }
+                
+                readCounter++;
+            }
+            
+            assertTrue( writtenCounter == readCounter );
+            
+        
+       }
+       catch( IOException e )
+       {
+           e.printStackTrace();
+           failed = true;
+       }
+       catch( InvalidLogException e )
+       {
+           e.printStackTrace();
+           failed = true;
+       }
+        
+    }
+    
+    @Test
+    public void testMultiThreadedAppend() throws InterruptedException
+    {
+        int idx;
+        int dataLength = 1024;
+        UserLogRecord logRecord = new UserLogRecord();
+        byte recordData[] = new byte[dataLength];
+        byte userRecord[];
+        boolean failed = false;
+        
+        
+        
+        LogAnchor startingPoint = new LogAnchor();
+        
+        for ( idx = 0; idx < dataLength; idx++ )
+        {
+            recordData[idx] = 0;
+        }
+        
+        logRecord.setData( recordData, dataLength );
+        
+        try
+        {
+            log.log( logRecord, false );
+        }
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        
+        assertTrue( failed == false );
+        
+        startingPoint.resetLogAnchor( logRecord.getLogAnchor() );
+        
+        
+        byte key = 1;
+        int numThreads = 4;
+        int numAppends = 64;
+        int expectedSum = 0;
+        int sum = 0;
+        MultiThreadedAppend threads[] = new MultiThreadedAppend[numThreads];
+        
+        for ( idx = 0; idx < numThreads; idx++ )
+        {
+            threads[idx] = new MultiThreadedAppend( key , dataLength, numAppends);
+            expectedSum += key * numAppends;
+        }
+        
+        for ( idx = 0; idx < numThreads; idx++ )
+        {
+            threads[idx].start();
+        }
+        
+        for ( idx = 0; idx < numThreads; idx++ )
+        {
+            threads[idx].join();
+        }
+        
+        
+        LogScanner logScanner = log.beginScan( startingPoint );
+        
+        try
+        {
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                assertTrue( logRecord.getDataLength() == dataLength );
+                key = userRecord[0];
+                
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    assertTrue( userRecord[idx] == key );
+                }
+                
+                sum += key;
+            }
+        }
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        
+        
+        assertTrue( sum == expectedSum );
+    }
+    
+    class MultiThreadedAppend extends Thread
+    {
+        byte key;
+        int dataLength;
+        int numAppends;
+                
+        public MultiThreadedAppend( byte key, int dataLength, int numAppends )
+        {
+            this.key = key;
+            this.dataLength = dataLength;
+            this.numAppends = numAppends;
+        }
+        
+        public void run() 
+        {
+            UserLogRecord logRecord = new UserLogRecord();
+            byte recordData[] = new byte[dataLength];
+            int idx;
+            boolean failed = false;
+            
+            for ( idx = 0; idx < dataLength; idx++ )
+            {
+                recordData[idx] = key;
+            }
+            
+            logRecord.setData( recordData, dataLength );
+            
+            try
+            {
+                for ( idx = 0; idx < numAppends; idx++ )
+                {
+                    boolean sync = false;
+                    if ( ( ( idx % 3 )  == 0 ) || ( idx == numAppends - 1 ) )
+                    {
+                        sync = true;
+                    }
+                    
+                    log.log( logRecord, sync );
+                }
+            }
+            catch( IOException e )
+            {
+                e.printStackTrace();
+                failed = true;
+            }
+            catch( InvalidLogException e )
+            {
+                e.printStackTrace();
+                failed = true;
+            }
+            
+            assertTrue( failed == false );
+        }
+    }
+
+}



Mime
View raw message