directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject svn commit: r1355264 - in /directory/apacheds/branches/apacheds-txns: core-api/src/main/java/org/apache/directory/server/core/api/ core-api/src/main/java/org/apache/directory/server/core/api/filtering/ core-api/src/main/java/org/apache/directory/server...
Date Fri, 29 Jun 2012 08:25:29 GMT
Author: saya
Date: Fri Jun 29 08:25:25 2012
New Revision: 1355264

URL: http://svn.apache.org/viewvc?rev=1355264&view=rev
Log:
Leaked cursor management. Entry filtering cursors which are open for more than a minute are redirected to a RandomFileCursor after closing the wrapped cursors.

Added:
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java
Modified:
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java
    directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java
    directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
    directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
    directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
    directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/DirectoryService.java Fri Jun 29 08:25:25 2012
@@ -39,6 +39,7 @@ import org.apache.directory.server.core.
 import org.apache.directory.server.core.api.schema.SchemaPartition;
 import org.apache.directory.server.core.api.subtree.SubentryCache;
 import org.apache.directory.server.core.api.subtree.SubtreeEvaluator;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.server.core.api.txn.TxnLogManager;
 import org.apache.directory.server.core.api.txn.TxnManager;
 import org.apache.directory.shared.ldap.codec.api.LdapApiService;
@@ -626,6 +627,7 @@ public interface DirectoryService extend
       
     TxnManager getTxnManager();
     
+    LeakedCursorManager getLeakedCursorManager();
     
     TxnLogManager getTxnLogManager();   
     

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/AbstractEntryFilteringCursor.java Fri Jun 29 08:25:25 2012
@@ -20,6 +20,7 @@ package org.apache.directory.server.core
 
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
 import org.apache.directory.server.core.api.txn.TxnHandle;
@@ -46,10 +47,19 @@ public abstract class AbstractEntryFilte
 
     /** The associated transaction */
     protected TxnHandle transaction;
-
-    /** True if a thread is using the txn */
-    protected AtomicBoolean txnBusy = new AtomicBoolean( false );
-
+    
+    /** 
+     * Entry filtering cursor lock..any access to the cursor is through this lock
+     * The lock is reentrant as same thread may lock it several times without unlocking.
+     */
+    protected ReentrantLock lock = new ReentrantLock();
+    
+    /** flag to detect the closed cursor */
+    protected boolean closed;
+    
+    /** creation timestamp of the cursor */
+    protected long timestamp;
+    
 
     /**
      * An instance for this class
@@ -112,11 +122,16 @@ public abstract class AbstractEntryFilte
             LOG.info( "Cursor has been abandoned." );
         }
     }
+    
 
-
-    protected boolean maybeSetCurTxn() throws Exception
+    /**
+     * {@inheritDoc}
+     */
+    public void pinCursor()
     {
-        if ( transaction != null )
+    	lock.lock();
+    	
+    	if ( transaction != null )
         {
             TxnHandle curTxn = txnManager.getCurTxn();
 
@@ -129,22 +144,47 @@ public abstract class AbstractEntryFilte
             }
             else
             {
-                boolean busy = !txnBusy.compareAndSet( false, true );
-
-                // This can happen if the abondon listener sneaked in and
-                // closed the cursor. return immediately
-                if ( busy )
-                {
-                    throw new OperationAbandonedException();
-                }
-
                 txnManager.setCurTxn( transaction );
-
-                return true;
             }
         }
-
-        return false;
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void unpinCursor()
+    {
+    	boolean checkForTxnUnset = false;
+    	
+    	if ( txnManager != null )
+    	{
+    	    checkForTxnUnset = ( txnManager.getCurTxn() == transaction );
+    	}
+    	lock.unlock();
+    	
+    	if ( checkForTxnUnset && !lock.isHeldByCurrentThread() )
+    	{
+    		txnManager.setCurTxn( null );
+    	}
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void setTimestamp( long timestamp )
+    {
+    	this.timestamp = timestamp;
+    }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public long getTimestamp()
+    {
+    	return timestamp;
     }
 
 
@@ -154,71 +194,28 @@ public abstract class AbstractEntryFilte
         {
             return;
         }
+        
+        if ( transaction.getState() == TxnHandle.State.COMMIT ||
+        		transaction.getState() == TxnHandle.State.ABORT )
+        {
+        	return;
+        }
 
         // If this thread already owns the txn, then end it and return
         TxnHandle curTxn = txnManager.getCurTxn();
 
-        if ( curTxn != null )
+        if ( curTxn != transaction )
         {
-            if ( curTxn != transaction )
-            {
-                throw new IllegalStateException( "Shouldnt Have another txn running if cursor has a txn " );
-            }
-
-            if ( abort == false )
-            {
-                txnManager.commitTransaction();
-            }
-            else
-            {
-                txnManager.abortTransaction();
-            }
+        	throw new IllegalStateException( "Shouldnt Have another txn running if cursor has a txn " );
+        }
 
-            txnBusy.set( false );
-            txnManager.setCurTxn( null );
+        if ( abort == false )
+        {
+        	txnManager.commitTransaction();
         }
         else
         {
-            while ( !( transaction.getState() == TxnHandle.State.COMMIT || transaction.getState() == TxnHandle.State.ABORT ) )
-            {
-                boolean busy = !txnBusy.compareAndSet( false, true );
-
-                // This can happen if the abondon listener sneaked in and
-                // closed the cursor. return immediately
-                if ( busy )
-                {
-                    try
-                    {
-                        Thread.sleep( 100 );
-                    }
-                    catch ( Exception e )
-                    {
-                        //ignore
-                    }
-                    continue;
-                }
-
-                if ( transaction.getState() == TxnHandle.State.COMMIT ||
-                    transaction.getState() == TxnHandle.State.ABORT )
-                {
-                    txnBusy.set( false );
-                    break;
-                }
-
-                txnManager.setCurTxn( transaction );
-
-                if ( abort == false )
-                {
-                    txnManager.commitTransaction();
-                }
-                else
-                {
-                    txnManager.abortTransaction();
-                }
-
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+        	txnManager.abortTransaction();
         }
     }
 }

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/BaseEntryFilteringCursor.java Fri Jun 29 08:25:25 2012
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.directory.server.core.api.entry.ClonedServerEntry;
 import org.apache.directory.server.core.api.entry.ClonedServerEntrySearch;
 import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.server.core.api.txn.TxnHandle;
 import org.apache.directory.shared.i18n.I18n;
 import org.apache.directory.shared.ldap.model.cursor.ClosureMonitor;
@@ -58,7 +59,7 @@ public class BaseEntryFilteringCursor ex
     private static final Logger LOG = LoggerFactory.getLogger( BaseEntryFilteringCursor.class );
 
     /** the underlying wrapped search results Cursor */
-    private final Cursor<Entry> wrapped;
+    private Cursor<Entry> wrapped;
 
     /** the list of filters to be applied */
     private final List<EntryFilter> filters;
@@ -169,9 +170,9 @@ public class BaseEntryFilteringCursor ex
      * {@inheritDoc}
      */
     public void afterLast() throws Exception
-    {   
-        boolean setCurTxn = maybeSetCurTxn();
-        
+    {
+        pinCursor();
+
         try
         {
             wrapped.afterLast();
@@ -179,11 +180,7 @@ public class BaseEntryFilteringCursor ex
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -193,7 +190,15 @@ public class BaseEntryFilteringCursor ex
      */
     public boolean available()
     {
-        return prefetched != null;
+        boolean result;
+
+        pinCursor();
+
+        result = ( prefetched != null );
+
+        unpinCursor();
+
+        return result;
     }
 
 
@@ -211,8 +216,8 @@ public class BaseEntryFilteringCursor ex
      */
     public void beforeFirst() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             wrapped.beforeFirst();
@@ -220,11 +225,7 @@ public class BaseEntryFilteringCursor ex
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -233,15 +234,20 @@ public class BaseEntryFilteringCursor ex
      * {@inheritDoc}
      */
     public void close() throws Exception
-    {        
+    {
+        pinCursor();
+        closed = true;
+
         try
-        {            
+        {
+
             wrapped.close();
             prefetched = null;
         }
         finally
         {
             endTxnAtClose( false );
+            unpinCursor();
         }
     }
 
@@ -251,14 +257,18 @@ public class BaseEntryFilteringCursor ex
      */
     public void close( Exception reason ) throws Exception
     {
+        pinCursor();
+        closed = true;
+
         try
-        {   
+        {
             wrapped.close( reason );
             prefetched = null;
         }
         finally
         {
             endTxnAtClose( true );
+            unpinCursor();
         }
     }
 
@@ -283,9 +293,9 @@ public class BaseEntryFilteringCursor ex
             close();
             throw new OperationAbandonedException();
         }
-        
-        boolean setCurTxn = maybeSetCurTxn();
-        
+
+        pinCursor();
+
         try
         {
             beforeFirst();
@@ -293,11 +303,7 @@ public class BaseEntryFilteringCursor ex
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -307,11 +313,20 @@ public class BaseEntryFilteringCursor ex
      */
     public Entry get() throws Exception
     {
+        Entry result;
+        
+        pinCursor();
+
         if ( available() )
         {
-            return prefetched;
+            result = prefetched;
+            unpinCursor();
+            
+            return result;
         }
 
+        unpinCursor();
+
         throw new InvalidCursorPositionException();
     }
 
@@ -321,7 +336,7 @@ public class BaseEntryFilteringCursor ex
      */
     public boolean isClosed() throws Exception
     {
-        return wrapped.isClosed();
+        return closed;
     }
 
 
@@ -336,9 +351,9 @@ public class BaseEntryFilteringCursor ex
             close();
             throw new OperationAbandonedException();
         }
-        
-        boolean setCurTxn = maybeSetCurTxn();
-        
+
+        pinCursor();
+
         try
         {
             afterLast();
@@ -346,11 +361,7 @@ public class BaseEntryFilteringCursor ex
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -486,11 +497,11 @@ public class BaseEntryFilteringCursor ex
             close();
             throw new OperationAbandonedException();
         }
-        
-        boolean setCurTxn = maybeSetCurTxn();
-        
+
+        pinCursor();
+
         try
-        { 
+        {
             Entry tempResult = null;
 
             outer: while ( wrapped.next() )
@@ -548,17 +559,13 @@ public class BaseEntryFilteringCursor ex
 
                 return true;
             }
-        
+
             prefetched = null;
             return false;
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -574,18 +581,18 @@ public class BaseEntryFilteringCursor ex
             close();
             throw new OperationAbandonedException();
         }
-        
-        boolean setCurTxn = maybeSetCurTxn();
-        
+
+        pinCursor();
+
         try
         {
             Entry tempResult = null;
-    
+
             outer: while ( wrapped.previous() )
             {
                 boolean accepted = true;
                 tempResult = new ClonedServerEntrySearch( wrapped.get() );
-    
+
                 /*
                  * O P T I M I Z A T I O N
                  * -----------------------
@@ -593,23 +600,23 @@ public class BaseEntryFilteringCursor ex
                  * Don't want to waste cycles on enabling a loop for processing 
                  * filters if we have zero or one filter.
                  */
-    
+
                 if ( filters.isEmpty() )
                 {
                     prefetched = tempResult;
                     filterContents( prefetched );
                     return true;
                 }
-    
+
                 if ( ( filters.size() == 1 ) && filters.get( 0 ).accept( getOperationContext(), tempResult ) )
                 {
                     prefetched = tempResult;
                     filterContents( prefetched );
                     return true;
                 }
-    
+
                 /* E N D   O P T I M I Z A T I O N */
-    
+
                 for ( EntryFilter filter : filters )
                 {
                     // if a filter rejects then short and continue with outer loop
@@ -618,7 +625,7 @@ public class BaseEntryFilteringCursor ex
                         continue outer;
                     }
                 }
-    
+
                 /*
                  * Here the entry has been accepted by all filters.
                  */
@@ -626,19 +633,50 @@ public class BaseEntryFilteringCursor ex
                 filterContents( prefetched );
                 return true;
             }
-    
+
             prefetched = null;
-    
-            return false;     
+
+            return false;
         }
         finally
         {
-            if ( setCurTxn )
+            unpinCursor();
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void doLeakedCursorManagement( LeakedCursorManager leakedCursorManager ) throws Exception
+    {
+        Cursor<Entry> oldCursor;
+        boolean doNext = false;
+
+        pinCursor();
+
+        if ( previous() )
+        {
+            doNext = true;
+        }
+
+        oldCursor = wrapped;
+
+        try
+        {
+            wrapped = leakedCursorManager.createLeakedCursor( this );
+        }
+        finally
+        {
+            if ( doNext )
             {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
+                next();
             }
         }
+
+        oldCursor.close();
+
+        unpinCursor();
     }
 
 
@@ -688,7 +726,7 @@ public class BaseEntryFilteringCursor ex
     {
         throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
             .concat( "." ).concat( "isLast()" ) ) );
-    } 
+    }
 }
 
 

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/CursorList.java Fri Jun 29 08:25:25 2012
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.shared.i18n.I18n;
 import org.apache.directory.shared.ldap.model.cursor.ClosureMonitor;
 import org.apache.directory.shared.ldap.model.cursor.Cursor;
@@ -47,20 +48,17 @@ import org.slf4j.LoggerFactory;
 public class CursorList extends AbstractEntryFilteringCursor
 {
     /** The inner List */
-    private final List<EntryFilteringCursor> list;
+    private List<EntryFilteringCursor> list;
 
     /** The starting position for the cursor in the list. It can be > 0 */
-    private final int start;
+    private int start;
 
     /** The ending position for the cursor in the list. It can be < List.size() */
-    private final int end;
+    private int end;
 
     /** The current position in the list */
     private int index = -1;
 
-    /** flag to detect the closed cursor */
-    private boolean closed;
-
     private static final Logger LOG = LoggerFactory.getLogger( CursorList.class );
 
 
@@ -128,12 +126,21 @@ public class CursorList extends Abstract
      */
     public boolean available()
     {
-        if ( ( index >= 0 ) && ( index < end ) )
+        pinCursor();
+
+        try
         {
-            return list.get( index ).available();
-        }
+            if ( ( index >= 0 ) && ( index < end ) )
+            {
+                return list.get( index ).available();
+            }
 
-        return false;
+            return false;
+        }
+        finally
+        {
+            unpinCursor();
+        }
     }
 
 
@@ -162,8 +169,8 @@ public class CursorList extends Abstract
      */
     public void beforeFirst() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             this.index = 0;
@@ -171,11 +178,7 @@ public class CursorList extends Abstract
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -185,8 +188,8 @@ public class CursorList extends Abstract
      */
     public void afterLast() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             this.index = end - 1;
@@ -194,14 +197,9 @@ public class CursorList extends Abstract
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
 
-         
     }
 
 
@@ -210,8 +208,8 @@ public class CursorList extends Abstract
      */
     public boolean first() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             if ( list.size() > 0 )
@@ -219,16 +217,12 @@ public class CursorList extends Abstract
                 index = start;
                 return list.get( index ).first();
             }
-    
+
             return false;
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -238,8 +232,8 @@ public class CursorList extends Abstract
      */
     public boolean last() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             if ( list.size() > 0 )
@@ -247,16 +241,12 @@ public class CursorList extends Abstract
                 index = end - 1;
                 return list.get( index ).last();
             }
-    
+
             return false;
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -266,21 +256,17 @@ public class CursorList extends Abstract
      */
     public boolean isFirst() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             return ( list.size() > 0 ) && ( index == start ) && list.get( index ).first();
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
-        
+
     }
 
 
@@ -289,19 +275,15 @@ public class CursorList extends Abstract
      */
     public boolean isLast() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             return ( list.size() > 0 ) && ( index == end - 1 ) && list.get( index ).last();
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -329,8 +311,8 @@ public class CursorList extends Abstract
      */
     public boolean previous() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             // if parked at -1 we cannot go backwards
@@ -338,7 +320,7 @@ public class CursorList extends Abstract
             {
                 return false;
             }
-    
+
             // if the index moved back is still greater than or eq to start then OK
             if ( index - 1 >= start )
             {
@@ -346,7 +328,7 @@ public class CursorList extends Abstract
                 {
                     index--;
                 }
-    
+
                 if ( !list.get( index ).previous() )
                 {
                     index--;
@@ -364,7 +346,7 @@ public class CursorList extends Abstract
                     return true;
                 }
             }
-    
+
             // if the index currently less than or equal to start we need to park it at -1 and return false
             if ( index <= start )
             {
@@ -378,22 +360,18 @@ public class CursorList extends Abstract
                     return true;
                 }
             }
-    
+
             if ( list.size() <= 0 )
             {
                 index = -1;
             }
-    
+
             return false;
-        
+
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -403,8 +381,8 @@ public class CursorList extends Abstract
      */
     public boolean next() throws Exception
     {
-        boolean setCurTxn = maybeSetCurTxn();
-        
+        pinCursor();
+
         try
         {
             // if parked at -1 we advance to the start index and return true
@@ -413,7 +391,7 @@ public class CursorList extends Abstract
                 index = start;
                 return list.get( index ).next();
             }
-    
+
             // if the index plus one is less than the end then increment and return true
             if ( list.size() > 0 && index + 1 < end )
             {
@@ -434,7 +412,7 @@ public class CursorList extends Abstract
                     return true;
                 }
             }
-    
+
             // if the index plus one is equal to the end then increment and return false
             if ( list.size() > 0 && index + 1 == end )
             {
@@ -448,21 +426,17 @@ public class CursorList extends Abstract
                     return true;
                 }
             }
-    
+
             if ( list.size() <= 0 )
             {
                 index = end;
             }
-    
+
             return false;
         }
         finally
         {
-            if ( setCurTxn )
-            {
-                txnBusy.set( false );
-                txnManager.setCurTxn( null );
-            }
+            unpinCursor();
         }
     }
 
@@ -472,14 +446,25 @@ public class CursorList extends Abstract
      */
     public Entry get() throws Exception
     {
-        if ( index < start || index >= end )
+        pinCursor();
+
+        try
         {
-            throw new IOException( I18n.err( I18n.ERR_02009_CURSOR_NOT_POSITIONED ) );
-        }
+            if ( index < start || index >= end )
+            {
+                throw new IOException(
+                    I18n.err( I18n.ERR_02009_CURSOR_NOT_POSITIONED ) );
+            }
+
+            if ( list.get( index ).available() )
+            {
+                return list.get( index ).get();
+            }
 
-        if ( list.get( index ).available() )
+        }
+        finally
         {
-            return list.get( index ).get();
+            unpinCursor();
         }
 
         throw new InvalidCursorPositionException();
@@ -536,34 +521,42 @@ public class CursorList extends Abstract
      */
     public void close( Exception reason ) throws Exception
     {
+        pinCursor();
         closed = true;
 
-        for ( Cursor<?> c : list )
+        try
         {
-            try
+            for ( Cursor<?> c : list )
             {
-                if ( reason != null )
+                try
                 {
-                    c.close( reason );
+                    if ( reason != null )
+                    {
+                        c.close( reason );
+                    }
+                    else
+                    {
+                        c.close();
+                    }
                 }
-                else
+                catch ( Exception e )
                 {
-                    c.close();
+                    LOG.warn( "Failed to close the cursor" );
                 }
             }
-            catch ( Exception e )
+        }
+        finally
+        {
+            if ( reason == null )
             {
-                LOG.warn( "Failed to close the cursor" );
+                this.endTxnAtClose( false );
+            }
+            else
+            {
+                this.endTxnAtClose( true );
             }
-        }
 
-        if ( reason == null )
-        {
-            this.endTxnAtClose( false );
-        }
-        else
-        {
-            this.endTxnAtClose( true );
+            unpinCursor();
         }
     }
 
@@ -580,6 +573,22 @@ public class CursorList extends Abstract
     /**
      * {@inheritDoc}
      */
+    public void doLeakedCursorManagement( LeakedCursorManager leakedCursorManager ) throws Exception
+    {
+        pinCursor();
+
+        for ( EntryFilteringCursor cursor : list )
+        {
+            cursor.doLeakedCursorManagement( leakedCursorManager );
+        }
+
+        unpinCursor();
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
     public Iterator<Entry> iterator()
     {
         throw new UnsupportedOperationException();

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/filtering/EntryFilteringCursor.java Fri Jun 29 08:25:25 2012
@@ -23,6 +23,7 @@ package org.apache.directory.server.core
 import java.util.List;
 
 import org.apache.directory.server.core.api.interceptor.context.SearchingOperationContext;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.server.core.api.txn.TxnHandle;
 import org.apache.directory.server.core.api.txn.TxnManager;
 import org.apache.directory.shared.ldap.model.cursor.Cursor;
@@ -98,4 +99,36 @@ public interface EntryFilteringCursor ex
      * @return the associated transaction to this cursor
      */
     TxnHandle getTransaction();
+    
+    
+    /**
+     * Ensure exclusive access to the cursor state. A thread might
+     * call this method multiple times without unpinning the cursor,
+     * so it should be reentrant.
+     */
+    void pinCursor();
+    
+    
+    /**
+     * Release pin gotten by pinCursor
+     */
+    void unpinCursor();
+    
+    
+    /**
+     * do the leak cursor management
+     */
+    void doLeakedCursorManagement( LeakedCursorManager leakedCursorManager ) throws Exception;
+    
+    /**
+     * Set creation timestamp
+     * @param timestamp creation timestamp
+     */
+    void setTimestamp( long timestamp );
+    
+    /**
+     * 
+     * @return the creation timestamp
+     */
+    long getTimestamp();
 }
\ No newline at end of file

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/log/LogAnchor.java Fri Jun 29 08:25:25 2012
@@ -142,6 +142,6 @@ public class LogAnchor implements Extern
      */
     public String toString()
     {
-        return "File number: " + logFileNumber + ", offset: " + logFileOffset + ", LSN: " + Long.toHexString( logLSN );
+        return "File number: " + logFileNumber + ", offset: " + logFileOffset + ", LSN: " + logLSN;
     }
 }

Added: directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java?rev=1355264&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/main/java/org/apache/directory/server/core/api/txn/LeakedCursorManager.java Fri Jun 29 08:25:25 2012
@@ -0,0 +1,14 @@
+package org.apache.directory.server.core.api.txn;
+
+import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.shared.ldap.model.cursor.Cursor;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+
+
+public interface LeakedCursorManager
+{
+    Cursor<Entry> createLeakedCursor( EntryFilteringCursor cursor ) throws Exception;
+
+
+    void trackCursor( EntryFilteringCursor cursor );
+}

Modified: directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-api/src/test/java/org/apache/directory/server/core/api/MockDirectoryService.java Fri Jun 29 08:25:25 2012
@@ -42,6 +42,7 @@ import org.apache.directory.server.core.
 import org.apache.directory.server.core.api.schema.SchemaPartition;
 import org.apache.directory.server.core.api.subtree.SubentryCache;
 import org.apache.directory.server.core.api.subtree.SubtreeEvaluator;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.server.core.api.txn.TxnLogManager;
 import org.apache.directory.server.core.api.txn.TxnManager;
 import org.apache.directory.shared.ldap.codec.api.LdapApiService;
@@ -617,6 +618,11 @@ public class MockDirectoryService implem
         return null;
     }
     
+    public LeakedCursorManager getLeakedCursorManager()
+    {
+        return null;
+    }
+    
     
     public TxnLogManager getTxnLogManager()
     {

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java Fri Jun 29 08:25:25 2012
@@ -168,7 +168,6 @@ import org.apache.directory.server.i18n.
             }
             
             initialLsn = logRecord.getLogAnchor().getLogLSN();
-            System.out.println(" Log manager inital lsn " + initialLsn);
 
             long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
             long lastGoodLogFileOffset = scanner.getLastGoodOffset();

Added: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java?rev=1355264&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java (added)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultLeakedCursorManager.java Fri Jun 29 08:25:25 2012
@@ -0,0 +1,224 @@
+package org.apache.directory.server.core.shared.txn;
+
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
+import org.apache.directory.server.core.shared.txn.DefaultTxnManager.LogSyncer;
+import org.apache.directory.shared.ldap.model.cursor.Cursor;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+
+
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+
+public class DefaultLeakedCursorManager implements LeakedCursorManager
+{
+    /** List of tracked cursors */
+    private ConcurrentLinkedQueue<EntryFilteringCursor> cursors = new ConcurrentLinkedQueue<EntryFilteringCursor>();
+
+    private static int LEAK_CHECK_INTERVAL = 1000;
+
+    private static int LEAK_TIMEOUT =  1000;
+
+    private static final String CURSOR_SUFFIX = "cursor";
+
+    private String cursorFolderPath;
+
+    private AtomicInteger idx = new AtomicInteger( 0 );
+
+    CursorChecker checker;
+
+
+    public DefaultLeakedCursorManager( String folderPath )
+    {
+        cursorFolderPath = folderPath;
+        
+        File folder = new File( cursorFolderPath );
+        folder.mkdirs();
+    }
+
+
+    public void init()
+    {
+        if ( checker == null )
+        {
+            checker = new CursorChecker();
+            checker.setDaemon( true );
+            checker.start();
+        }
+    }
+
+
+    public void shutdown()
+    {
+        checker.interrupt();
+
+        try
+        {
+            checker.join();
+        }
+        catch ( InterruptedException e )
+        {
+            //Ignore
+        }
+        
+        cursors.clear();
+    }
+
+
+    public Cursor<Entry> createLeakedCursor( EntryFilteringCursor cursor ) throws Exception
+    {
+        File cursorFile = makeCursorFileName();
+
+        cursorFile.createNewFile();
+
+        RandomAccessFile raf = new RandomAccessFile( cursorFile, "rw" );
+
+        try
+        {
+            raf.setLength( 0 );
+            raf.getFD().sync();
+        }
+        finally
+        {
+            raf.close();
+        }
+
+        boolean canMoveBeforeFirst = false;
+
+        if ( cursor.previous() )
+        {
+            cursor.next();
+        }
+        else
+        {
+            canMoveBeforeFirst = true;
+        }
+       
+
+        return new RandomFileCursor( cursorFile, cursor, canMoveBeforeFirst,
+                cursor.getOperationContext().getSession().getDirectoryService().getSchemaManager() );
+    }
+
+
+    public void trackCursor( EntryFilteringCursor cursor )
+    {
+        cursors.add( cursor );
+    }
+
+
+    private File makeCursorFileName()
+    {
+        int fileIdx = idx.incrementAndGet();
+
+        return new File( cursorFolderPath + File.separatorChar + fileIdx + "."
+            + CURSOR_SUFFIX );
+    }
+
+
+    private void checkLeakedCursors() throws Exception
+    {
+        Iterator<EntryFilteringCursor> it = cursors.iterator();
+        EntryFilteringCursor cursor;
+        long currentTimestamp = System.currentTimeMillis();
+
+        while ( it.hasNext() )
+        {
+            cursor = it.next();
+
+            if ( cursor.isClosed() )
+            {   
+                it.remove();
+                continue;
+            }
+
+            if ( ( currentTimestamp - cursor.getTimestamp() ) >= LEAK_TIMEOUT )
+            {
+                cursor.pinCursor();
+
+                if ( cursor.isClosed() )
+                {
+                    cursor.unpinCursor();
+                    it.remove();
+                    continue;
+                }
+
+                System.out.println("Doing leaked cursor management2" + cursor);
+                
+                try
+                {
+                    cursor.doLeakedCursorManagement( this );
+                }
+                finally
+                {
+                    cursor.unpinCursor();
+                }
+
+                it.remove();
+                continue;
+
+            }
+            else
+            {
+                // Maybe break here
+            }
+
+        }
+    }
+
+    class CursorChecker extends Thread
+    {
+        @Override
+        public void run()
+        {
+            try
+            {
+                while ( true )
+                {
+                    Thread.sleep( LEAK_CHECK_INTERVAL );
+
+                    try
+                    {
+                        checkLeakedCursors();
+                    }
+                    catch ( Exception e )
+                    {
+                        e.printStackTrace();
+                    }
+                }
+            }
+            catch ( InterruptedException e )
+            {
+                // Bail out
+            }
+            catch ( Exception e )
+            {
+                e.printStackTrace();
+            }
+        }
+    }
+}

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java Fri Jun 29 08:25:25 2012
@@ -171,6 +171,8 @@ class DefaultTxnManager implements TxnMa
         lastFlushedLogAnchor = new LogAnchor();
 
         initialScanPoint = wal.getCheckPoint();
+        //System.out.println("checkpoint " + initialScanPoint);
+        
         lastFlushedLogAnchor.resetLogAnchor( initialScanPoint );
         
         dummyTxn.commitTxn( initialScanPoint.getLogLSN() );
@@ -191,7 +193,7 @@ class DefaultTxnManager implements TxnMa
 
     public void shutdown()
     {
-    	System.out.println("in shutdown");
+    	//System.out.println("in shutdown");
         syncer.interrupt();
 
         try
@@ -212,11 +214,13 @@ class DefaultTxnManager implements TxnMa
         	ReadWriteTxn latestCommitted = latestCommittedTxn.get();
             long latestFlushedLsn = latestFlushedTxnLSN.get();
             
-            System.out.println("latest committed txn " + latestCommitted.getCommitTime() + 
-            		" latest flushed " + latestFlushedLsn);
-            //flushTxns();
+            flushTxns();
+            
+            advanceCheckPoint( lastFlushedLogAnchor );
             
-            //advanceCheckPoint( lastFlushedLogAnchor );
+          //  System.out.println("latest committed txn " + latestCommitted.getCommitTime() + 
+         //       " latest flushed " + latestFlushedLsn + " last flushed log anchor " + 
+          //      lastFlushedLogAnchor );
         }
         catch ( Exception e )
         {
@@ -1004,7 +1008,7 @@ class DefaultTxnManager implements TxnMa
     	UserLogRecord logRecord = new UserLogRecord();
     	byte userRecord[]; 
     	
-    	System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
+    	//System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
     	
     	try
     	{
@@ -1023,7 +1027,7 @@ class DefaultTxnManager implements TxnMa
 	            	
 	            	if ( stateChange.getTxnState() == ChangeState.TXN_COMMIT )
 	            	{
-	            		System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe recovered txns");
+	            		//System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe recovered txns");
 	            		txnsToRecover.add( new Long( stateChange.getTxnID() ) );
 	            	}
 	            }
@@ -1043,7 +1047,7 @@ class DefaultTxnManager implements TxnMa
     {
     	Dn partitionSuffix = partition.getSuffixDn();
     	
-    	System.out.println("Recover partition " + partitionSuffix);
+    	//System.out.println("Recover partition " + partitionSuffix);
     	
     	LogScanner logScanner = wal.beginScan( initialScanPoint );
     	UserLogRecord logRecord = new UserLogRecord();
@@ -1065,8 +1069,8 @@ class DefaultTxnManager implements TxnMa
 	            	DataChangeContainer dataChangeContainer = new DataChangeContainer();
 	            	dataChangeContainer.readExternal(in);
 	            	
-	            	System.out.println("Data change container for " + dataChangeContainer.getPartitionDn() + 
-	            			" txn id " + dataChangeContainer.getTxnID() );
+	            	//System.out.println("Data change container for " + dataChangeContainer.getPartitionDn() + 
+	            		//	" txn id " + dataChangeContainer.getTxnID() );
 	            	
 	            	// If this change is for the partition we are tyring to recover 
 	                // and belongs to a txn that committed, then 

Added: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java?rev=1355264&view=auto
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java (added)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/RandomFileCursor.java Fri Jun 29 08:25:25 2012
@@ -0,0 +1,299 @@
+package org.apache.directory.server.core.shared.txn;
+
+
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.RandomAccessFile;
+
+import org.apache.directory.server.core.api.entry.ClonedServerEntry;
+import org.apache.directory.server.core.api.filtering.EntryFilteringCursor;
+import org.apache.directory.shared.i18n.I18n;
+import org.apache.directory.shared.ldap.model.cursor.AbstractCursor;
+import org.apache.directory.shared.ldap.model.cursor.InvalidCursorPositionException;
+import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
+import org.apache.directory.shared.ldap.model.entry.Entry;
+import org.apache.directory.shared.ldap.model.schema.SchemaManager;
+
+
+public class RandomFileCursor extends AbstractCursor<Entry>
+{
+    /* Path to the file containing entries */
+    private File file;
+
+    /* Size of the previous entry(if a next is done). Used to go back on the cursor */
+    private int lastEntrySize;
+
+    /* Current offset into the file */
+    private int currentOffset;
+
+    /* Prefetched entry */
+    private Entry prefetched;
+
+    /* Whether cursor can move to beforefirst */
+    private boolean canMoveBeforeFirst;
+    
+    private SchemaManager schemaManager;
+
+
+    RandomFileCursor( File file, EntryFilteringCursor cursor, boolean canMoveBeforeFirst, 
+            SchemaManager schemaManager ) throws Exception
+    {
+        this.file = file;
+        this.canMoveBeforeFirst = canMoveBeforeFirst;
+        this.schemaManager = schemaManager;
+
+        RandomAccessFile raf = new RandomAccessFile( file, "rw" );
+
+        Entry entry;
+
+        if ( cursor.available() )
+        {
+            entry = cursor.get();
+            prefetched = (( ClonedServerEntry )entry ).getClonedEntry();
+            lastEntrySize = writeEntry( raf, entry );
+            currentOffset = lastEntrySize + 4;
+        }
+
+        try
+        {
+            while ( cursor.next() )
+            {
+                entry = cursor.get();
+                writeEntry( raf, entry );
+
+            }
+        }
+        finally
+        {
+            raf.close();
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean available()
+    {
+        return prefetched != null;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean next() throws Exception
+    {
+        RandomAccessFile raf = new RandomAccessFile( file, "r" );
+        byte[] data;
+        int length;
+
+        ObjectInputStream in = null;
+        ByteArrayInputStream bin = null;
+
+        try
+        {
+            if ( currentOffset >= raf.length() )
+                return false;
+
+            raf.seek( currentOffset );
+            length = raf.readInt();
+
+            data = new byte[length];
+            raf.read( data, 0, length );
+
+            bin = new ByteArrayInputStream( data );
+            in = new ObjectInputStream( bin );
+
+            prefetched = new DefaultEntry();
+            prefetched.readExternal( in );
+
+            lastEntrySize = length;
+            currentOffset += 4 + length;
+        }
+        finally
+        {
+            if ( bin != null )
+            {
+                bin.close();
+            }
+
+            if ( in != null )
+            {
+                in.close();
+            }
+
+            raf.close();
+        }
+
+        return true;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean previous() throws Exception
+    {
+        if ( lastEntrySize == 0 )
+        {
+            return false;
+        }
+
+        if ( currentOffset < ( lastEntrySize + 4 ) )
+        {
+            throw new IllegalStateException( "RandomFileCursor currenOffset: " + currentOffset + " lastEntrySize " +
+                lastEntrySize );
+        }
+
+        currentOffset -= lastEntrySize + 4;
+        next();
+        currentOffset -= lastEntrySize + 4;
+        lastEntrySize = 0;
+        return true;
+
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public Entry get() throws Exception
+    {
+        if ( available() )
+        {   
+            return new ClonedServerEntry( schemaManager, prefetched );
+        }
+
+        throw new InvalidCursorPositionException();
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void after( Entry entry ) throws Exception
+    {
+        throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+            .concat( "." ).concat( "after()" ) ) );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void afterLast() throws Exception
+    {
+        throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+            .concat( "." ).concat( "afterLast()" ) ) );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean last() throws Exception
+    {
+        throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+            .concat( "." ).concat( "last()" ) ) );
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean first() throws Exception
+    {
+        beforeFirst();
+        return next();
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void beforeFirst() throws Exception
+    {
+        if ( !canMoveBeforeFirst )
+        {
+            throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass()
+                .getName()
+                .concat( "." ).concat( "beforeFirst()" ) ) );
+        }
+        else
+        {
+            currentOffset = 0;
+            lastEntrySize = 0;
+        }
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    public void before( Entry entry ) throws Exception
+    {
+        throw new UnsupportedOperationException( I18n.err( I18n.ERR_02014_UNSUPPORTED_OPERATION, getClass().getName()
+            .concat( "." ).concat( "beforeEntry()" ) ) );
+    }
+
+
+    private int writeEntry( RandomAccessFile raf, Entry entry ) throws Exception
+    {
+        entry = ( ( ClonedServerEntry )entry ).getOriginalEntry();    
+        byte[] data;
+
+        ObjectOutputStream out = null;
+        ByteArrayOutputStream bout = null;
+        try
+        {
+            bout = new ByteArrayOutputStream();
+            out = new ObjectOutputStream( bout );
+            entry.writeExternal( out );
+
+            out.flush();
+            data = bout.toByteArray();
+        }
+        finally
+        {
+            if ( bout != null )
+            {
+                bout.close();
+            }
+
+            if ( out != null )
+            {
+                out.close();
+            }
+        }
+
+        raf.writeInt( data.length );
+        raf.write( data, 0, data.length );
+
+        return data.length;
+    }
+}

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java Fri Jun 29 08:25:25 2012
@@ -20,10 +20,12 @@
 package org.apache.directory.server.core.shared.txn;
 
 
+import java.io.File;
 import java.io.IOException;
 
 import org.apache.directory.server.core.api.log.InvalidLogException;
 import org.apache.directory.server.core.api.log.Log;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.server.core.api.txn.TxnLogManager;
 import org.apache.directory.server.core.api.txn.TxnManager;
 import org.apache.directory.server.core.shared.log.DefaultLog;
@@ -41,6 +43,9 @@ public class TxnManagerFactory
     /** The only txn log manager */
     private TxnLogManagerInternal txnLogManager;
     
+    /** The only leaked cursor manager */
+    private LeakedCursorManager leakedCursorManager;
+    
     /** WAL */
     private Log log;
 
@@ -79,6 +84,8 @@ public class TxnManagerFactory
 
         txnLogManager = new DefaultTxnLogManager( log, this );
         
+        leakedCursorManager = new DefaultLeakedCursorManager( logFolderPath + File.separatorChar + "cursors" );
+        
         this.init();
     }
     
@@ -100,6 +107,8 @@ public class TxnManagerFactory
         }	
     	
     	( ( DefaultTxnManager ) txnManager ).init(txnLogManager);
+    	
+    	( ( DefaultLeakedCursorManager )leakedCursorManager ).init();
     	 
     	inited = true;
     }
@@ -114,6 +123,7 @@ public class TxnManagerFactory
 
         ( ( DefaultTxnManager ) txnManager ).shutdown();
         ( ( DefaultTxnLogManager ) txnLogManager ).shutdown();
+        ( ( DefaultLeakedCursorManager )leakedCursorManager ).shutdown();
         inited = false;
     }
 
@@ -130,6 +140,12 @@ public class TxnManagerFactory
     }
 
 
+    public LeakedCursorManager leakedCursorManagerInstance()
+    {
+        return leakedCursorManager;
+    }
+    
+    
     TxnManagerInternal txnManagerInternalInstance()
     {
         return txnManager;

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java (original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/test/java/org/apache/directory/server/core/shared/log/UserLogRecordTest.java Fri Jun 29 08:25:25 2012
@@ -59,11 +59,11 @@ public class UserLogRecordTest
     {
         ObjectInputStream oIn = null;
         ByteArrayInputStream in = new ByteArrayInputStream( buffer );
-
         try
         {
             oIn = new ObjectInputStream( in );
-
+            oIn.read();
+            
             return oIn;
         }
         catch ( IOException ioe )

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java (original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java Fri Jun 29 08:25:25 2012
@@ -81,6 +81,7 @@ import org.apache.directory.server.core.
 import org.apache.directory.server.core.api.schema.SchemaPartition;
 import org.apache.directory.server.core.api.subtree.SubentryCache;
 import org.apache.directory.server.core.api.subtree.SubtreeEvaluator;
+import org.apache.directory.server.core.api.txn.LeakedCursorManager;
 import org.apache.directory.server.core.api.txn.TxnLogManager;
 import org.apache.directory.server.core.api.txn.TxnManager;
 import org.apache.directory.server.core.authn.AuthenticationInterceptor;
@@ -1179,14 +1180,6 @@ public class DefaultDirectoryService imp
 
             do
             {
-                //TODO TODO
-                // THE followign revert was done in one txn. However, when then number
-                // of changes got close to 1000, it got really slow. For now doing this
-                // in small txns, but identify the cause of this perf problem.
-
-                //txnManager.beginTransaction( false );
-
-                boolean startedTxn = false;
                 List<ChangeLogEvent> events = new LinkedList();
 
                 try
@@ -1215,7 +1208,6 @@ public class DefaultDirectoryService imp
                 }
 
                 Iterator<ChangeLogEvent> it = events.iterator();
-                boolean inTxn = false;
 
                 try
                 {
@@ -2515,6 +2507,15 @@ public class DefaultDirectoryService imp
     {
         return txnManagerFactory.txnLogManagerInstance();
     }
+    
+    
+    /**
+     * {@inheritDoc}
+     */
+    public LeakedCursorManager getLeakedCursorManager()
+    {
+        return txnManagerFactory.leakedCursorManagerInstance();
+    }
 
 
     /**

Modified: directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java (original)
+++ directory/apacheds/branches/apacheds-txns/core/src/main/java/org/apache/directory/server/core/DefaultOperationManager.java Fri Jun 29 08:25:25 2012
@@ -1864,6 +1864,9 @@ public class DefaultOperationManager imp
 
             cursor.setTxnManager( txnManager );
             
+            cursor.setTimestamp( System.currentTimeMillis() );
+            directoryService.getLeakedCursorManager().trackCursor( cursor );
+            
             txnManager.endLogicalDataRead();
             txnManager.setCurTxn( null );
         }

Modified: directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java?rev=1355264&r1=1355263&r2=1355264&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java (original)
+++ directory/apacheds/branches/apacheds-txns/server-integ/src/test/java/org/apache/directory/server/operations/search/PagedSearchIT.java Fri Jun 29 08:25:25 2012
@@ -286,7 +286,7 @@ public class PagedSearchIT extends Abstr
             try
             {
                 list = ctx.search( "dc=users,ou=system", "(cn=*)", controls );
-    
+                
                 while ( list.hasMore() )
                 {
                     SearchResult result = list.next();



Mime
View raw message