directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject svn commit: r1178793 - in /directory/apacheds/branches/apacheds-txns/xdbm-partition/src: main/java/org/apache/directory/server/log/ main/java/org/apache/directory/server/log/impl/ test/java/org/apache/directory/server/log/ test/java/org/apache/director...
Date Tue, 04 Oct 2011 13:57:25 GMT
Author: saya
Date: Tue Oct  4 13:57:24 2011
New Revision: 1178793

URL: http://svn.apache.org/viewvc?rev=1178793&view=rev
Log:
Logging and Scan tests and fixes bases on them. Exception handling for LogFlushManager and
LogScanner.

Added:
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/impl/
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/impl/LogFlushScanTest.java
Modified:
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
    directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/Log.java
Tue Oct  4 13:57:24 2011
@@ -13,8 +13,10 @@ public interface Log
      * @param suffix suffix for log file.
      * @param logBufferSize size of buffer that will hold unflushed log changes. Specifigy
zero if no buffering is desired
      * @param logFileSize A soft limit on the log file size
+     * @throws IOException
+     * @throws InvalidLogException
      */
-   public void init( String logFilepath, String suffix, int logBufferSize, long logFileSize
);
+   public void init( String logFilepath, String suffix, int logBufferSize, long logFileSize
) throws IOException, InvalidLogException;
     
     /**
      * Logs the given user record to the log. Position in the log files where the record
is logged is returned as part of
@@ -35,6 +37,15 @@ public interface Log
      * @return
      */
     public LogScanner beginScan( LogAnchor startPoint );
+    
+    
+    /**
+     * Advances the min needed position in the logs. Logging subsystem uses this
+     * information to get rid of unneeded
+     *
+     * @param newAnchor
+     */
+    public void advanceMinNeededLogPosition( LogAnchor newAnchor );
 
     
 }

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/UserLogRecord.java
Tue Oct  4 13:57:24 2011
@@ -24,7 +24,8 @@ public class UserLogRecord
     
     public void setData( byte[] data, int length )
     {
-        this.recordHolder = recordHolder;
+        this.recordHolder = data;
+        this.length = length;
     }
     
     public byte[] getDataBuffer()

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogFileManager.java
Tue Oct  4 13:57:24 2011
@@ -24,7 +24,7 @@ class DefaultLogFileManager implements L
      * @param logFilepath log file path
      * @param suffix suffix for log file.
      */
-    public void init( String logFilepath, String suffix )
+    public void init( String logFilePath, String suffix )
     {
         this.logFilePath = logFilePath;
         this.suffix = suffix;
@@ -239,5 +239,13 @@ class DefaultLogFileManager implements L
         {
             return raf.length();
         }
+        
+        /**
+         * {@inheritDoc}
+         */
+        public void seek( long position ) throws IOException
+        {
+            raf.seek( position );
+        }
     }
 }

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/DefaultLogScanner.java
Tue Oct  4 13:57:24 2011
@@ -12,7 +12,7 @@ import org.apache.directory.server.log.L
 import org.apache.directory.server.log.LogScanner;
 import org.apache.directory.server.log.UserLogRecord;
 
-public class DefaultLogScanner implements LogScanner
+public class DefaultLogScanner implements LogScannerInternal
 {
     /** LSN of the last successfully read log record */
     private long prevLSN = LogAnchor.UNKNOWN_LSN;
@@ -47,9 +47,12 @@ public class DefaultLogScanner implement
     /** ByteBuffer wrapper for the marker buffer */
     ByteBuffer markerHead = ByteBuffer.wrap( markerBuffer );
     
-    public DefaultLogScanner( LogAnchor startingLogAnchor, LogFileManager logFileManger )
+    /**
+     * {@inheritDoc}
+     */
+    public void init( LogAnchor startingLogAnchor, LogFileManager logFileManager )
     {
-        startingLogAnchor.resetLogAnchor( startingLogAnchor );
+        this.startingLogAnchor.resetLogAnchor( startingLogAnchor );
         this.logFileManager = logFileManager;
     }
     
@@ -91,7 +94,7 @@ public class DefaultLogScanner implement
                     if ( startingOffset < LogFileRecords.LOG_FILE_HEADER_SIZE )
                     {
                         // Offset should be at log file marker boundary
-                        this.markScanInvalid();
+                        this.markScanInvalid( null );
                     }
                     
                     prevLogFileOffset = Math.max( startingOffset, currentLogFile.getLength()
);
@@ -107,7 +110,7 @@ public class DefaultLogScanner implement
                 
                 if ( fileOffset > fileLength )
                 {
-                    this.markScanInvalid();
+                    this.markScanInvalid( null );
                 }
                 else if ( fileOffset == fileLength )
                 {
@@ -138,12 +141,12 @@ public class DefaultLogScanner implement
                 
                 if ( ( startingLSN != LogAnchor.UNKNOWN_LSN ) && ( startingLSN !=
lastReadLSN ) )
                 {
-                    this.markScanInvalid();
+                    this.markScanInvalid( null );
                 }
             }
             
             // Read and verify user block
-            this.readLogRecord( logRecord, recordLength );
+            this.readLogRecord( logRecord, recordLength - ( LogFileRecords.RECORD_HEADER_SIZE
+ LogFileRecords.RECORD_FOOTER_SIZE ));
             
             // Read and verify footer
             this.readRecordFooter();
@@ -165,9 +168,20 @@ public class DefaultLogScanner implement
         {
             // This means either the log record or the log file header was
             // partially written. Treat this as invalid log content
-            this.markScanInvalid();
+            this.markScanInvalid( e );
+        }
+        catch( IOException e)
+        {
+            this.close();
+            throw e;
+        }
+        catch( InvalidLogException e)
+        {
+            this.close();
+            throw e;
         }
         
+        
         return true;
         
     }
@@ -249,7 +263,7 @@ public class DefaultLogScanner implement
         
         if ( invalid == true )
         {
-            this.markScanInvalid();
+            this.markScanInvalid( null );
         }
         
         // Everything went fine
@@ -276,7 +290,7 @@ public class DefaultLogScanner implement
         
         if ( invalid == true )
         {
-            this.markScanInvalid();
+            this.markScanInvalid( null );
         }
     }
     
@@ -313,7 +327,7 @@ public class DefaultLogScanner implement
         this.prevLogFileOffset = 0;
         
         markerHead.rewind();
-        currentLogFile.read( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
+        logFile.read( markerBuffer, 0, LogFileRecords.LOG_FILE_HEADER_SIZE );
         long persistedLogFileNumber = markerHead.getLong();
         int magicNumber = markerHead.getInt();
       
@@ -330,7 +344,7 @@ public class DefaultLogScanner implement
         
         if ( invalid == true )
         {
-            this.markScanInvalid();
+            this.markScanInvalid( null );
         }
         
         // Everything is fine, advance good file offset and return
@@ -346,9 +360,9 @@ public class DefaultLogScanner implement
         }
     }
     
-    private void markScanInvalid() throws InvalidLogException
+    private void markScanInvalid( Exception cause ) throws InvalidLogException
     {
         invalidLog = true;
-        throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+        throw new InvalidLogException( I18n.err( I18n.ERR_750 ), cause );
     }
 }

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFileManager.java
Tue Oct  4 13:57:24 2011
@@ -169,6 +169,13 @@ interface LogFileManager
          * returns the length of the file
          */
         public long getLength() throws IOException;
+        
+        /**
+         * Repositions the reader at the given offset
+         *
+         * @param position
+         */
+        public void seek( long position ) throws IOException;
 
     }
     

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogFlushManager.java
Tue Oct  4 13:57:24 2011
@@ -61,6 +61,9 @@ class LogFlushManager
     /** Sof limit on the log file size */
     long targetLogFileSize;
     
+    /** If logging cannot succeed, then loggingFailed is set to true and further logging
is prevented */
+    boolean logFailed;
+    
     public LogFlushManager(LogManager logManager, int logMemoryBufferSize, long logFileSize
)
     {
         if ( ( logMemoryBufferSize < 0 ) || ( logFileSize < 0 ) )
@@ -87,7 +90,7 @@ class LogFlushManager
      */
     public void append(UserLogRecord userRecord, boolean sync ) throws IOException, InvalidLogException
     {
-        long lsn;
+        long lsn = LogAnchor.UNKNOWN_LSN;
         boolean appendedRecord = false;
         byte[] userBuffer = userRecord.getDataBuffer();
         int length  = userRecord.getDataLength();
@@ -97,90 +100,111 @@ class LogFlushManager
         
         appendLock.lock();
         
-        lsn = logLSN++;
-        
-        if ( currentLogFile == null )
+        if ( logFailed )
         {
-            // We are just starting, get the current log file
-            currentLogFile = logManager.switchToNextLogFile( null );
-            appendedSize = currentLogFile.getLength();
+            appendLock.unlock();
+            throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
         }
         
-        if ( appendedSize > this.targetLogFileSize )
+        try
         {
-            // Make sure everything outstanding goes to the current log file
-            this.flush( lsn, null, 0, 0, true);
+            lsn = logLSN++;
             
-            currentLogFile = logManager.switchToNextLogFile( currentLogFile );
-            appendedSize = currentLogFile.getLength();
-        }
-        
-        if ( recordSize <= logBufferSize )
-        {
-            ByteBuffer writeHead = logBuffer.writeHead;
+            if ( currentLogFile == null )
+            {
+                // We are just starting, get the current log file
+                currentLogFile = logManager.switchToNextLogFile( null );
+                appendedSize = currentLogFile.getLength();
+            }
             
-            while ( !appendedRecord )
+            if ( appendedSize > this.targetLogFileSize )
             {
-                // First get the rewind count then the position to which the readhead advanced
-                int readHeadRewindCount = logBuffer.readHeadRewindCount.get();
-                int readHeadPosition = logBuffer.readHeadPosition;                
+                // Make sure everything outstanding goes to the current log file
+                this.flush( lsn, null, 0, 0, true);
                 
-                if ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount ) || 
-                    ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount + 1 ) &&

-                        ( readHeadPosition < writeHead.position() ) ) )
+                currentLogFile = logManager.switchToNextLogFile( currentLogFile );
+                appendedSize = currentLogFile.getLength();
+            }
+            
+            if ( recordSize <= logBufferSize )
+            {
+                ByteBuffer writeHead = logBuffer.writeHead;
+                
+                while ( !appendedRecord )
                 {
-                    if ( writeHead.remaining() >= recordSize )
-                    {
-                        this.writeHeader( writeHead, length, lsn );
-                        writeHead.put( userBuffer, 0, length );
-                        this.writeFooter( writeHead, 0 );
-                        appendedRecord = true;
-                    }
-                    else // ( writeHead.remaining() < recordSize )
+                    // First get the rewind count then the position to which the readhead
advanced
+                    int readHeadRewindCount = logBuffer.readHeadRewindCount.get();
+                    int readHeadPosition = logBuffer.readHeadPosition;                
+                    
+                    if ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount ) || 
+                        ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount + 1 ) &&

+                            ( readHeadPosition < writeHead.position() ) ) )
                     {
-                        if ( writeHead.remaining() >= LogFileRecords.RECORD_HEADER_SIZE
)
+                        if ( writeHead.remaining() >= recordSize )
                         {
-                            // Write a skip record
-                            this.writeHeader( writeHead, -1, -1 );
+                            this.writeHeader( writeHead, recordSize, lsn );
+                            writeHead.put( userBuffer, 0, length );
+                            this.writeFooter( writeHead, 0 );
+                            appendedRecord = true;
+                        }
+                        else // ( writeHead.remaining() < recordSize )
+                        {
+                            if ( writeHead.remaining() >= LogFileRecords.RECORD_HEADER_SIZE
)
+                            {
+                                // Write a skip record
+                                this.writeHeader( writeHead, -1, -1 );
+                            }
+                            
+                            // rewind buffer now
+                            writeHead.rewind();
+                            logBuffer.writeHeadRewindCount++;
                         }
-                        
-                        // rewind buffer now
-                        writeHead.rewind();
-                        logBuffer.writeHeadRewindCount++;
-                    }
-                }
-                else 
-                {
-                    assert( logBuffer.writeHeadRewindCount == ( readHeadRewindCount + 1 )
) : 
-                            "Unexpected sequence number for read/write heads:" + logBuffer.writeHeadRewindCount
+
-                            " " + readHeadRewindCount;
-                    
-                    if ( ( readHeadPosition - writeHead.position() ) > recordSize )
-                    {
-                        this.writeHeader( writeHead, length, lsn );
-                        writeHead.put( userBuffer, 0, length );
-                        this.writeFooter( writeHead, 0 );
-                        appendedRecord = true;
                     }
-                    else
+                    else 
                     {
-                        this.flush( lsn, null, 0, 0, true);
+                        assert( logBuffer.writeHeadRewindCount == ( readHeadRewindCount +
1 ) ) : 
+                                "Unexpected sequence number for read/write heads:" + logBuffer.writeHeadRewindCount
+
+                                " " + readHeadRewindCount;
+                        
+                        if ( ( readHeadPosition - writeHead.position() ) > recordSize
)
+                        {
+                            this.writeHeader( writeHead, recordSize, lsn );
+                            writeHead.put( userBuffer, 0, length );
+                            this.writeFooter( writeHead, 0 );
+                            appendedRecord = true;
+                        }
+                        else
+                        {
+                            this.flush( lsn, null, 0, 0, true);
+                        }
                     }
                 }
+                
+            }
+            else
+            {   
+                this.flush( lsn, userBuffer, 0, length, true );
             }
             
+            
+            
+            userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), appendedSize, lsn
);
+            this.appendedSize += recordSize;
         }
-        else
-        {   
-            this.flush( lsn, userBuffer, 0, length, true );
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            logFailed = true; // Mark log subsytem failed
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            logFailed = true; // Mark log subsystem failed
+        }
+        finally
+        {
+            appendLock.unlock();
         }
-        
-        
-        
-        userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), appendedSize, lsn );
-        this.appendedSize += recordSize;
-    
-        appendLock.unlock();
         
         if ( sync )
             this.flush( lsn, null, 0, 0, false );
@@ -212,9 +236,10 @@ class LogFlushManager
      * @param length length of user data
      * @param appendLockHeld true if append lock is held
      * @throws IOException
+     * @throws InvalidLogException
      */
     private void flush( long flushLSN, byte[] userBuffer, int offset, int length, 
-                        boolean appendLockHeld ) throws IOException
+                        boolean appendLockHeld ) throws IOException, InvalidLogException
     {    
         long uptoLSN = flushLSN;
        
@@ -238,6 +263,11 @@ class LogFlushManager
         
         while ( true )
         {
+            if ( logFailed )
+            {
+                flushLock.unlock();
+                throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
+            }
             if ( ( flushStatus.flushedLSN >= uptoLSN ) && ( appendLockHeld ==
false ) )
             {
                 flushLock.unlock();
@@ -269,28 +299,51 @@ class LogFlushManager
         
         flushLock.unlock();
         
-        long flushedLSN = this.doFlush( uptoLSN, appendLockHeld );
+        long flushedLSN = LogAnchor.UNKNOWN_LSN;
         
-        // Now if there is a user buffer, flush from that        
-        if ( userBuffer != null )
+        
+        try
         {
-            ByteBuffer headerFooterHead = logBuffer.headerFooterHead;
+            flushedLSN = this.doFlush( uptoLSN, appendLockHeld );
             
-            headerFooterHead.rewind();
-            this.writeHeader( headerFooterHead, length, flushLSN );
-            currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_HEADER_MAGIC_NUMBER
);
+            // Now if there is a user buffer, flush from that        
+            if ( userBuffer != null )
+            {
+                ByteBuffer headerFooterHead = logBuffer.headerFooterHead;
+                int recordSize = LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE
+ length;
+                
+                headerFooterHead.rewind();
+                this.writeHeader( headerFooterHead, recordSize, flushLSN );
+                currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_HEADER_MAGIC_NUMBER
);
+                
+                currentLogFile.append( userBuffer, offset, length );   
+                
+                headerFooterHead.rewind();
+                this.writeFooter( headerFooterHead, 0 );
+                currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE
);
+    
+                flushedLSN = flushLSN;
+            }
             
-            currentLogFile.append( userBuffer, offset, length );   
+            currentLogFile.sync();
+        }
+        catch( IOException e )
+        {
+            // Mark the logger invalid, wakeup any waiters and return
+            flushLock.lock();
+            logFailed = true;
+            flushStatus.flushInProgress = false;
             
-            headerFooterHead.rewind();
-            this.writeFooter( headerFooterHead, 0 );
-            currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE
);
-
-            flushedLSN = flushLSN;
+            if ( flushStatus.numWaiters != 0 )
+            {
+                flushCondition.signalAll();
+            }
+            
+            flushLock.unlock();
+            
+            throw e;
         }
         
-        currentLogFile.sync();
-        
         flushLock.lock();
         
         if ( flushedLSN != LogAnchor.UNKNOWN_LSN )
@@ -371,8 +424,10 @@ class LogFlushManager
                     break;
                 
                 
+      
+                
                 // Sanitize length, it includes header and footer overhead
-                assert( length >  ( LogFileRecords.RECORD_HEADER_MAGIC_NUMBER + LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER)
) :
+                assert( length >  ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE)
) :
                     "Record length doesnt make sense:" + length + " expected:" +
                     ( LogFileRecords.RECORD_HEADER_MAGIC_NUMBER + LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER);
                 
@@ -431,7 +486,7 @@ class LogFlushManager
         buffer.putInt( LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER );
     }
     
-    
+ 
     /**
      * Used to group the memory buffer data together 
      */
@@ -487,11 +542,11 @@ class LogFlushManager
         boolean flushInProgress;
         
         /** Current flush request */
-        long uptoLSN;
+        long uptoLSN = LogAnchor.UNKNOWN_LSN;
         
         
         /** Current flushed lsn */
-        long flushedLSN;
+        long flushedLSN = LogAnchor.UNKNOWN_LSN;
         
         
         /** Keeps track of the number of waiters */

Modified: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java?rev=1178793&r1=1178792&r2=1178793&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/main/java/org/apache/directory/server/log/impl/LogManager.java
Tue Oct  4 13:57:24 2011
@@ -21,7 +21,7 @@ class LogManager
 {
    
     /**  Controlfile record size */
-    private final static int CONTROLFILE_RECORD_SIZE = 36;
+    private final static int CONTROLFILE_RECORD_SIZE = 44;
     
     /** Controlfile file magic number */
     private final static int CONTROLFILE_MAGIC_NUMBER = 0xFF11FF11;
@@ -80,7 +80,7 @@ class LogManager
     public void initLogManager() throws IOException, InvalidLogException
     {
         LogAnchor scanPoint = new LogAnchor();
-        LogScanner scanner;
+        LogScannerInternal scanner;
         UserLogRecord logRecord;        
         LogFileManager.LogFileReader reader;
 
@@ -108,7 +108,8 @@ class LogManager
             scanPoint.resetLogAnchor( minLogAnchor );
             
             logRecord = new UserLogRecord();
-            scanner = new DefaultLogScanner( scanPoint, logFileManager );
+            scanner = new DefaultLogScanner();
+            scanner.init( scanPoint, logFileManager );
             
             try
             {
@@ -199,7 +200,7 @@ class LogManager
              */
            reader = null;
            boolean fileExists = false;
-           currentLogFileNumber = LogAnchor.MIN_LOG_NUMBER;
+           currentLogFileNumber = LogAnchor.MIN_LOG_NUMBER - 1;
            try
            {
                reader = logFileManager.getReaderForLogFile( LogAnchor.MIN_LOG_NUMBER );
@@ -209,6 +210,7 @@ class LogManager
                    throw new InvalidLogException( I18n.err( I18n.ERR_750 ) );
                }
                fileExists = true;
+               currentLogFileNumber++;
            }
            catch ( FileNotFoundException e )
            {
@@ -253,7 +255,13 @@ class LogManager
             this.createNextLogFile( false );
         }
         
-        return logFileManager.getWriterForLogFile( this.currentLogFileNumber );
+        LogFileManager.LogFileWriter writer =  logFileManager.getWriterForLogFile( this.currentLogFileNumber
);
+        long currentOffset = writer.getLength();
+        if ( currentOffset > 0 )
+        {
+            writer.seek( currentOffset );
+        }
+        return writer;    
     }
     
     /**

Added: directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/impl/LogFlushScanTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/impl/LogFlushScanTest.java?rev=1178793&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/impl/LogFlushScanTest.java
(added)
+++ directory/apacheds/branches/apacheds-txns/xdbm-partition/src/test/java/org/apache/directory/server/log/impl/LogFlushScanTest.java
Tue Oct  4 13:57:24 2011
@@ -0,0 +1,362 @@
+
+package org.apache.directory.server.log.impl;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.directory.server.log.Log;
+import org.apache.directory.server.log.UserLogRecord;
+import org.apache.directory.server.log.InvalidLogException;
+import org.apache.directory.server.log.LogAnchor;
+import org.apache.directory.server.log.LogScanner;
+
+public class LogFlushScanTest
+{
+    /** Logger */
+    private Log log;
+    
+    /** Log buffer size */
+    private int logBufferSize = 1 << 12;
+    
+    /** Log File Size */
+    private long logFileSize = 1 << 13;
+    
+    /** log suffix */
+    private String logSuffix = "log";
+    
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+
+    private String getLogFoler( ) throws IOException
+    {
+        String file = folder.newFolder( "log" ).getAbsolutePath();
+        return file;
+    }
+
+
+    @Before
+    public void setup() throws IOException, InvalidLogException
+    {
+        log = new DefaultLog();
+        log.init( this.getLogFoler(), logSuffix, logBufferSize, logFileSize );
+    }
+
+
+    @After
+    public void teardown() throws IOException
+    {
+  
+    }
+    
+    @Test
+    public void testAppendScan()
+    {
+        int idx;
+        int dataLength = 1024;
+        UserLogRecord logRecord = new UserLogRecord();
+        byte recordData[] = new byte[dataLength];
+        byte userRecord[];
+        boolean failed = false;
+        
+        byte writtenCounter = 0;
+        byte readCounter = 0;
+        
+        LogAnchor startingPoint = new LogAnchor();
+        
+        for ( idx = 0; idx < dataLength; idx++ )
+        {
+            recordData[idx] = writtenCounter;
+        }
+        writtenCounter++;
+        
+        try
+        {
+            logRecord.setData( recordData, dataLength );
+            log.log( logRecord, false );
+            
+            // Record the starting point
+            startingPoint.resetLogAnchor( logRecord.getLogAnchor() );
+            
+            for ( idx = 0; idx < dataLength; idx++ )
+            {
+                recordData[idx] = writtenCounter;
+            }
+            writtenCounter++;
+            
+            logRecord.setData( recordData, dataLength );
+            log.log( logRecord, true ); //Sync what we logged so far
+       
+            
+            LogScanner logScanner = log.beginScan( startingPoint );
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                assertTrue( logRecord.getDataLength() == dataLength );
+                
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    assertTrue( userRecord[idx] == readCounter );
+                }
+                
+                readCounter++;
+            }
+            
+            assertTrue( writtenCounter == readCounter );
+        
+        }
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        
+        assertTrue( failed == false );
+    }
+    
+    @Test
+    public void testLogSwitchScan()
+    {
+        int idx;
+        int dataLength = 1024;
+        UserLogRecord logRecord = new UserLogRecord();
+        byte recordData[] = new byte[dataLength];
+        byte userRecord[];
+        boolean failed = false;
+        
+        byte writtenCounter = 1;
+        byte readCounter = 1;
+        byte maxCounter = 127; 
+        boolean firstRecord = true;
+        
+        LogAnchor startingPoint = new LogAnchor();
+        LogAnchor endPoint = new LogAnchor();
+        
+       try
+       {
+            while ( writtenCounter < maxCounter )
+            {
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    recordData[idx] = writtenCounter;
+                }
+                
+                logRecord.setData( recordData, dataLength );
+                boolean sync = ( ( writtenCounter % 11 ) == 0 ) || ( writtenCounter == (
maxCounter - 1 ) );
+                log.log( logRecord, sync );
+                
+                if ( firstRecord )
+                {
+                 // Record the starting point
+                    startingPoint.resetLogAnchor( logRecord.getLogAnchor() );
+                    firstRecord = false;
+                }
+                
+                if ( ( writtenCounter == ( maxCounter - 1 ) ) )
+                {
+                    endPoint.resetLogAnchor( logRecord.getLogAnchor() );
+                }
+                
+                writtenCounter++;
+            }
+            
+            assertTrue( endPoint.getLogFileNumber() > startingPoint.getLogFileNumber()
); 
+            
+            LogScanner logScanner = log.beginScan( startingPoint );
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                assertTrue( logRecord.getDataLength() == dataLength );
+                
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    assertTrue( userRecord[idx] == readCounter );
+                }
+                
+                readCounter++;
+            }
+            
+            assertTrue( writtenCounter == readCounter );
+            
+        
+       }
+       catch( IOException e )
+       {
+           e.printStackTrace();
+           failed = true;
+       }
+       catch( InvalidLogException e )
+       {
+           e.printStackTrace();
+           failed = true;
+       }
+        
+    }
+    
+    @Test
+    public void testMultiThreadedAppend() throws InterruptedException
+    {
+        int idx;
+        int dataLength = 1024;
+        UserLogRecord logRecord = new UserLogRecord();
+        byte recordData[] = new byte[dataLength];
+        byte userRecord[];
+        boolean failed = false;
+        
+        
+        
+        LogAnchor startingPoint = new LogAnchor();
+        
+        for ( idx = 0; idx < dataLength; idx++ )
+        {
+            recordData[idx] = 0;
+        }
+        
+        logRecord.setData( recordData, dataLength );
+        
+        try
+        {
+            log.log( logRecord, false );
+        }
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        
+        assertTrue( failed == false );
+        
+        startingPoint.resetLogAnchor( logRecord.getLogAnchor() );
+        
+        
+        byte key = 1;
+        int numThreads = 4;
+        int numAppends = 64;
+        int expectedSum = 0;
+        int sum = 0;
+        MultiThreadedAppend threads[] = new MultiThreadedAppend[numThreads];
+        
+        for ( idx = 0; idx < numThreads; idx++ )
+        {
+            threads[idx] = new MultiThreadedAppend( key , dataLength, numAppends);
+            expectedSum += key * numAppends;
+        }
+        
+        for ( idx = 0; idx < numThreads; idx++ )
+        {
+            threads[idx].start();
+        }
+        
+        for ( idx = 0; idx < numThreads; idx++ )
+        {
+            threads[idx].join();
+        }
+        
+        
+        LogScanner logScanner = log.beginScan( startingPoint );
+        
+        try
+        {
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                assertTrue( logRecord.getDataLength() == dataLength );
+                key = userRecord[0];
+                
+                for ( idx = 0; idx < dataLength; idx++ )
+                {
+                    assertTrue( userRecord[idx] == key );
+                }
+                
+                sum += key;
+            }
+        }
+        catch( IOException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        catch( InvalidLogException e )
+        {
+            e.printStackTrace();
+            failed = true;
+        }
+        
+        
+        assertTrue( sum == expectedSum );
+    }
+    
+    class MultiThreadedAppend extends Thread
+    {
+        byte key;
+        int dataLength;
+        int numAppends;
+                
+        public MultiThreadedAppend( byte key, int dataLength, int numAppends )
+        {
+            this.key = key;
+            this.dataLength = dataLength;
+            this.numAppends = numAppends;
+        }
+        
+        public void run() 
+        {
+            UserLogRecord logRecord = new UserLogRecord();
+            byte recordData[] = new byte[dataLength];
+            int idx;
+            boolean failed = false;
+            
+            for ( idx = 0; idx < dataLength; idx++ )
+            {
+                recordData[idx] = key;
+            }
+            
+            logRecord.setData( recordData, dataLength );
+            
+            try
+            {
+                for ( idx = 0; idx < numAppends; idx++ )
+                {
+                    boolean sync = false;
+                    if ( ( ( idx % 3 )  == 0 ) || ( idx == numAppends - 1 ) )
+                    {
+                        sync = true;
+                    }
+                    
+                    log.log( logRecord, sync );
+                }
+            }
+            catch( IOException e )
+            {
+                e.printStackTrace();
+                failed = true;
+            }
+            catch( InvalidLogException e )
+            {
+                e.printStackTrace();
+                failed = true;
+            }
+            
+            assertTrue( failed == false );
+        }
+    }
+
+}



Mime
View raw message