directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From elecha...@apache.org
Subject svn commit: r1161416 - in /directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm: btree/BPage.java btree/BTree.java helper/TupleBrowser.java recman/BaseRecordManager.java
Date Thu, 25 Aug 2011 06:58:36 GMT
Author: elecharny
Date: Thu Aug 25 06:58:36 2011
New Revision: 1161416

URL: http://svn.apache.org/viewvc?rev=1161416&view=rev
Log:
applied Selcuk's patch

Modified:
    directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BPage.java
    directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BTree.java
    directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/helper/TupleBrowser.java
    directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/recman/BaseRecordManager.java

Modified: directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BPage.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BPage.java?rev=1161416&r1=1161415&r2=1161416&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BPage.java (original)
+++ directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BPage.java Thu Aug 25 06:58:36 2011
@@ -50,6 +50,7 @@ package jdbm.btree;
 import jdbm.helper.Serializer;
 import jdbm.helper.Tuple;
 import jdbm.helper.TupleBrowser;
+import jdbm.helper.ActionContext;
 
 import java.io.IOException;
 import java.io.ByteArrayOutputStream;
@@ -259,10 +260,11 @@ public class BPage<K, V> implements Seri
      *
      * @param height Height of the current BPage (zero is leaf page)
      * @param key The key
+     * @param context action specific context. not null if record manager is action capable
      * @return TupleBrowser positionned just before the given key, or before
      *                      next greater key if key isn't found.
      */
-    TupleBrowser<K, V> find( int height, K key ) throws IOException
+    TupleBrowser<K, V> find( int height, K key, ActionContext context ) throws IOException
     {
         int index = this.findChildren( key );
         
@@ -285,26 +287,27 @@ public class BPage<K, V> implements Seri
             }
         }
 
-        return new Browser( child, index );
+        return new Browser( child, index, context );
     }
 
 
     /**
      * Find first entry and return a browser positioned before it.
      *
+     * @param context action specific context. not null if record manager is action capable
      * @return TupleBrowser positionned just before the first entry.
      */
-    TupleBrowser<K, V> findFirst() throws IOException
+    TupleBrowser<K, V> findFirst(ActionContext context ) throws IOException
     {
         if ( isLeaf )
         {
-            return new Browser( this, first );
+            return new Browser( this, first, context );
         }
         else
         {
             BPage<K, V> child = childBPage( first );
             
-            return child.findFirst();
+            return child.findFirst( context );
         }
     }
 
@@ -337,9 +340,12 @@ public class BPage<K, V> implements Seri
 
         height -= 1;
         
+        BPage<K,V> pageNewCopy = null;
         if ( height == 0 )
         {
+            pageNewCopy = btree.copyOnWrite( this );
             result = new InsertResult<K, V>();
+            result.pageNewCopy = pageNewCopy;
 
             // inserting on a leaf BPage
             overflow = -1;
@@ -365,8 +371,8 @@ public class BPage<K, V> implements Seri
                 
                 if ( replace )
                 {
-                    values[index] = value;
-                    btree.recordManager.update( recordId, this, this );
+                    pageNewCopy.values[index] = value;
+                    btree.recordManager.update( recordId, pageNewCopy, this );
                 }
                 
                 // return the existing key
@@ -378,6 +384,11 @@ public class BPage<K, V> implements Seri
             // non-leaf BPage
             BPage<K, V> child = childBPage( index );
             result = child.insert( height, key, value, replace );
+            if( result.pageNewCopy != null)
+            {
+                child = result.pageNewCopy;
+                result.pageNewCopy = null;
+            }
 
             if ( result.existing != null )
             {
@@ -393,6 +404,9 @@ public class BPage<K, V> implements Seri
 
             // there was an overflow, we need to insert the overflow page
             // on this BPage
+            pageNewCopy = btree.copyOnWrite( this );
+            result.pageNewCopy = pageNewCopy;
+            
             if ( DEBUG )
             {
                 System.out.println( "BPage.insert() Overflow page: " + result.overflow.recordId );
@@ -402,7 +416,7 @@ public class BPage<K, V> implements Seri
             overflow = result.overflow.recordId;
 
             // update child's largest key
-            keys[index] = child.getLargestKey();
+            pageNewCopy.keys[index] = child.getLargestKey();
 
             // clean result so we can reuse it
             result.overflow = null;
@@ -410,24 +424,25 @@ public class BPage<K, V> implements Seri
 
         // if we get here, we need to insert a new entry on the BPage
         // before children[ index ]
-        if ( !isFull() )
-        {
+        if ( !pageNewCopy.isFull() )
+        {            
             if ( height == 0 )
             {
-                insertEntry( this, index - 1, key, value );
+                insertEntry( pageNewCopy, index - 1, key, value );
             }
             else
             {
-                insertChild( this, index - 1, key, overflow );
+                insertChild( pageNewCopy, index - 1, key, overflow );
             }
             
-            btree.recordManager.update( recordId, this, this );
+            btree.recordManager.update( recordId, pageNewCopy, this );
             return result;
         }
 
         // page is full, we must divide the page
         int half = btree.pageSize >> 1;
-        BPage<K, V> newPage = new BPage<K, V>( btree, isLeaf );
+        BPage<K, V> newPage = new BPage<K, V>( btree, pageNewCopy.isLeaf );
+        
         
         if ( index < half )
         {
@@ -441,15 +456,15 @@ public class BPage<K, V> implements Seri
             
             if ( height == 0 )
             {
-                copyEntries( this, 0, newPage, half, index );
+                copyEntries( pageNewCopy, 0, newPage, half, index );
                 setEntry( newPage, half + index, key, value );
-                copyEntries( this, index, newPage, half + index + 1, half - index - 1 );
+                copyEntries( pageNewCopy, index, newPage, half + index + 1, half - index - 1 );
             }
             else
             {
-                copyChildren( this, 0, newPage, half, index );
+                copyChildren( pageNewCopy, 0, newPage, half, index );
                 setChild( newPage, half + index, key, overflow );
-                copyChildren( this, index, newPage, half + index + 1, half - index - 1 );
+                copyChildren( pageNewCopy, index, newPage, half + index + 1, half - index - 1 );
             }
         }
         else
@@ -463,50 +478,51 @@ public class BPage<K, V> implements Seri
             
             if ( height == 0 )
             {
-                copyEntries( this, 0, newPage, half, half );
-                copyEntries( this, half, this, half - 1, index - half );
-                setEntry( this, index - 1, key, value );
+                copyEntries( pageNewCopy, 0, newPage, half, half );
+                copyEntries( pageNewCopy, half, pageNewCopy, half - 1, index - half );
+                setEntry( pageNewCopy, index - 1, key, value );
             }
             else
             {
-                copyChildren( this, 0, newPage, half, half );
-                copyChildren( this, half, this, half - 1, index - half );
-                setChild( this, index - 1, key, overflow );
+                copyChildren( pageNewCopy, 0, newPage, half, half );
+                copyChildren( pageNewCopy, half, pageNewCopy, half - 1, index - half );
+                setChild( pageNewCopy, index - 1, key, overflow );
             }
         }
 
-        first = half - 1;
+        pageNewCopy.first = half - 1;
 
         // nullify lower half of entries
-        for ( int i = 0; i < first; i++ )
+        for ( int i = 0; i < pageNewCopy.first; i++ )
         {
             if ( height == 0 )
             {
-                setEntry( this, i, null, null );
+                setEntry( pageNewCopy, i, null, null );
             }
             else
             {
-                setChild( this, i, null, -1 );
+                setChild( pageNewCopy, i, null, -1 );
             }
         }
 
-        if ( isLeaf )
+        if ( pageNewCopy.isLeaf )
         {
             // link newly created BPage
-            newPage.previous = previous;
-            newPage.next = recordId;
+            newPage.previous = pageNewCopy.previous;
+            newPage.next = pageNewCopy.recordId;
             
-            if ( previous != 0 )
+            if ( pageNewCopy.previous != 0 )
             {
-                BPage<K, V> previousBPage = loadBPage( previous );
+                BPage<K, V> previousBPage = loadBPage( pageNewCopy.previous );
+                previousBPage = btree.copyOnWrite( previousBPage );
                 previousBPage.next = newPage.recordId;
-                btree.recordManager.update( previous, previousBPage, this );
+                btree.recordManager.update( pageNewCopy.previous, previousBPage, this );
             }
             
-            previous = newPage.recordId;
+            pageNewCopy.previous = newPage.recordId;
         }
 
-        btree.recordManager.update( recordId, this, this );
+        btree.recordManager.update( recordId, pageNewCopy, this );
         btree.recordManager.update( newPage.recordId, newPage, this );
 
         result.overflow = newPage;
@@ -521,9 +537,9 @@ public class BPage<K, V> implements Seri
      * @param key Removal key
      * @return Remove result object
      */
-    RemoveResult<V> remove( int height, K key ) throws IOException
+    RemoveResult<K, V> remove( int height, K key ) throws IOException
     {
-        RemoveResult<V> result;
+        RemoveResult<K, V> result;
 
         int half = btree.pageSize / 2;
         int index = findChildren( key );
@@ -536,6 +552,7 @@ public class BPage<K, V> implements Seri
 
         height -= 1;
         
+        BPage<K,V> pageNewCopy = btree.copyOnWrite( this );;
         if ( height == 0 )
         {
             // remove leaf entry
@@ -544,22 +561,32 @@ public class BPage<K, V> implements Seri
                 throw new IllegalArgumentException( I18n.err( I18n.ERR_514, key ) );
             }
             
-            result = new RemoveResult<V>();
-            result.value = values[index];
-            removeEntry( this, index );
+            result = new RemoveResult<K, V>();
+            result.value = pageNewCopy.values[index];
+            removeEntry( pageNewCopy, index );
 
             // update this BPage
-            btree.recordManager.update( recordId, this, this );
+            btree.recordManager.update( recordId, pageNewCopy, this );
         }
         else
         {
             // recurse into Btree to remove entry on a children page
             BPage<K, V> child = childBPage( index );
             result = child.remove( height, key );
+            
+            if ( result.pageNewCopy != null )
+            {
+                child = result.pageNewCopy;
+                result.pageNewCopy = null;
+            }
+            else
+            {
+                child = btree.copyOnWrite( child );
+            }
 
             // update children
-            keys[index] = child.getLargestKey();
-            btree.recordManager.update( recordId, this, this );
+            pageNewCopy.keys[index] = child.getLargestKey();
+            btree.recordManager.update( recordId, pageNewCopy, this );
 
             if ( result.underflow )
             {
@@ -569,10 +596,11 @@ public class BPage<K, V> implements Seri
                     throw new IllegalStateException( I18n.err( I18n.ERR_513, "1" ) );
                 }
                 
-                if ( index < children.length - 1 )
+                if ( index < pageNewCopy.children.length - 1 )
                 {
                     // exists greater brother page
-                    BPage<K, V> brother = childBPage( index + 1 );
+                    BPage<K, V> brother = pageNewCopy.childBPage( index + 1 );
+                    brother = btree.copyOnWrite( brother );
                     int bfirst = brother.first;
                     
                     if ( bfirst < half )
@@ -606,12 +634,12 @@ public class BPage<K, V> implements Seri
                         }
 
                         // update child's largest key
-                        keys[index] = child.getLargestKey();
+                        pageNewCopy.keys[index] = child.getLargestKey();
 
                         // no change in previous/next BPage
 
                         // update BPages
-                        btree.recordManager.update( recordId, this, this );
+                        btree.recordManager.update( recordId, pageNewCopy, this );
                         btree.recordManager.update( brother.recordId, brother, this );
                         btree.recordManager.update( child.recordId, child, this );
 
@@ -638,24 +666,25 @@ public class BPage<K, V> implements Seri
                         btree.recordManager.update( brother.recordId, brother, this );
 
                         // remove "child" from current BPage
-                        if ( isLeaf )
+                        if ( pageNewCopy.isLeaf )
                         {
-                            copyEntries( this, first, this, first + 1, index - first );
-                            setEntry( this, first, null, null );
+                            copyEntries( pageNewCopy, pageNewCopy.first, pageNewCopy, pageNewCopy.first + 1, index - pageNewCopy.first );
+                            setEntry( pageNewCopy, pageNewCopy.first, null, null );
                         }
                         else
                         {
-                            copyChildren( this, first, this, first + 1, index - first );
-                            setChild( this, first, null, -1 );
+                            copyChildren( pageNewCopy, pageNewCopy.first, pageNewCopy, pageNewCopy.first + 1, index - pageNewCopy.first );
+                            setChild( pageNewCopy, pageNewCopy.first, null, -1 );
                         }
                         
-                        first += 1;
-                        btree.recordManager.update( recordId, this, this );
+                        pageNewCopy.first += 1;
+                        btree.recordManager.update( recordId, pageNewCopy, this );
 
                         // re-link previous and next BPages
                         if ( child.previous != 0 )
                         {
                             BPage<K, V> prev = loadBPage( child.previous );
+                            prev = btree.copyOnWrite( prev );
                             prev.next = child.next;
                             btree.recordManager.update( prev.recordId, prev, this );
                         }
@@ -663,6 +692,7 @@ public class BPage<K, V> implements Seri
                         if ( child.next != 0 )
                         {
                             BPage<K, V> next = loadBPage( child.next );
+                            next = btree.copyOnWrite( next );
                             next.previous = child.previous;
                             btree.recordManager.update( next.recordId, next, this );
                         }
@@ -674,7 +704,8 @@ public class BPage<K, V> implements Seri
                 else
                 {
                     // page "brother" is before "child"
-                    BPage<K, V> brother = childBPage( index - 1 );
+                    BPage<K, V> brother = pageNewCopy.childBPage( index - 1 );
+                    brother = btree.copyOnWrite( brother );
                     int bfirst = brother.first;
                     
                     if ( bfirst < half )
@@ -708,12 +739,12 @@ public class BPage<K, V> implements Seri
                         }
 
                         // update brother's largest key
-                        keys[index - 1] = brother.getLargestKey();
+                        pageNewCopy.keys[index - 1] = brother.getLargestKey();
 
                         // no change in previous/next BPage
 
                         // update BPages
-                        btree.recordManager.update( recordId, this, this );
+                        btree.recordManager.update( recordId, pageNewCopy, this );
                         btree.recordManager.update( brother.recordId, brother, this );
                         btree.recordManager.update( child.recordId, child, this );
 
@@ -740,24 +771,25 @@ public class BPage<K, V> implements Seri
                         btree.recordManager.update( child.recordId, child, this );
 
                         // remove "brother" from current BPage
-                        if ( isLeaf )
+                        if ( pageNewCopy.isLeaf )
                         {
-                            copyEntries( this, first, this, first + 1, index - 1 - first );
-                            setEntry( this, first, null, null );
+                            copyEntries( pageNewCopy, pageNewCopy.first, pageNewCopy, pageNewCopy.first + 1, index - 1 - pageNewCopy.first );
+                            setEntry( pageNewCopy, pageNewCopy.first, null, null );
                         }
                         else
                         {
-                            copyChildren( this, first, this, first + 1, index - 1 - first );
-                            setChild( this, first, null, -1 );
+                            copyChildren( pageNewCopy, pageNewCopy.first, pageNewCopy, pageNewCopy.first + 1, index - 1 - pageNewCopy.first );
+                            setChild( pageNewCopy, pageNewCopy.first, null, -1 );
                         }
                         
-                        first += 1;
-                        btree.recordManager.update( recordId, this, this );
+                        pageNewCopy.first += 1;
+                        btree.recordManager.update( recordId, pageNewCopy, this );
 
                         // re-link previous and next BPages
                         if ( brother.previous != 0 )
                         {
                             BPage<K, V> prev = loadBPage( brother.previous );
+                            prev = btree.copyOnWrite( prev );
                             prev.next = brother.next;
                             btree.recordManager.update( prev.recordId, prev, this );
                         }
@@ -765,6 +797,7 @@ public class BPage<K, V> implements Seri
                         if ( brother.next != 0 )
                         {
                             BPage<K, V> next = loadBPage( brother.next );
+                            next = btree.copyOnWrite( next );
                             next.previous = brother.previous;
                             btree.recordManager.update( next.recordId, next, this );
                         }
@@ -777,7 +810,8 @@ public class BPage<K, V> implements Seri
         }
 
         // underflow if page is more than half-empty
-        result.underflow = first > half;
+        result.underflow = pageNewCopy.first > half;
+        result.pageNewCopy = pageNewCopy;
 
         return result;
     }
@@ -1316,13 +1350,18 @@ public class BPage<K, V> implements Seri
          * Existing value for the insertion key.
          */
         V existing;
+        
+        /**
+         * New version of the page doing the insert
+         */
+        BPage<K, V> pageNewCopy;
     }
 
     /** STATIC INNER CLASS
      *  Result from remove() method call. If we had to removed a BPage,
      *  it will be stored into the underflow field.
      */
-    static class RemoveResult<V>
+    static class RemoveResult<K, V>
     {
         /**
          * Set to true if underlying pages underflowed
@@ -1333,6 +1372,12 @@ public class BPage<K, V> implements Seri
          * Removed entry value
          */
         V value;
+        
+        /**
+         * New version of the page doing the remove
+         */
+        BPage<K, V> pageNewCopy;   
+        
     }
 
     /** PRIVATE INNER CLASS
@@ -1342,6 +1387,9 @@ public class BPage<K, V> implements Seri
     {
         /** Current page. */
         private BPage<K, V> page;
+        
+        /** Browsing action's context in case of a action capable record manager */
+        ActionContext context;
 
         /**
          * Current index in the page.  The index positionned on the next
@@ -1354,12 +1402,14 @@ public class BPage<K, V> implements Seri
          * Create a browser.
          *
          * @param page Current page
+         * @param context Action specific context. Not null if part of an action
          * @param index Position of the next tuple to return.
          */
-        Browser( BPage<K, V> page, int index )
+        Browser( BPage<K, V> page, int index, ActionContext context )
         {
             this.page = page;
             this.index = index;
+            this.context = context;
         }
 
 
@@ -1373,6 +1423,7 @@ public class BPage<K, V> implements Seri
          */
         public boolean getNext( Tuple<K, V> tuple ) throws IOException
         {
+            btree.setAsCurrentAction( context );
             // First, check that we are within a page
             if ( index < page.btree.pageSize )
             {
@@ -1402,6 +1453,7 @@ public class BPage<K, V> implements Seri
 
         public boolean getPrevious( Tuple<K, V> tuple ) throws IOException
         {
+            btree.setAsCurrentAction( context );
             if ( index == page.first )
             {
                 if ( page.previous != 0 )
@@ -1422,6 +1474,12 @@ public class BPage<K, V> implements Seri
             
             return true;
         }
+        
+        @Override
+        public void close()
+        {
+            btree.endAction( context );
+        }
     }
     
     

Modified: directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BTree.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BTree.java?rev=1161416&r1=1161415&r2=1161416&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BTree.java (original)
+++ directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/btree/BTree.java Thu Aug 25 06:58:36 2011
@@ -52,13 +52,23 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import jdbm.RecordManager;
+import jdbm.ActionRecordManager;
 import jdbm.helper.Serializer;
 import jdbm.helper.Tuple;
 import jdbm.helper.TupleBrowser;
+import jdbm.helper.WrappedRuntimeException;
+import jdbm.helper.ActionContext;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.directory.server.i18n.I18n;
 
@@ -131,7 +141,13 @@ public class BTree<K, V> implements Exte
 
     /** Serializer used for BPages of this tree */
     private transient BPage<K, V> bpageSerializer;
-
+    
+    /** TRUE if underlying record manager is snapshot capable */
+    private transient boolean isActionCapable;
+    
+    
+    /** Big lock snychronizing all actions */
+    private transient Lock bigLock = new ReentrantLock(); 
 
     /**
      * No-argument constructor used by serialization.
@@ -220,8 +236,27 @@ public class BTree<K, V> implements Exte
         this.bpageSerializer = new BPage<K, V>();
         this.bpageSerializer.btree = this;
         this.nbEntries = new AtomicInteger( 0 );
+        
+        this.isActionCapable = recordManager instanceof ActionRecordManager; 
 
-        this.recordId = recordManager.insert( this );
+        boolean abortedAction = false;
+        ActionContext context = this.beginAction( false );
+        
+        try
+        {
+            this.recordId = recordManager.insert( this );
+        }
+        catch ( IOException e )
+        {
+            abortedAction = true;
+            this.abortAction( context );
+            throw e;
+        }
+        finally
+        {
+            if ( !abortedAction )
+                this.endAction( context );
+        }
     }
     
     
@@ -268,7 +303,7 @@ public class BTree<K, V> implements Exte
      * @param replace Set to true to replace an existing key-value pair.
      * @return Existing value, if any.
      */
-    public synchronized Object insert( K key, V value, boolean replace ) throws IOException
+    public Object insert( K key, V value, boolean replace ) throws IOException
     {
         if ( key == null )
         {
@@ -280,58 +315,88 @@ public class BTree<K, V> implements Exte
             throw new IllegalArgumentException( I18n.err( I18n.ERR_524 ) );
         }
 
-        BPage<K, V> rootPage = getRoot();
+        
+        boolean abortedAction = false;
+        ActionContext context  = this.beginAction( false );
+        
 
-        if ( rootPage == null )
-        {
-            // BTree is currently empty, create a new root BPage
-            if ( DEBUG )
-            {
-                System.out.println( "BTree.insert() new root BPage" );
-            }
-            
-            rootPage = new BPage<K, V>( this, key, value );
-            rootId = rootPage.getRecordId();
-            bTreeHeight = 1;
-            nbEntries.set( 1 );
-            recordManager.update( recordId, this );
-            
-            return null;
-        }
-        else
+        if ( !isActionCapable )
+            bigLock.lock();
+        
+        try
         {
-            BPage.InsertResult<K, V> insert = rootPage.insert( bTreeHeight, key, value, replace );
-            boolean dirty = false;
-            
-            if ( insert.overflow != null )
+            BPage<K, V> rootPage = getRoot( false );
+    
+            if ( rootPage == null )
             {
-                // current root page overflowed, we replace with a new root page
+                // BTree is currently empty, create a new root BPage
                 if ( DEBUG )
                 {
-                    System.out.println( "BTree.insert() replace root BPage due to overflow" );
+                    System.out.println( "BTree.insert() new root BPage" );
                 }
                 
-                rootPage = new BPage<K, V>( this, rootPage, insert.overflow );
+                rootPage = new BPage<K, V>( this, key, value );
                 rootId = rootPage.getRecordId();
-                bTreeHeight += 1;
-                dirty = true;
-            }
-            
-            if ( insert.existing == null )
-            {
-                nbEntries.getAndIncrement();
-                dirty = true;
+                bTreeHeight = 1;
+                nbEntries.set( 1 );
+                recordManager.update( recordId, this );
+                
+                return null;
             }
-            
-            if ( dirty )
+            else
             {
-                recordManager.update( recordId, this );
+                BPage.InsertResult<K, V> insert = rootPage.insert( bTreeHeight, key, value, replace );
+                if ( insert.pageNewCopy != null )
+                    rootPage = insert.pageNewCopy;
+                
+                boolean dirty = false;
+                
+                if ( insert.overflow != null )
+                {
+                    // current root page overflowed, we replace with a new root page
+                    if ( DEBUG )
+                    {
+                        System.out.println( "BTree.insert() replace root BPage due to overflow" );
+                    }
+                    
+                    rootPage = new BPage<K, V>( this, rootPage, insert.overflow );
+                    rootId = rootPage.getRecordId();
+                    bTreeHeight += 1;
+                    dirty = true;
+                }
+                
+                if ( insert.existing == null )
+                {
+                    nbEntries.getAndIncrement();
+                    dirty = true;
+                }
+                
+                if ( dirty )
+                {
+                    recordManager.update( recordId, this );
+                }
+                
+                // insert might have returned an existing value
+                return insert.existing;
             }
+        }
+        catch ( IOException e )
+        {
+            abortedAction = true;
+            this.abortAction( context );
+            throw e;
+        }
+        finally
+        {
+            if ( !abortedAction )
+                this.endAction( context );
             
-            // insert might have returned an existing value
-            return insert.existing;
+            if ( !isActionCapable )
+                bigLock.unlock();
         }
+        
     }
+        
 
 
     /**
@@ -341,52 +406,80 @@ public class BTree<K, V> implements Exte
      * @return Value associated with the key, or null if no entry with given
      *         key existed in the BTree.
      */
-    public synchronized V remove( K key ) throws IOException
+    public V remove( K key ) throws IOException
     {
         if ( key == null )
         {
             throw new IllegalArgumentException( I18n.err( I18n.ERR_523 ) );
         }
-
-        BPage<K, V> rootPage = getRoot();
+       
         
-        if ( rootPage == null )
-        {
-            return null;
-        }
-        
-        boolean dirty = false;
-        BPage.RemoveResult<V> remove = rootPage.remove( bTreeHeight, key );
+        boolean abortedAction = false;
+        ActionContext context = this.beginAction( false );
+
+        if ( !isActionCapable )
+            bigLock.lock();
         
-        if ( remove.underflow && rootPage.isEmpty() )
+        try
         {
-            bTreeHeight -= 1;
-            dirty = true;
 
-            recordManager.delete( rootId );
+            BPage<K, V> rootPage = getRoot( false );
             
-            if ( bTreeHeight == 0 )
+            if ( rootPage == null )
             {
-                rootId = 0;
+                return null;
             }
-            else
+            
+            boolean dirty = false;
+            BPage.RemoveResult<K, V> remove = rootPage.remove( bTreeHeight, key );
+            if ( remove.pageNewCopy != null )
+                rootPage = remove.pageNewCopy;
+            
+            if ( remove.underflow && rootPage.isEmpty() )
+            {
+                bTreeHeight -= 1;
+                dirty = true;
+    
+                recordManager.delete( rootId );
+                
+                if ( bTreeHeight == 0 )
+                {
+                    rootId = 0;
+                }
+                else
+                {
+                    rootId = rootPage.childBPage( pageSize - 1 ).getRecordId();
+                }
+            }
+            
+            if ( remove.value != null )
             {
-                rootId = rootPage.childBPage( pageSize - 1 ).getRecordId();
+                nbEntries.getAndDecrement();
+                dirty = true;
             }
+            
+            if ( dirty )
+            {
+                recordManager.update( recordId, this );
+            }
+            
+            return remove.value;
         }
-        
-        if ( remove.value != null )
+        catch ( IOException e )
         {
-            nbEntries.getAndDecrement();
-            dirty = true;
+            abortedAction = true;
+            this.abortAction( context );
+            throw e;
         }
-        
-        if ( dirty )
+        finally
         {
-            recordManager.update( recordId, this );
+            if ( !abortedAction )
+                this.endAction( context );
+            
+            if ( !isActionCapable )
+                bigLock.unlock();
         }
         
-        return remove.value;
     }
 
 
@@ -396,40 +489,52 @@ public class BTree<K, V> implements Exte
      * @param key Lookup key.
      * @return Value associated with the key, or null if not found.
      */
-    public synchronized V find( K key ) throws IOException
+    public V find( K key ) throws IOException
     {
+        TupleBrowser<K, V> browser = null;
+        Tuple<K, V> tuple = null;
+        
         if ( key == null )
         {
             throw new IllegalArgumentException( I18n.err( I18n.ERR_523 ) );
         }
         
-        BPage<K, V> rootPage = getRoot();
+        if ( !isActionCapable )
+            bigLock.lock();
         
-        if ( rootPage == null )
-        {
-            return null;
-        }
-
-        Tuple<K, V> tuple = new Tuple<K, V>( null, null );
-        TupleBrowser<K, V> browser = rootPage.find( bTreeHeight, key );
-
-        if ( browser.getNext( tuple ) )
-        {
-            // find returns the matching key or the next ordered key, so we must
-            // check if we have an exact match
-            if ( comparator.compare( key, tuple.getKey() ) != 0 )
+        try
+        {      
+            tuple = new Tuple<K, V>( null, null );
+     
+            browser = browse( key );
+   
+            if ( browser.getNext( tuple ) )
             {
-                return null;
+                // find returns the matching key or the next ordered key, so we must
+                // check if we have an exact match
+                if ( comparator.compare( key, tuple.getKey() ) != 0 )
+                {
+                    return null;
+                }
+                else
+                {
+                    return tuple.getValue();
+                }
             }
             else
             {
-                return tuple.getValue();
+                return null;
             }
         }
-        else
+        finally
         {
-            return null;
+            if ( browser != null )
+                browser.close();  
+
+            if ( !isActionCapable )
+                bigLock.unlock();
         }
+        
     }
 
 
@@ -441,10 +546,10 @@ public class BTree<K, V> implements Exte
      * @return Value associated with the key, or a greater entry, or null if no
      *         greater entry was found.
      */
-    public synchronized Tuple<K, V> findGreaterOrEqual( K key ) throws IOException
+    public Tuple<K, V> findGreaterOrEqual( K key ) throws IOException
     {
         Tuple<K, V> tuple;
-        TupleBrowser<K, V> browser;
+        TupleBrowser<K, V> browser = null;
 
         if ( key == null )
         {
@@ -453,16 +558,31 @@ public class BTree<K, V> implements Exte
             return null;
         }
 
-        tuple = new Tuple<K, V>( null, null );
-        browser = browse( key );
+        if ( !isActionCapable )
+            bigLock.lock();
         
-        if ( browser.getNext( tuple ) )
+        tuple = new Tuple<K, V>( null, null );
+        try
         {
-            return tuple;
+            browser = browse( key );
+            
+            if ( browser.getNext( tuple ) )
+            {
+                return tuple;
+            }
+            else
+            {
+                return null;
+            }
         }
-        else
+        finally
         {
-            return null;
+            if ( browser != null )
+                browser.close(); 
+            
+            if ( !isActionCapable )
+                bigLock.unlock();
+
         }
     }
 
@@ -476,16 +596,26 @@ public class BTree<K, V> implements Exte
      *
      * @return Browser positionned at the beginning of the BTree.
      */
-    public synchronized TupleBrowser<K, V> browse() throws IOException
+    public TupleBrowser<K, V> browse() throws IOException
     {
-        BPage<K, V> rootPage = getRoot();
-        
-        if ( rootPage == null )
+        TupleBrowser<K, V> browser = null;
+        ActionContext context = this.beginAction( true );      
+        try
         {
-            return new EmptyBrowser(){};
+            BPage<K, V> rootPage = getRoot( true );
+            
+            if ( rootPage == null )
+            {
+                this.endAction( context );
+                return new EmptyBrowser(){};
+            }
+            browser = rootPage.findFirst( context );         
+        }
+        catch( IOException e )
+        {
+            this.abortAction( context );
+            throw e;
         }
-        
-        TupleBrowser<K, V> browser = rootPage.findFirst();
         
         return browser;
     }
@@ -503,19 +633,32 @@ public class BTree<K, V> implements Exte
      *            (Null is considered to be an "infinite" key)
      * @return Browser positioned just before the given key.
      */
-    public synchronized TupleBrowser<K, V> browse( K key ) throws IOException
+    public TupleBrowser<K, V> browse( K key ) throws IOException
     {
-        BPage<K, V> rootPage = getRoot();
+        TupleBrowser<K, V> browser = null;
+        ActionContext context = this.beginAction( true );  
         
-        if ( rootPage == null )
+        try
         {
-            return new EmptyBrowser(){};
+            BPage<K, V> rootPage = getRoot( true );
+            
+            if ( rootPage == null )
+            {
+                this.endAction( context );
+                return new EmptyBrowser(){};
+            }
+          
+            browser  = rootPage.find( rootPage.btree.bTreeHeight, key, context );
+        }
+        catch( IOException e )
+        {
+            this.abortAction( context );
+            throw e;
         }
-        
-        TupleBrowser<K, V> browser = rootPage.find( bTreeHeight, key );
         
         return browser;
     }
+    
 
 
     /**
@@ -539,16 +682,27 @@ public class BTree<K, V> implements Exte
     /**
      * Return the root BPage<Object, Object>, or null if it doesn't exist.
      */
-    private BPage<K, V> getRoot() throws IOException
+    private BPage<K, V> getRoot( boolean readOnlyAction ) throws IOException
     {
-        if ( rootId == 0 )
+        BTree<K, V> bTreeCopy;
+        
+        if ( readOnlyAction )
+        {
+            bTreeCopy = ( BTree<K, V> )recordManager.fetch( recordId );
+        }
+        else
+        {
+            bTreeCopy = this;
+        }
+        
+        if ( bTreeCopy.rootId == 0 )
         {
             return null;
         }
         
-        BPage<K, V> root = ( BPage<K, V> ) recordManager.fetch( rootId, bpageSerializer );
-        root.setRecordId( rootId );
-        root.btree = this;
+        BPage<K, V> root = ( BPage<K, V> ) recordManager.fetch( bTreeCopy.rootId, bpageSerializer );
+        root.setRecordId( bTreeCopy.rootId );
+        root.btree = bTreeCopy;
         
         return root;
     }
@@ -616,6 +770,115 @@ public class BTree<K, V> implements Exte
     }
     
     
+    void setAsCurrentAction( ActionContext context )
+    {
+        if ( context != null )
+        {
+            assert( isActionCapable == true );
+            ( ( ActionRecordManager )recordManager ).setCurrentActionContext( context );
+        }
+    }
+    
+    void unsetAsCurrentAction( ActionContext context )
+    {
+        if ( context != null )
+        {
+            assert( isActionCapable == true );
+            ( ( ActionRecordManager )recordManager ).setCurrentActionContext( context );
+        }
+    }
+    
+    
+    ActionContext beginAction( boolean readOnly )
+    {
+        ActionContext context = null;
+        if ( isActionCapable )
+        {
+            context = ( ( ActionRecordManager )recordManager ).beginAction( readOnly );
+            this.setAsCurrentAction( context );
+        }
+        return context;
+    }
+    
+    
+    void endAction( ActionContext context )
+    {
+        if ( context != null )
+        {
+            assert( isActionCapable );
+            ( ( ActionRecordManager )recordManager ).endAction( context );
+            this.unsetAsCurrentAction( context );
+        }
+    }
+    
+    void abortAction( ActionContext context )
+    {
+        if ( context != null )
+        {
+            assert( isActionCapable );
+            this.abortAction( context );
+            ( ( ActionRecordManager )recordManager ).abortAction( context );
+            this.unsetAsCurrentAction( context );
+        }
+
+    }
+    
+    
+    BPage<K,V> copyOnWrite( BPage<K,V> page) throws IOException
+    {
+        byte[] array;
+        array = this.bpageSerializer.serialize( page );
+        BPage<K,V> pageCopy = this.bpageSerializer.deserialize( array );
+        pageCopy.recordId = page.recordId;
+        pageCopy.btree = page.btree;
+        return pageCopy;
+    }
+    
+    @SuppressWarnings("unchecked") 
+    private BTree<K,V> copyOnWrite() throws IOException
+    {
+        ObjectOutputStream out = null;
+        ObjectInputStream in = null;
+        ByteArrayOutputStream bout = null;
+        ByteArrayInputStream bin = null;
+        
+        BTree<K,V> tree;
+        
+        try 
+        {
+            bout = new ByteArrayOutputStream();
+            out = new ObjectOutputStream( bout );
+            out.writeObject( this );
+            out.flush();
+            byte[]  arr = bout.toByteArray();
+            bin = new ByteArrayInputStream( arr );
+            in =new ObjectInputStream( bin );
+            tree = ( BTree<K, V> )in.readObject();
+        }
+        catch ( ClassNotFoundException e ) 
+        {
+            throw new WrappedRuntimeException( e );
+        }
+        finally
+        {
+            if ( bout != null )
+                bout.close();
+            
+            if ( out != null )
+                out.close();
+            
+            if ( bin != null )
+                bin.close();
+           
+            if ( in != null )
+                in.close();
+        }
+        
+        return tree;
+    }
+    
+    
+    
     public String toString()
     {
         StringBuilder sb = new StringBuilder();

Modified: directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/helper/TupleBrowser.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/helper/TupleBrowser.java?rev=1161416&r1=1161415&r2=1161416&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/helper/TupleBrowser.java (original)
+++ directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/helper/TupleBrowser.java Thu Aug 25 06:58:36 2011
@@ -73,4 +73,11 @@ public abstract class TupleBrowser<K, V>
      *         no previous tuple.
      */
     public abstract boolean getPrevious( Tuple<K, V> tuple ) throws IOException;
+    
+    /**
+     * Closes the browser and deallocates any resources it might have allocated.
+     * Repeated calls of close are OK.
+     */
+    public void close() {}
+    
 }

Modified: directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/recman/BaseRecordManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/recman/BaseRecordManager.java?rev=1161416&r1=1161415&r2=1161416&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/recman/BaseRecordManager.java (original)
+++ directory/apacheds/branches/apacheds-jdbm/jdbm/src/main/java/jdbm/recman/BaseRecordManager.java Thu Aug 25 06:58:36 2011
@@ -52,6 +52,10 @@ import java.io.IOException;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
 
 import org.apache.directory.server.i18n.I18n;
 
@@ -109,6 +113,102 @@ public final class BaseRecordManager
      * the NAME_DIRECTORY_ROOT.
      */
     private Map<String,Long> nameDirectory;
+    
+    private static enum IOType
+    {
+        READ_IO,
+        WRITE_IO
+    }
+
+    /** TODO add asserts to check internal consistency */
+    private static class LockElement
+    {
+        private int readers;
+        private int waiters;
+        private boolean writer;
+
+        private Lock lock = new ReentrantLock();
+        private Condition cv = lock.newCondition();
+
+
+        public boolean anyReaders()
+        {
+            return readers > 0;
+        }
+
+
+        public boolean anyWaiters()
+        {
+            return waiters > 0;
+        }
+
+
+        public boolean beingWritten()
+        {
+            return writer;
+        }
+
+
+        public boolean anyUser()
+        {
+            return ( readers > 0 || waiters > 0 || writer );
+        }
+
+
+        public void bumpReaders()
+        {
+            readers++;
+        }
+
+
+        public void decrementReaders()
+        {
+            readers--;
+        }
+
+
+        public void bumpWaiters()
+        {
+            waiters++;
+        }
+
+
+        public void decrementWaiters()
+        {
+            waiters--;
+        }
+
+
+        public void setWritten()
+        {
+            writer = true;
+        }
+
+
+        public void unsetWritten()
+        {
+            writer = false;
+        }
+
+
+        public Lock getLock()
+        {
+            return lock;
+        }
+
+
+        public Condition getNoConflictingIOCondition()
+        {
+            return cv;
+        }
+
+    }
+
+    /**
+     * Map used to synchronize reads and writes on the same logical
+     * recid.
+     */
+    private final ConcurrentHashMap<Long, LockElement> lockElements;
 
 
     /**
@@ -123,13 +223,14 @@ public final class BaseRecordManager
         pageMgr = new PageManager( recordFile );
         physMgr = new PhysicalRowIdManager( pageMgr );
         logMgr = new LogicalRowIdManager( pageMgr );
+        lockElements = new ConcurrentHashMap<Long, LockElement>();
     }
 
 
     /**
      * Get the underlying Transaction Manager
      */
-    public synchronized TransactionManager getTransactionManager() throws IOException
+    public TransactionManager getTransactionManager() throws IOException
     {
         checkIfClosed();
         return recordFile.getTxnMgr();
@@ -145,7 +246,7 @@ public final class BaseRecordManager
      *  Only call this method directly after opening the file, otherwise
      *  the results will be undefined.
      */
-    public synchronized void disableTransactions()
+    public void disableTransactions()
     {
         checkIfClosed();
         recordFile.disableTransactions();
@@ -157,7 +258,7 @@ public final class BaseRecordManager
      *
      * @throws IOException when one of the underlying I/O operations fails.
      */
-    public synchronized void close() throws IOException
+    public void close() throws IOException
     {
         checkIfClosed();
 
@@ -190,7 +291,7 @@ public final class BaseRecordManager
      * @return the rowid for the new record.
      * @throws IOException when one of the underlying I/O operations fails.
      */
-    public synchronized long insert( Object obj, Serializer serializer ) throws IOException
+    public long insert( Object obj, Serializer serializer ) throws IOException
     {
         byte[]    data;
         long      recid;
@@ -217,8 +318,9 @@ public final class BaseRecordManager
      * @param recid the rowid for the record that should be deleted.
      * @throws IOException when one of the underlying I/O operations fails.
      */
-    public synchronized void delete( long recid ) throws IOException
+    public void delete( long recid ) throws IOException
     {
+    	LockElement element;
         checkIfClosed();
         
         if ( recid <= 0 ) 
@@ -231,10 +333,20 @@ public final class BaseRecordManager
             System.out.println( "BaseRecordManager.delete() recid " + recid ) ;
         }
 
-        Location logRowId = new Location( recid );
-        Location physRowId = logMgr.fetch( logRowId );
-        physMgr.delete( physRowId );
-        logMgr.delete( logRowId );
+        
+        element = this.beginIO(recid, IOType.WRITE_IO);
+        
+        try
+        {
+        	Location logRowId = new Location( recid );
+        	Location physRowId = logMgr.fetch( logRowId );
+        	physMgr.delete( physRowId );
+        	logMgr.delete( logRowId );
+        }
+        finally
+        {
+        	this.endIO(recid, element, IOType.WRITE_IO);
+        }
     }
 
 
@@ -250,7 +362,6 @@ public final class BaseRecordManager
         update( recid, obj, DefaultSerializer.INSTANCE );
     }
 
-    
     /**
      * Updates a record using a custom serializer.
      *
@@ -259,33 +370,45 @@ public final class BaseRecordManager
      * @param serializer a custom serializer
      * @throws IOException when one of the underlying I/O operations fails.
      */
-    public synchronized void update( long recid, Object obj, Serializer serializer ) throws IOException
+    public void update( long recid, Object obj, Serializer serializer ) throws IOException
     {
-        checkIfClosed();
+    	LockElement element;
+    	
+    	checkIfClosed();
 
         if ( recid <= 0 ) 
         {
             throw new IllegalArgumentException( I18n.err( I18n.ERR_536, recid ) );
         }
 
-        Location logRecid = new Location( recid );
-        Location physRecid = logMgr.fetch( logRecid );
-
-        byte[] data = serializer.serialize( obj );
-        
-        if ( DEBUG ) 
-        {
-            System.out.println( "BaseRecordManager.update() recid " + recid + " length " + data.length ) ;
-        }
-        
-        Location newRecid = physMgr.update( physRecid, data, 0, data.length );
-        
-        if ( ! newRecid.equals( physRecid ) ) 
-        {
-            logMgr.update( logRecid, newRecid );
-        }
+        element = this.beginIO(recid, IOType.WRITE_IO);
+     	
+        try
+     	{
+        	Location logRecid = new Location( recid );
+            Location physRecid = logMgr.fetch( logRecid );
+
+            byte[] data = serializer.serialize( obj );
+            
+            if ( DEBUG ) 
+            {
+                System.out.println( "BaseRecordManager.update() recid " + recid + " length " + data.length ) ;
+            }
+            
+            Location newRecid = physMgr.update( physRecid, data, 0, data.length );
+            
+            if ( ! newRecid.equals( physRecid ) ) 
+            {
+                logMgr.update( logRecid, newRecid );
+            }
+
+     	}
+     	finally
+     	{
+     	    this.endIO(recid, element, IOType.WRITE_IO);
+     	} 
     }
-
+    
 
     /**
      * Fetches a record using standard java object serialization.
@@ -299,7 +422,7 @@ public final class BaseRecordManager
         return fetch( recid, DefaultSerializer.INSTANCE );
     }
 
-
+    
     /**
      * Fetches a record using a custom serializer.
      *
@@ -308,26 +431,41 @@ public final class BaseRecordManager
      * @return the object contained in the record.
      * @throws IOException when one of the underlying I/O operations fails.
      */
-    public synchronized Object fetch( long recid, Serializer serializer )
-        throws IOException
+    public Object fetch( long recid, Serializer serializer ) throws IOException
     {
-        byte[] data;
-
-        checkIfClosed();
-       
+    	Object result;
+    	LockElement element;
+    	
+    	checkIfClosed();
+        
         if ( recid <= 0 ) 
         {
             throw new IllegalArgumentException( I18n.err( I18n.ERR_536, recid ) );
         }
-        
-        data = physMgr.fetch( logMgr.fetch( new Location( recid ) ) );
-        
-        if ( DEBUG ) 
-        {
-            System.out.println( "BaseRecordManager.fetch() recid " + recid + " length " + data.length ) ;
-        }
-        return serializer.deserialize( data );
+    	
+    	element = this.beginIO(recid, IOType.READ_IO);
+    	
+    	try
+    	{
+    		byte[] data; 
+            
+            data = physMgr.fetch( logMgr.fetch( new Location( recid ) ) );
+            
+            if ( DEBUG ) 
+            {
+                System.out.println( "BaseRecordManager.fetch() recid " + recid + " length " + data.length ) ;
+            }
+            result = serializer.deserialize( data );
+    	}
+    	finally
+    	{
+    		this.endIO(recid, element, IOType.READ_IO);
+    	}
+    	
+    	return result;
+    	
     }
+    
 
 
     /**
@@ -347,7 +485,7 @@ public final class BaseRecordManager
      *
      *  @see #getRootCount
      */
-    public synchronized long getRoot( int id ) throws IOException
+    public long getRoot( int id ) throws IOException
     {
         checkIfClosed();
 
@@ -360,7 +498,7 @@ public final class BaseRecordManager
      *
      *  @see #getRootCount
      */
-    public synchronized void setRoot( int id, long rowid ) throws IOException
+    public void setRoot( int id, long rowid ) throws IOException
     {
         checkIfClosed();
 
@@ -411,7 +549,7 @@ public final class BaseRecordManager
     /**
      * Commit (make persistent) all changes since beginning of transaction.
      */
-    public synchronized void commit()
+    public void commit()
         throws IOException
     {
         checkIfClosed();
@@ -423,7 +561,7 @@ public final class BaseRecordManager
     /**
      * Rollback (cancel) all changes since beginning of transaction.
      */
-    public synchronized void rollback() throws IOException
+    public void rollback() throws IOException
     {
         checkIfClosed();
 
@@ -478,4 +616,125 @@ public final class BaseRecordManager
             throw new IllegalStateException( I18n.err( I18n.ERR_538 ) );
         }
     }
+    
+
+    /**
+     * Used to serialize reads/write on a given logical rowid. Checks if there is a 
+     * ongoing conflicting IO to the same logical rowid and waits for the ongoing
+     * write if there is one. 
+     *
+     * @param recid the logical rowid for which the fetch will be done.
+     * @param io type of the IO
+     * @return lock element representing the logical lock gotten
+     */
+    private LockElement beginIO( Long recid, IOType io )
+    {
+        boolean lockVerified = false;
+        LockElement element = null;
+
+        // loop until we successfully verify that there is no concurrent writer
+/*
+        element = lockElements.get( recid );
+        do
+        {
+            if ( element == null )
+            {
+                element = new LockElement();
+
+                if ( io == IOType.READ_IO )
+                    element.bumpReaders();
+                else
+                    element.setWritten();
+
+                LockElement existingElement = lockElements.putIfAbsent( recid, element );
+
+                if ( existingElement == null )
+                    lockVerified = true;
+                else
+                    element = existingElement;
+            }
+            else
+            {
+                Lock lock = element.getLock();
+                lock.lock();
+                if ( element.anyUser() )
+                {
+                    if ( this.conflictingIOPredicate( io, element ) )
+                    {
+                        element.bumpWaiters();
+                        do
+                        {
+                            element.getNoConflictingIOCondition()
+                                .awaitUninterruptibly();
+                        }
+                        while ( this.conflictingIOPredicate( io, element ) );
+
+                        element.decrementWaiters();
+                    }
+
+                    // no conflicting IO anymore..done
+                    if ( io == IOType.READ_IO )
+                        element.bumpReaders();
+                    else
+                        element.setWritten();
+                    lockVerified = true;
+                }
+                else
+                {
+                    if ( io == IOType.READ_IO )
+                        element.bumpReaders();
+                    else
+                        element.setWritten();
+
+                    LockElement existingElement = lockElements.get( recid );
+
+                    if ( element != existingElement )
+                        element = existingElement;
+                    else
+                        lockVerified = true; // done
+                }
+                lock.unlock();
+            }
+        }
+        while ( !lockVerified );
+*/
+        return element;
+    }
+
+
+    /**
+     * Ends the IO by releasing the logical lock on the given recid
+     * 
+     * @param recid logical recid for which the IO is being ended
+     * @param element logical lock to be released
+     * @param io type of the io
+     */
+    private void endIO( Long recid, LockElement element, IOType io )
+    {
+  /*      Lock lock = element.getLock();
+        lock.lock();
+
+        if ( io == IOType.READ_IO )
+            element.decrementReaders();
+        else
+            element.unsetWritten();
+
+        if ( element.anyWaiters() )
+            element.getNoConflictingIOCondition().notifyAll();
+
+        if ( !element.anyUser() )
+            lockElements.remove( recid );
+
+        lock.unlock();*/
+    }
+
+
+    private boolean conflictingIOPredicate( IOType io, LockElement element )
+    {
+        if ( io == IOType.READ_IO )
+            return element.beingWritten();
+        else
+            return ( element.anyReaders() || element.beingWritten() );
+    }
+
 }



Mime
View raw message