directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject svn commit: r1195429 - in /directory/apacheds/branches/apacheds-txns/core/src: main/java/org/apache/directory/server/core/txn/ test/java/org/apache/directory/server/core/txn/
Date Mon, 31 Oct 2011 12:14:48 GMT
Author: saya
Date: Mon Oct 31 12:14:48 2011
New Revision: 1195429

URL: http://svn.apache.org/viewvc?rev=1195429&view=rev
Log:
Tests for txn manager services begin, commit, abort and building txn dependency list.

Documentation for txn manager.

Added:
    directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/txn/DefaultTxnManagerTest.java
Modified:
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerFactory.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java?rev=1195429&r1=1195428&r2=1195429&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/DefaultTxnManager.java
Mon Oct 31 12:14:48 2011
@@ -47,7 +47,7 @@ import java.io.IOException;
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public class DefaultTxnManager<ID> implements  TxnManagerInternal<ID>
+/** Package protected */ class DefaultTxnManager<ID> implements  TxnManagerInternal<ID>
 {
     /** wal log manager */
     private TxnLogManager<ID> txnLogManager;
@@ -68,7 +68,7 @@ public class DefaultTxnManager<ID> imple
     private AtomicReference<ReadWriteTxn<ID>> latestVerifiedTxn = new AtomicReference<ReadWriteTxn<ID>>();
     
     /** Latest flushed txn's logical commit time */
-    private AtomicLong latestFlushedTxnLSN = new AtomicLong( 0 );
+    private AtomicLong latestFlushedTxnLSN = new AtomicLong( LogAnchor.UNKNOWN_LSN );
     
     /** ID comparator */
     private Comparator<ID> idComparator;
@@ -205,25 +205,44 @@ public class DefaultTxnManager<ID> imple
     }
     
     
+    /**
+     * Begins a read only txn. A read only txn does not put any log edits
+     * to the txn log. Its start time is the latest committed txn's commit time.
+     */
     private void beginReadOnlyTxn()
     {
         ReadOnlyTxn<ID> txn = new ReadOnlyTxn<ID>();
         ReadWriteTxn<ID> lastTxnToCheck = null;
-        
+
+        /*
+         * Set the start time as the latest committed txn's commit time. We need to make sure that
+         * any change after our start time is not flushed to the partitions. Say we have txn1 as the
+         * latest committed txn. There is a small window where we get ref to txn1, txn2 commits and
+         * becomes the latest committed txn, txn1's ref count becomes zero before we bump its ref
+         * count and changes to txn2 are flushed to partitions. Below we loop until we make sure
+         * that the txn for which we bumped up the ref count is indeed the latest committed txn.
+         */
+
         do
         {
             if ( lastTxnToCheck != null )
             {
                 lastTxnToCheck.getRefCount().decrementAndGet();
             }
-            
+
             lastTxnToCheck = latestCommittedTxn.get();
-            lastTxnToCheck.getRefCount().getAndIncrement();
-        } while ( lastTxnToCheck != latestCommittedTxn.get()  );
-        
+
+            if ( lastTxnToCheck != null )
+            {
+                lastTxnToCheck.getRefCount().getAndIncrement();
+            }
+
+        }
+        while ( lastTxnToCheck != latestCommittedTxn.get() );
+
         // Determine start time
         long startTime;
-        
+
         if ( lastTxnToCheck != null )
         {
             startTime = lastTxnToCheck.getCommitTime();
@@ -232,22 +251,27 @@ public class DefaultTxnManager<ID> imple
         {
             startTime = LogAnchor.UNKNOWN_LSN;
         }
-        
+
         txn.startTxn( startTime );
-        
+
         buildCheckList( txn, lastTxnToCheck );
         txnVar.set( txn );
     }
     
-    
+
+    /**
+     * Begins a read write txn. A start txn marker is inserted
+     * into the txn log and the lsn of that log record is the
+     * start time.
+     */
     private void beginReadWriteTxn() throws IOException
     {
-        
+
         ReadWriteTxn<ID> txn = new ReadWriteTxn<ID>();
         UserLogRecord logRecord = txn.getUserLogRecord();
-        
-        TxnStateChange<ID> txnRecord = new TxnStateChange<ID>( LogAnchor.UNKNOWN_LSN,

-                TxnStateChange.State.TXN_BEGIN );
+
+        TxnStateChange<ID> txnRecord = new TxnStateChange<ID>( LogAnchor.UNKNOWN_LSN,
+            TxnStateChange.State.TXN_BEGIN );
         ObjectOutputStream out = null;
         ByteArrayOutputStream bout = null;
         byte[] data;
@@ -266,47 +290,67 @@ public class DefaultTxnManager<ID> imple
             {
                 bout.close();
             }
-            
+
             if ( out != null )
             {
                 out.close();
             }
         }
-        
-        logRecord.setData(  data, data.length );
-        
-        ReadWriteTxn<ID> lastTxnToCheck = null; 
+
+        logRecord.setData( data, data.length );
+
+        /*
+         * Get the start time and last txn to depend on
+         * when merging data under the writeTxnLock.
+         */
+
+        ReadWriteTxn<ID> lastTxnToCheck = null;
         writeTxnsLock.lock();
-        
+
         try
         {
             txnLogManager.log( logRecord, false );
             txn.startTxn( logRecord.getLogAnchor().getLogLSN() );
-            
+
             do
             {
                 if ( lastTxnToCheck != null )
                 {
                     lastTxnToCheck.getRefCount().decrementAndGet();
                 }
-                
+
                 lastTxnToCheck = latestVerifiedTxn.get();
-                lastTxnToCheck.getRefCount().incrementAndGet();
-            } while ( lastTxnToCheck != latestVerifiedTxn.get() );
-            
+
+                if ( lastTxnToCheck != null )
+                {
+                    lastTxnToCheck.getRefCount().incrementAndGet();
+                }
+
+            }
+            while ( lastTxnToCheck != latestVerifiedTxn.get() );
+
         }
         finally
         {
             writeTxnsLock.unlock();
         }
-        
+
         // Finally build the check list
         buildCheckList( txn, lastTxnToCheck );
-        
+
         txnVar.set( txn );
     }
     
-    
+
+    /**
+     * Builds the list of txns which the given txn should check while merging what it read from
+     * the partitions with the changes in the txn log. These are the txns that committed before
+     * the start of the given txn and for which the changes are not flushed to the partitions yet.
+     * Note that, for some of these txns, flush to partitions could go on in parallel.
+     *
+     * @param txn txn for which we will build the check list
+     * @param lastTxnToCheck latest txn to check
+     */
     private void buildCheckList( Transaction<ID> txn, ReadWriteTxn<ID> lastTxnToCheck
)
     {
         if ( lastTxnToCheck != null )
@@ -316,7 +360,7 @@ public class DefaultTxnManager<ID> imple
 
             List<ReadWriteTxn<ID>> toCheckList = txn.getTxnsToCheck();
             Iterator<ReadWriteTxn<ID>> it = committedQueue.iterator();
-            
+
             while ( it.hasNext() )
             {
                 toAdd = it.next();
@@ -336,29 +380,54 @@ public class DefaultTxnManager<ID> imple
 
             it = toCheckList.iterator();
             ReadWriteTxn<ID> toCheck;
-            
+
             while ( it.hasNext() )
             {
                 toCheck = it.next();
-                
+
                 if ( toCheck.commitTime <= flushedLSN )
                 {
                     it.remove();
                 }
             }
         }
+
+        // A read write txn, always has to check its changes
+        if ( txn instanceof ReadWriteTxn )
+        {
+            txn.getTxnsToCheck().add( ( ReadWriteTxn<ID> ) txn );
+        }
     }
     
     
+    /**
+     * Called before ending a txn. Txn for which this txn bumped 
+     * up the ref count is gotten and its ref count is decreased.
+     *
+     * @param txn txn which is about to commit or abort
+     */
     private void prepareForEndingTxn( Transaction<ID> txn )
     {
         List<ReadWriteTxn<ID>> toCheck = txn.getTxnsToCheck();
         
+        // A read write txn, always has to check its changes
+        if ( txn instanceof ReadWriteTxn )
+        {
+
+            if ( toCheck.size() <= 0 )
+            {
+                throw new IllegalStateException(
+                    " prepareForEndingTxn: a read write txn should at least depend on itself:"
+ txn );
+            }
+
+            txn.getTxnsToCheck().remove( ( ReadWriteTxn<ID> ) txn );
+        }
+        
         if ( toCheck.size() > 0 )
         {
             ReadWriteTxn<ID> lastTxnToCheck = toCheck.get( toCheck.size() - 1 );
             
-            if ( lastTxnToCheck.commitTime != txn.getStartTime() )
+            if ( lastTxnToCheck.commitTime > txn.getStartTime() )
             {
                 throw new IllegalStateException( " prepareForEndingTxn: txn has unpexptected
start time " + 
                     txn + " expected: " + lastTxnToCheck );
@@ -375,6 +444,30 @@ public class DefaultTxnManager<ID> imple
     }
     
     
+    /**
+     * Tries to commit the given read write txn. Before a read write txn can commit, it is
+     * verified against the txns that committed after this txn started. If a conflicting change is
+     * found, a conflict exception is thrown.
+     *
+     * If a txn can commit, a commit record is inserted into the txn log. The lsn of the commit record
+     * is the commit time of the txn.
+     *
+     * Note that a txn is not committed until its commit record is synced to the underlying media. Say we have read write txns rw1 and
+     * rw2 and that rw1 and rw2 are verified and their commit records are in the log but not synced to the underlying media yet. A new read
+     * write txn rw3 and a read only txn r1 come along. Since rw1 and rw2 won't be acked until they commit, r1 should not depend on rw1 and rw2 and can have a view
+     * as of a commit time before rw1 and rw2's commit time. If r1 depended on rw1 or rw2 and we crashed before syncing rw1 and rw2's commit records to the underlying media,
+     * r1 would have depended on a change that actually doesn't exist in the database. However, rw3 either has to depend on rw1 and rw2 or has to verify
+     * its changeset against rw1 and rw2 when it tries to commit. Whether the first thing (depending on rw1, rw2 and merging its changeset) or the second
+     * thing (verifying its change set against rw1 and rw2) happens is determined by the order of the lsns of the commit records of rw1 and rw2 and the start record of rw3.
+     * Let's say we have this order in the txn log:
+     *              commit record rw1, start record rw3, commit record rw2
+     * then rw3 will merge its changes with that of rw1 and will verify its changes against rw2. When rw3 is merging its changeset with that of rw1, rw1 might not have
+     * committed yet as its commit record might not have made it to the underlying media but this is OK as rw3 cannot commit before rw1 because of the log.
+     *
+     * @param txn txn to commit.
+     * @throws IOException
+     * @throws TxnConflictException
+     */
     private void commitReadWriteTxn( ReadWriteTxn<ID> txn ) throws IOException, TxnConflictException
     {
         UserLogRecord logRecord = txn.getUserLogRecord();
@@ -412,7 +505,7 @@ public class DefaultTxnManager<ID> imple
        
         //Verify txn and throw conflict exception if necessary
         Iterator<ReadWriteTxn<ID>> it = committedQueue.iterator();
-        ReadWriteTxn toCheckTxn;
+        ReadWriteTxn<ID> toCheckTxn;
         long startTime = txn.getStartTime();
         
         while ( it.hasNext() )
@@ -453,7 +546,13 @@ public class DefaultTxnManager<ID> imple
         }
     }
     
-    
+
+    /**
+     * Aborts a read write txn. An abort record is inserted into the txn log.
+     *
+     * @param txn txn to abort
+     * @throws IOException
+     */
     private void abortReadWriteTxn( ReadWriteTxn<ID> txn ) throws IOException
     {
         UserLogRecord logRecord = txn.getUserLogRecord();

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java?rev=1195429&r1=1195428&r2=1195429&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadOnlyTxn.java
Mon Oct 31 12:14:48 2011
@@ -23,7 +23,7 @@ package org.apache.directory.server.core
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-class ReadOnlyTxn<ID> extends AbstractTransaction<ID>
+/** Package protected */ class ReadOnlyTxn<ID> extends AbstractTransaction<ID>
 {
-   
+
 }

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java?rev=1195429&r1=1195428&r2=1195429&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/ReadWriteTxn.java
Mon Oct 31 12:14:48 2011
@@ -58,7 +58,7 @@ import org.apache.directory.shared.ldap.
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-class ReadWriteTxn<ID> extends AbstractTransaction<ID>
+/** Package protected */ class ReadWriteTxn<ID> extends AbstractTransaction<ID>
 {  
     /** list of log edits by the txn */
     private List<LogEdit<ID>> logEdits = new LinkedList<LogEdit<ID>>();

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerFactory.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerFactory.java?rev=1195429&r1=1195428&r2=1195429&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerFactory.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerFactory.java
Mon Oct 31 12:14:48 2011
@@ -58,7 +58,7 @@ public class TxnManagerFactory
      */
     @SuppressWarnings("unchecked")
     public static <ID> void init( Comparator<ID> idComparator, Serializer idSerializer,
String logFolderPath,
-        int logBufferSize, int logFileSize ) throws IOException
+        int logBufferSize, long logFileSize ) throws IOException
     {
         Log log = new DefaultLog();
 

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java?rev=1195429&r1=1195428&r2=1195429&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/txn/TxnManagerInternal.java
Mon Oct 31 12:14:48 2011
@@ -23,7 +23,12 @@ package org.apache.directory.server.core
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public interface TxnManagerInternal<ID> extends TxnManager<ID>
+/** Package protected */ interface TxnManagerInternal<ID> extends TxnManager<ID>
 {
+    /**
+     * Returns the current txn associated with the current thread.
+     *
+     * @return current txn
+     */
     Transaction<ID> getCurTxn();
 }

Added: directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/txn/DefaultTxnManagerTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/txn/DefaultTxnManagerTest.java?rev=1195429&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/txn/DefaultTxnManagerTest.java
(added)
+++ directory/apacheds/branches/apacheds-txns/core/src/test/java/org/apache/directory/server/core/txn/DefaultTxnManagerTest.java
Mon Oct 31 12:14:48 2011
@@ -0,0 +1,204 @@
+
+package org.apache.directory.server.core.txn;
+
+import java.io.IOException;
+
+import org.apache.directory.server.core.log.InvalidLogException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class DefaultTxnManagerTest
+{
+    /** Log buffer size : 4096 bytes */
+    private int logBufferSize = 1 << 12;
+
+    /** Log File Size : 8192 bytes */
+    private long logFileSize = 1 << 13;
+
+    /** log suffix */
+    private static String LOG_SUFFIX = "log";
+
+    /** Txn manager */
+    private TxnManagerInternal<Long> txnManager;
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+
+    /**
+     * Get the Log folder
+     */
+    private String getLogFolder() throws IOException
+    {
+        String file = folder.newFolder( LOG_SUFFIX ).getAbsolutePath();
+
+        return file;
+    }
+
+
+    @Before
+    public void setup() throws IOException, InvalidLogException
+    {
+        TxnManagerFactory.<Long> init( LongComparator.INSTANCE, LongSerializer.INSTANCE,
getLogFolder(), logBufferSize,
+            logFileSize );
+        txnManager = TxnManagerFactory.<Long> txnManagerInternalInstance();
+    }
+
+
+    @After
+    public void teardown() throws IOException
+    {
+    }
+
+
+    @Test
+    public void testBeginCommitReadOnlyTxn()
+    {
+        try
+        {
+            txnManager.beginTransaction( true );
+
+            assertTrue( txnManager.getCurTxn() != null );
+            assertTrue( txnManager.getCurTxn() instanceof ReadOnlyTxn );
+
+            txnManager.commitTransaction();
+        }
+        catch ( IOException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+        catch ( TxnConflictException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+
+    @Test
+    public void testBeginAbortReadOnlyTxn()
+    {
+        try
+        {
+            txnManager.beginTransaction( true );
+
+            assertTrue( txnManager.getCurTxn() != null );
+            assertTrue( txnManager.getCurTxn() instanceof ReadOnlyTxn );
+
+            txnManager.abortTransaction();
+        }
+        catch ( IOException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+
+    @Test
+    public void testBeginCommitReadWriteTxn()
+    {
+        try
+        {
+            txnManager.beginTransaction( false );
+
+            assertTrue( txnManager.getCurTxn() != null );
+            assertTrue( txnManager.getCurTxn() instanceof ReadWriteTxn );
+
+            txnManager.commitTransaction();
+        }
+        catch ( IOException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+        catch ( TxnConflictException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+
+    @Test
+    public void testBeginAbortReadWriteTxn()
+    {
+        try
+        {
+            txnManager.beginTransaction( false );
+
+            assertTrue( txnManager.getCurTxn() != null );
+            assertTrue( txnManager.getCurTxn() instanceof ReadWriteTxn );
+
+            txnManager.abortTransaction();
+        }
+        catch ( IOException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+
+    @Test
+    public void testDependencyList()
+    {
+        List<ReadWriteTxn<Long>> dependentTxns;
+        try
+        {
+            Transaction<Long> txn1 = null;
+            txnManager.beginTransaction( false );
+            txn1 = txnManager.getCurTxn();
+            txnManager.commitTransaction();
+
+            Transaction<Long> txn2 = null;
+            txnManager.beginTransaction( false );
+            txn2 = txnManager.getCurTxn();
+            txnManager.commitTransaction();
+
+            Transaction<Long> txn3 = null;
+            txnManager.beginTransaction( true );
+            txn3 = txnManager.getCurTxn();
+
+            dependentTxns = txn3.getTxnsToCheck();
+            assertTrue( dependentTxns.contains( txn1 ) );
+            assertTrue( dependentTxns.contains( txn2 ) );
+            assertTrue( dependentTxns.contains( txn3 ) == false );
+
+            txnManager.commitTransaction();
+
+            Transaction<Long> txn4 = null;
+            txnManager.beginTransaction( false );
+            txn4 = txnManager.getCurTxn();;
+            dependentTxns = txn4.getTxnsToCheck();
+            assertTrue( dependentTxns.contains( txn1 ) );
+            assertTrue( dependentTxns.contains( txn2 ) );
+            assertTrue( dependentTxns.contains( txn3 ) == false );
+            assertTrue( dependentTxns.contains( txn4 ) );
+
+            txnManager.commitTransaction();
+
+        }
+        catch ( IOException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+        catch ( TxnConflictException e )
+        {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+}



Mime
View raw message