directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kayyag...@apache.org
Subject svn commit: r1513055 - /directory/mavibot/trunk/mavibot/src/main/java/org/apache/directory/mavibot/btree/BTree.java
Date Mon, 12 Aug 2013 07:05:43 GMT
Author: kayyagari
Date: Mon Aug 12 07:05:42 2013
New Revision: 1513055

URL: http://svn.apache.org/r1513055
Log:
o write to journal after completing the operation (MAVIBOT-5)
o truncate the journal after applying and after flushing the data to disk

Modified:
    directory/mavibot/trunk/mavibot/src/main/java/org/apache/directory/mavibot/btree/BTree.java

Modified: directory/mavibot/trunk/mavibot/src/main/java/org/apache/directory/mavibot/btree/BTree.java
URL: http://svn.apache.org/viewvc/directory/mavibot/trunk/mavibot/src/main/java/org/apache/directory/mavibot/btree/BTree.java?rev=1513055&r1=1513054&r2=1513055&view=diff
==============================================================================
--- directory/mavibot/trunk/mavibot/src/main/java/org/apache/directory/mavibot/btree/BTree.java	(original)
+++ directory/mavibot/trunk/mavibot/src/main/java/org/apache/directory/mavibot/btree/BTree.java	Mon Aug 12 07:05:42 2013
@@ -126,11 +126,9 @@ public class BTree<K, V>
     /** The read transaction timeout */
     private long readTimeOut = DEFAULT_READ_TIMEOUT;
 
-    /** The queue containing all the modifications applied on the bTree */
-    private BlockingQueue<Modification<K, V>> modificationsQueue;
-
     private File envDir;
 
+    private FileChannel journalChannel = null;
 
     /**
      * Create a thread that is responsible of cleaning the transactions when
@@ -205,109 +203,6 @@ public class BTree<K, V>
 
 
     /**
-     * Create a thread that is responsible of writing the modifications in a journal.
-     * The journal will contain all the modifications in the order they have been applied
-     * to the BTree. We will store Insertions and Deletions. Those operations are injected
-     * into a queue, which is read by the thread.
-     */
-    private void createJournalManager()
-    {
-        Runnable journalTask = new Runnable()
-        {
-            private boolean flushModification( FileChannel channel, Modification<K, V> modification )
-                throws IOException
-            {
-                if ( modification instanceof Addition )
-                {
-                    byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
-                    ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
-                    bb.put( Modification.ADDITION );
-                    bb.put( keyBuffer );
-                    bb.flip();
-
-                    channel.write( bb );
-
-                    byte[] valueBuffer = valueSerializer.serialize( modification.getValue() );
-                    bb = ByteBuffer.allocateDirect( valueBuffer.length );
-                    bb.put( valueBuffer );
-                    bb.flip();
-
-                    channel.write( bb );
-                }
-                else if ( modification instanceof Deletion )
-                {
-                    byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
-                    ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
-                    bb.put( Modification.DELETION );
-                    bb.put( keyBuffer );
-                    bb.flip();
-
-                    channel.write( bb );
-                }
-                else
-                // This is the poison pill, just exit
-                {
-                    return false;
-                }
-
-                // Flush to the disk for real
-                channel.force( true );
-
-                return true;
-            }
-
-
-            public void run()
-            {
-                Modification<K, V> modification = null;
-                FileOutputStream stream;
-                FileChannel channel = null;
-
-                try
-                {
-                    stream = new FileOutputStream( journal );
-                    channel = stream.getChannel();
-
-                    while ( !Thread.currentThread().isInterrupted() )
-                    {
-                        modification = modificationsQueue.take();
-
-                        boolean stop = flushModification( channel, modification );
-
-                        if ( stop )
-                        {
-                            break;
-                        }
-                    }
-                }
-                catch ( InterruptedException ie )
-                {
-                    //System.out.println( "Interrupted" );
-                    while ( ( modification = modificationsQueue.peek() ) != null );
-
-                    try
-                    {
-                        flushModification( channel, modification );
-                    }
-                    catch ( IOException ioe )
-                    {
-                        // There is little we can do here...
-                    }
-                }
-                catch ( Exception e )
-                {
-                    throw new RuntimeException( e );
-                }
-            }
-        };
-
-        journalManagerThread = new Thread( journalTask );
-        journalManagerThread.setDaemon( true );
-        journalManagerThread.start();
-    }
-
-
-    /**
      * Creates a new BTree, with no initialization. 
      */
     public BTree()
@@ -521,8 +416,6 @@ public class BTree<K, V>
         // Create the queue containing the modifications, if it's not a in-memory btree
         if ( type == BTreeTypeEnum.PERSISTENT )
         {
-            modificationsQueue = new LinkedBlockingDeque<Modification<K, V>>();
-
             if ( file.length() > 0 )
             {
                 // We have some existing file, load it 
@@ -531,15 +424,15 @@ public class BTree<K, V>
 
             withJournal = true;
 
+            FileOutputStream stream = new FileOutputStream( journal );
+            journalChannel = stream.getChannel();
+
             // If the journal is not empty, we have to read it
             // and to apply all the modifications to the current file
             if ( journal.length() > 0 )
             {
                 applyJournal();
             }
-
-            // Initialize the Journal manager thread if it's not a in-memory btree
-            createJournalManager();
         }
         else if ( type == null )
         {
@@ -563,15 +456,12 @@ public class BTree<K, V>
 
         if ( type == BTreeTypeEnum.PERSISTENT )
         {
-            // Stop the journal manager thread, by injecting a poison pill into
-            // the queue this thread is using, so that all the epnding data
-            // will be written before it shuts down
-            modificationsQueue.add( new PoisonPill<K, V>() );
-
             // Flush the data
             flush();
+            journalChannel.close();
         }
 
+        
         rootPage = null;
     }
 
@@ -895,10 +785,10 @@ public class BTree<K, V>
                 tuple = removeResult.getRemovedElement();
             }
 
-            if ( type == BTreeTypeEnum.PERSISTENT )
+            if ( withJournal )
             {
                 // Inject the modification into the modification queue
-                modificationsQueue.add( new Deletion<K, V>( key ) );
+                writeToJournal( new Deletion<K, V>( key ) );
             }
 
             // Decrease the number of elements in the current tree if the deletion is successful
@@ -1262,9 +1152,9 @@ public class BTree<K, V>
         }
 
         // Inject the modification into the modification queue
-        if ( type == BTreeTypeEnum.PERSISTENT )
+        if ( withJournal )
         {
-            modificationsQueue.add( new Addition<K, V>( key, value ) );
+            writeToJournal( new Addition<K, V>( key, value ) );
         }
 
         // Increase the number of element in the current tree if the insertion is successful
@@ -1533,9 +1423,9 @@ public class BTree<K, V>
         }
         catch ( EOFException eofe )
         {
-            // Done reading the journal. Delete it and recreate a new one
-            journal.delete();
-            journal.createNewFile();
+            eofe.printStackTrace();
+            // Done reading the journal. truncate it
+            journalChannel.truncate( 0 );
         }
     }
 
@@ -1631,12 +1521,7 @@ public class BTree<K, V>
         {
             // Then flush the file
             flush( file );
-
-            // And empty the journal
-            FileOutputStream stream = new FileOutputStream( journal );
-            FileChannel channel = stream.getChannel();
-            channel.position( 0 );
-            channel.force( true );
+            journalChannel.truncate( 0 );
         }
     }
 
@@ -1865,7 +1750,43 @@ public class BTree<K, V>
         btreeHeader.setAllowDuplicates( allowDuplicates );
     }
 
+    
+    private void writeToJournal( Modification<K, V> modification )
+        throws IOException
+    {
+        if ( modification instanceof Addition )
+        {
+            byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
+            ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
+            bb.put( Modification.ADDITION );
+            bb.put( keyBuffer );
+            bb.flip();
+
+            journalChannel.write( bb );
+
+            byte[] valueBuffer = valueSerializer.serialize( modification.getValue() );
+            bb = ByteBuffer.allocateDirect( valueBuffer.length );
+            bb.put( valueBuffer );
+            bb.flip();
+
+            journalChannel.write( bb );
+        }
+        else if ( modification instanceof Deletion )
+        {
+            byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
+            ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
+            bb.put( Modification.DELETION );
+            bb.put( keyBuffer );
+            bb.flip();
+
+            journalChannel.write( bb );
+        }
+
+        // Flush to the disk for real
+        journalChannel.force( true );
+    }
 
+    
     /**
      * @see Object#toString()
      */



Mime
View raw message