directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject svn commit: r1299421 - in /directory/apacheds/branches/apacheds-txns: core-api/src/main/java/org/apache/directory/server/core/api/txn/ core-shared/src/main/java/org/apache/directory/server/core/shared/txn/ core/src/main/java/org/apache/directory/server...
Date Sun, 11 Mar 2012 18:45:05 GMT
Author: saya
Date: Sun Mar 11 18:45:04 2012
New Revision: 1299421

URL: http://svn.apache.org/viewvc?rev=1299421&view=rev
Log:
* Added txn conflict handling to the txn layer and the default operation manager
* Removed some recently added unnecessary code from the default operation manager

Next, tests will be added for conflict handling.

Modified:
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/ReadWriteTxn.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java?rev=1299421&r1=1299420&r2=1299421&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
Sun Mar 11 18:45:04 2012
@@ -35,7 +35,14 @@ public interface TxnManager
      * @throws Exception
      */
     TxnHandle beginTransaction( boolean readOnly ) throws Exception;
-
+    
+    
+    /**
+     * Retries a txn. Retry is not necessary for read only transactions and
+     * this method should only expect RW transactions
+     * 
+     */
+    TxnHandle retryTransaction() throws Exception;
 
     
     /**

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java?rev=1299421&r1=1299420&r2=1299421&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
Sun Mar 11 18:45:04 2012
@@ -57,6 +57,21 @@ abstract class AbstractTransaction imple
     protected long id;
 
     private static AtomicLong counter = new AtomicLong( 0 );
+    
+    /** Trus if this is the only running txn */
+    private boolean isExclusive = false;
+
+
+    public boolean isExclusive()
+    {
+        return isExclusive;
+    }
+    
+    
+    public void setExclusive()
+    {
+        isExclusive = true;
+    }
 
 
     /**

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java?rev=1299421&r1=1299420&r2=1299421&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
Sun Mar 11 18:45:04 2012
@@ -33,7 +33,9 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.directory.server.core.api.log.LogAnchor;
 import org.apache.directory.server.core.api.log.UserLogRecord;
@@ -62,6 +64,9 @@ class DefaultTxnManager implements TxnMa
 
     /** Used to assign start and commit version numbers to writeTxns */
     private Lock writeTxnsLock = new ReentrantLock();
+    
+    /** Used to order txns in case of conflicts */
+    private ReadWriteLock optimisticLock = new ReentrantReadWriteLock();
 
     /** Latest committed txn on which read only txns can depend */
     private AtomicReference<ReadWriteTxn> latestCommittedTxn = new AtomicReference<ReadWriteTxn>();
@@ -170,6 +175,28 @@ class DefaultTxnManager implements TxnMa
         syncer = null;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    public TxnHandle retryTransaction() throws Exception
+    {
+        Transaction curTxn = getCurTxn();
+
+        // Should have a rw txn
+        if ( ( curTxn == null ) ||
+              !( curTxn instanceof ReadWriteTxn )  )
+        {
+            // Cannot start a TXN when a RW txn is ongoing 
+            throw new IllegalStateException( "Unexpected txn state when trying txn: " +
+                curTxn );
+        }
+        
+        // abort current txn and start a new read write txn
+        
+        abortTransaction();
+        return beginReadWriteTxn( true );
+    }
+    
 
     /**
      * {@inheritDoc}
@@ -181,19 +208,6 @@ class DefaultTxnManager implements TxnMa
         // Deal with an existing TXN
         if ( curTxn != null )
         {
-            // If we already have a RO TXN, then we can start another one
-            if ( curTxn instanceof ReadOnlyTxn )
-            {
-                if ( readOnly )
-                {
-                    return beginReadOnlyTxn();
-                }
-                else
-                {
-                    return beginReadWriteTxn();
-                }
-            }
-
             // Cannot start a TXN when a RW txn is ongoing 
             throw new IllegalStateException( "Cannot begin a txn when txn is already running: " +
                 curTxn );
@@ -206,7 +220,7 @@ class DefaultTxnManager implements TxnMa
         }
         else
         {
-            return beginReadWriteTxn();
+            return beginReadWriteTxn( false );
         }
     }
 
@@ -251,22 +265,37 @@ class DefaultTxnManager implements TxnMa
     public void abortTransaction() throws Exception
     {
         Transaction txn = getCurTxn();
-
+        
         if ( txn == null )
         {
-            // this is acceptable
-            return;
+            throw new IllegalStateException("Trying to abort while there is not txn ");
         }
+        
+        boolean isExclusive = txn.isExclusive();
 
-        prepareForEndingTxn( txn );
-
-        if ( txn instanceof ReadWriteTxn )
+        try
         {
-            abortReadWriteTxn( ( ReadWriteTxn ) txn );
+            prepareForEndingTxn( txn );
+    
+            if ( txn instanceof ReadWriteTxn )
+            {
+                abortReadWriteTxn( ( ReadWriteTxn ) txn );
+            }
+    
+            txn.abortTxn();
+            setCurTxn( null );
+        }
+        finally
+        {
+            if ( !isExclusive )
+            {
+                optimisticLock.readLock().unlock();
+            }
+            else
+            {
+                optimisticLock.writeLock().unlock();
+            }
         }
-
-        txn.abortTxn();
-        setCurTxn( null );
 
         //System.out.println( "TRAN: Aborted " + txn );
 
@@ -391,6 +420,7 @@ class DefaultTxnManager implements TxnMa
 
         buildCheckList( txn, lastTxnToCheck );
 
+        optimisticLock.readLock().lock(); 
         setCurTxn( txn );
 
         //System.out.println( "TRAN: Started " + txn );
@@ -404,7 +434,7 @@ class DefaultTxnManager implements TxnMa
      * into the txn log and the lsn of that log record is the
      * start time.
      */
-    private Transaction beginReadWriteTxn() throws Exception
+    private Transaction beginReadWriteTxn( boolean retry ) throws Exception
     {
 
         ReadWriteTxn txn = new ReadWriteTxn();
@@ -438,6 +468,16 @@ class DefaultTxnManager implements TxnMa
         }
 
         logRecord.setData( data, data.length );
+        
+        if ( retry == false )
+        {
+            optimisticLock.readLock().lock();
+        }
+        else
+        {
+            optimisticLock.writeLock().lock();
+            txn.setExclusive();
+        }
 
         /*
          * Get the start time and last txn to depend on
@@ -466,6 +506,17 @@ class DefaultTxnManager implements TxnMa
             }
             while ( lastTxnToCheck != latestVerifiedTxn.get() );
         }
+        catch( Exception e )
+        {
+            if ( txn.isExclusive() == false )
+            {
+                optimisticLock.readLock().unlock();
+            }
+            else
+            {
+                optimisticLock.writeLock().unlock();
+            }
+        }
         finally
         {
             writeTxnsLock.unlock();

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/ReadWriteTxn.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/ReadWriteTxn.java?rev=1299421&r1=1299420&r2=1299421&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/ReadWriteTxn.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/ReadWriteTxn.java
Sun Mar 11 18:45:04 2012
@@ -87,8 +87,8 @@ class ReadWriteTxn extends AbstractTrans
 
     /** List of Dn sets affected by the write operations of this txn */
     private List<DnSet> writeDns = new LinkedList<DnSet>();
-
-
+    
+    
     public AtomicInteger getRefCount()
     {
         return txnRefCount;

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java?rev=1299421&r1=1299420&r2=1299421&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
Sun Mar 11 18:45:04 2012
@@ -37,7 +37,12 @@ import org.apache.directory.shared.ldap.
 /** Package protected */
 interface Transaction extends TxnHandle
 {
-
+    boolean isExclusive();
+    
+    
+    void setExclusive();
+    
+    
     /**
      * Get the list of txns that this txn should check when mergin
      * its view from the partitions with the data in txn logs.

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java?rev=1299421&r1=1299420&r2=1299421&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
Sun Mar 11 18:45:04 2012
@@ -44,6 +44,7 @@ import org.apache.directory.server.core.
 import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.SearchOperationContext;
 import org.apache.directory.server.core.api.interceptor.context.UnbindOperationContext;
+import org.apache.directory.server.core.api.txn.TxnConflictException;
 import org.apache.directory.server.core.api.txn.TxnHandle;
 import org.apache.directory.server.core.api.txn.TxnManager;
 import org.apache.directory.server.i18n.I18n;
@@ -310,6 +311,22 @@ public class DefaultOperationManager imp
             throw new LdapOtherException( e.getMessage() );
         }
     }
+    
+    
+    /**
+     * Retries a transaction
+     */
+    private TxnHandle retryTransactionRW( TxnManager txnManager ) throws LdapException
+    {
+        try
+        {
+            return txnManager.retryTransaction();
+        }
+        catch ( Exception e )
+        {
+            throw new LdapOtherException( e.getMessage() );
+        }
+    }
 
 
     /**
@@ -331,15 +348,18 @@ public class DefaultOperationManager imp
     /**
      * Commit a transaction
      */
-    private void commitTransaction( TxnManager txnManager ) throws LdapException
+    private void commitTransaction( TxnManager txnManager ) throws LdapException, TxnConflictException
     {
         try
         {
             txnManager.commitTransaction();
         }
+        catch( TxnConflictException ce )
+        {
+            throw ce;
+        }
         catch ( Exception e )
         {
-            // TODO check for conflict
             throw new LdapOtherException( e.getMessage(), e );
         }
     }
@@ -402,10 +422,14 @@ public class DefaultOperationManager imp
             TxnHandle curTxn = txnManager.getCurTxn();
 
             boolean done = false;
-
+            
             do
             {
-                if ( curTxn == null )
+                if ( startedTxn )
+                {
+                    retryTransactionRW( txnManager );
+                }
+                else if ( curTxn == null )
                 {
                     beginTransactionRW( txnManager );
                     startedTxn = true;
@@ -414,8 +438,6 @@ public class DefaultOperationManager imp
                 try
                 {
                     head.add( addContext );
-
-                    done = true;
                 }
                 catch ( LdapException le )
                 {
@@ -427,9 +449,19 @@ public class DefaultOperationManager imp
                     throw le;
                 }
 
+                
+                done = true;
+                
                 if ( startedTxn )
                 {
-                    commitTransaction( txnManager );
+                    try
+                    {
+                        commitTransaction( txnManager );
+                    }
+                    catch ( TxnConflictException txne )
+                    {
+                       done = false; // retry
+                    }
                 }
 
                 txnManager.applyPendingTxns();
@@ -464,6 +496,9 @@ public class DefaultOperationManager imp
             try
             {
                 head.bind( bindContext );
+                
+                // If here then we are done.
+                done = true;
             }
             catch ( LdapException le )
             {
@@ -474,15 +509,37 @@ public class DefaultOperationManager imp
                  *  conflict exception.
                  */
 
-                commitTransaction( txnManager );
+                boolean conflict = false;
+                
+                try
+                {
+                    commitTransaction( txnManager );
+                }
+                catch ( TxnConflictException txne )
+                {
+                   conflict = true; // retry
+                }
 
-                throw ( le );
+                if ( conflict == false )
+                {
+                    throw ( le );
+                }
+                else
+                {
+                    done = false;
+                }
             }
 
-            // If here then we are done.
-            commitTransaction( txnManager );
 
-            done = true;
+            
+            try
+            {
+                commitTransaction( txnManager );
+            }
+            catch ( TxnConflictException txne )
+            {
+               done = false; // retry
+            }
         }
         while ( !done );
 
@@ -574,7 +631,14 @@ public class DefaultOperationManager imp
             throw le;
         }
 
-        commitTransaction( txnManager );
+        try
+        {
+            commitTransaction( txnManager );
+        }
+        catch ( TxnConflictException txne )
+        {
+          throw new IllegalStateException(" Read only txn shouldn have conflict ");
+        }
 
         LOG.debug( "<< CompareOperation successful" );
 
@@ -655,7 +719,11 @@ public class DefaultOperationManager imp
 
         do
         {
-            if ( curTxn == null )
+            if ( startedTxn )
+            {
+                retryTransactionRW( txnManager );
+            }
+            else if ( curTxn == null )
             {
                 beginTransactionRW( txnManager );
                 startedTxn = true;
@@ -670,9 +738,6 @@ public class DefaultOperationManager imp
                 Interceptor head = directoryService.getInterceptor( deleteContext.getNextInterceptor() );
 
                 head.delete( deleteContext );
-
-                // If here then we are done.
-                done = true;
             }
             catch ( LdapException le )
             {
@@ -684,9 +749,19 @@ public class DefaultOperationManager imp
                 throw le;
             }
 
+            // If here then we are done.
+            done = true;
+            
             if ( startedTxn )
             {
-                commitTransaction( txnManager );
+                try
+                {
+                    commitTransaction( txnManager );
+                }
+                catch ( TxnConflictException txne )
+                {
+                   done = false; // retry
+                }
             }
             txnManager.applyPendingTxns();
         }
@@ -745,7 +820,14 @@ public class DefaultOperationManager imp
             return false;
         }
 
-        commitTransaction( txnManager );
+        try
+        {
+            commitTransaction( txnManager );
+        }
+        catch ( TxnConflictException txne )
+        {
+          throw new IllegalStateException(" Read only txn shouldn have conflict ");
+        }
 
         LOG.debug( "<< HasEntryOperation successful" );
 
@@ -818,7 +900,14 @@ public class DefaultOperationManager imp
             throw le;
         }
 
-        commitTransaction( txnManager );
+        try
+        {
+            commitTransaction( txnManager );
+        }
+        catch ( TxnConflictException txne )
+        {
+          throw new IllegalStateException(" Read only txn shouldn have conflict ");
+        }
 
         LOG.debug( "<< LookupOperation successful" );
 
@@ -907,7 +996,11 @@ public class DefaultOperationManager imp
 
         do
         {
-            if ( curTxn == null )
+            if ( startedTxn )
+            {
+                retryTransactionRW( txnManager );
+            }
+            else if ( curTxn == null )
             {
                 beginTransactionRW( txnManager );
                 startedTxn = true;
@@ -938,7 +1031,14 @@ public class DefaultOperationManager imp
 
             if ( startedTxn )
             {
-                commitTransaction( txnManager );
+                try
+                {
+                    commitTransaction( txnManager );
+                }
+                catch ( TxnConflictException txne )
+                {
+                   done = false; // retry
+                }
             }
             txnManager.applyPendingTxns();
         }
@@ -1041,7 +1141,11 @@ public class DefaultOperationManager imp
 
         do
         {
-            if ( curTxn == null )
+            if ( startedTxn )
+            {
+                retryTransactionRW( txnManager );
+            }
+            else if ( curTxn == null )
             {
                 beginTransactionRW( txnManager );
                 startedTxn = true;
@@ -1073,7 +1177,14 @@ public class DefaultOperationManager imp
 
             if ( startedTxn )
             {
-                commitTransaction( txnManager );
+                try
+                {
+                    commitTransaction( txnManager );
+                }
+                catch ( TxnConflictException txne )
+                {
+                   done = false; // retry
+                }
             }
             txnManager.applyPendingTxns();
         }
@@ -1178,7 +1289,11 @@ public class DefaultOperationManager imp
 
         do
         {
-            if ( curTxn == null )
+            if ( startedTxn )
+            {
+                retryTransactionRW( txnManager );
+            }
+            else if ( curTxn == null )
             {
                 beginTransactionRW( txnManager );
                 startedTxn = true;
@@ -1209,7 +1324,14 @@ public class DefaultOperationManager imp
 
             if ( startedTxn )
             {
-                commitTransaction( txnManager );
+                try
+                {
+                    commitTransaction( txnManager );
+                }
+                catch ( TxnConflictException txne )
+                {
+                   done = false; // retry
+                }
             }
             txnManager.applyPendingTxns();
         }
@@ -1302,7 +1424,11 @@ public class DefaultOperationManager imp
 
         do
         {
-            if ( curTxn == null )
+            if ( startedTxn )
+            {
+                retryTransactionRW( txnManager );
+            }
+            else if ( curTxn == null )
             {
                 beginTransactionRW( txnManager );
                 startedTxn = true;
@@ -1337,7 +1463,14 @@ public class DefaultOperationManager imp
 
             if ( startedTxn )
             {
-                commitTransaction( txnManager );
+                try
+                {
+                    commitTransaction( txnManager );
+                }
+                catch ( TxnConflictException txne )
+                {
+                   done = false; // retry
+                }
             }
             txnManager.applyPendingTxns();
         }



Mime
View raw message