directory-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gokt...@apache.org
Subject svn commit: r1373629 - /directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
Date Wed, 15 Aug 2012 20:59:28 GMT
Author: gokturk
Date: Wed Aug 15 20:59:27 2012
New Revision: 1373629

URL: http://svn.apache.org/viewvc?rev=1373629&view=rev
Log:
* Added shutdown parameter to flushMethod to set the checkpoint to latest committed LSN regardless
of the flush count when server is being shutdown.

Modified:
    directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java

Modified: directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
URL: http://svn.apache.org/viewvc/directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java?rev=1373629&r1=1373628&r2=1373629&view=diff
==============================================================================
--- directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
(original)
+++ directory/apacheds/branches/apacheds-txns/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
Wed Aug 15 20:59:27 2012
@@ -64,7 +64,7 @@ class DefaultTxnManager implements TxnMa
 {
     /** wal log manager */
     private TxnLogManager txnLogManager;
-    
+
     /** Write ahead log */
     private Log wal;
 
@@ -100,10 +100,10 @@ class DefaultTxnManager implements TxnMa
 
     /** Number of flushed txns */
     private int numFlushedTxns;
-    
+
     /** Number of flushed */
     private int numFlushes;
-    
+
     /** Take a checkpoint every 1000 flushes ~100 secs */
     private final static int DEFAULT_FLUSH_ROUNDS = 1000;
 
@@ -126,16 +126,16 @@ class DefaultTxnManager implements TxnMa
 
     /** Logical data version number */
     private long logicalDataVersion;
-    
+
     /** Initial scan point into the logs */
     private LogAnchor initialScanPoint;
-    
+
     /** Initial set of committed txns */
-    private HashSet<Long> txnsToRecover = new HashSet<Long>(); 
-    
+    private HashSet<Long> txnsToRecover = new HashSet<Long>();
+
     /** last flushed log anchor */
     private LogAnchor lastFlushedLogAnchor;
-    
+
     /** Whether to avoid flushing */
     private boolean doNotFlush = false;
 
@@ -165,8 +165,7 @@ class DefaultTxnManager implements TxnMa
         this.txnLogManager = txnLogManager;
         wal = txnLogManager.getWAL();
         flushInterval = DEFAULT_FLUSH_INTERVAL;
-        
-        
+
         committedQueue.clear();
         latestFlushedTxnLSN.set( LogAnchor.UNKNOWN_LSN );
         txnsToRecover.clear();
@@ -175,57 +174,50 @@ class DefaultTxnManager implements TxnMa
 
         initialScanPoint = wal.getCheckPoint();
         //System.out.println("checkpoint " + initialScanPoint);
-        
+
         lastFlushedLogAnchor.resetLogAnchor( initialScanPoint );
-        
+
         dummyTxn.commitTxn( initialScanPoint.getLogLSN() );
         latestCommittedTxn.set( dummyTxn );
         latestVerifiedTxn.set( dummyTxn );
         committedQueue.offer( dummyTxn );
-        
+
         getTxnsToReover();
 
         if ( syncer == null )
         {
-        	syncer = new LogSyncer();
-        	syncer.setDaemon( true );
-        	syncer.start();
+            syncer = new LogSyncer();
+            syncer.setDaemon( true );
+            syncer.start();
         }
     }
 
 
     public void shutdown()
     {
-    	//System.out.println("in shutdown");
+        //System.out.println("in shutdown");
         syncer.interrupt();
 
         try
         {
-        	syncer.join();
+            syncer.join();
         }
         catch ( InterruptedException e )
         {
-        	//Ignore
+            //Ignore
         }
-    	
 
         // Do a best effort last flush
         flushLock.lock();
 
         try
         {
-        	ReadWriteTxn latestCommitted = latestCommittedTxn.get();
-            
             if ( !doNotFlush )
             {
-                flushTxns();
+                flushTxns( true );
             }
-            
+
             advanceCheckPoint( lastFlushedLogAnchor );
-            
-          //  System.out.println("latest committed txn " + latestCommitted.getCommitTime()
+ 
-         //       " last flushed log anchor " + 
-          //      lastFlushedLogAnchor );
         }
         catch ( Exception e )
         {
@@ -235,7 +227,7 @@ class DefaultTxnManager implements TxnMa
         {
             flushLock.unlock();
         }
-        
+
         syncer = null;
     }
 
@@ -452,7 +444,7 @@ class DefaultTxnManager implements TxnMa
 
         try
         {
-            flushTxns();
+            flushTxns( false );
         }
         catch ( Exception e )
         {
@@ -474,7 +466,7 @@ class DefaultTxnManager implements TxnMa
     {
         Transaction curTxn = getCurTxn();
 
-        if (curTxn == null)
+        if ( curTxn == null )
         {
             return;
         }
@@ -491,10 +483,10 @@ class DefaultTxnManager implements TxnMa
         if ( !curTxn.isOptimisticLockHeld() )
         {
             throw new IllegalStateException( "Unexpected txn state when starting logical
data change txn: " +
-                " txn is not holding optimistic lock:"+
+                " txn is not holding optimistic lock:" +
                 curTxn );
         }
-        
+
         // If lock is already held exclusively by the txn, then return
         if ( optimisticLock.isWriteLockedByCurrentThread() )
         {
@@ -510,16 +502,16 @@ class DefaultTxnManager implements TxnMa
         // If somebody raced and changed logical data, then bail out
         if ( getLogicalDataVersion() != txnLogicalDataVersion )
         {
-            
+
             TxnConflictException e = new TxnConflictException();
-            throw new LdapException(e);
+            throw new LdapException( e );
         }
 
         // Finally bump of logical data version number
         bumpLogicalDataVersion();
     }
-    
-    
+
+
     /**
      * {@inheritDoc}
      */
@@ -528,21 +520,22 @@ class DefaultTxnManager implements TxnMa
         // Should only be called for read only txns
         Transaction curTxn = getCurTxn();
 
-        if (curTxn == null || !( curTxn instanceof ReadOnlyTxn ))
+        if ( curTxn == null || !( curTxn instanceof ReadOnlyTxn ) )
         {
             throw new IllegalStateException( "Unexpected txn state when ending logical data
read:" + curTxn );
         }
-        
+
         if ( !curTxn.isOptimisticLockHeld() )
         {
-            throw new IllegalStateException( "Unexpected txn state when ending logical data
read, optimistic lock not held:" + 
-                curTxn );
+            throw new IllegalStateException(
+                "Unexpected txn state when ending logical data read, optimistic lock not
held:" +
+                    curTxn );
         }
-        
+
         releaseOptimisticLock();
     }
-    
-    
+
+
     /**
      * {@inheritDoc}
      */
@@ -550,11 +543,11 @@ class DefaultTxnManager implements TxnMa
     {
         Transaction curTxn = getCurTxn();
 
-        if (curTxn == null || !( curTxn instanceof ReadWriteTxn ))
+        if ( curTxn == null || !( curTxn instanceof ReadWriteTxn ) )
         {
             throw new IllegalStateException( "Unexpected txn state when preparing for logical
data reinit:" + curTxn );
         }
-        
+
         if ( optimisticLock.isWriteLockedByCurrentThread() )
         {
             return true;
@@ -565,14 +558,14 @@ class DefaultTxnManager implements TxnMa
         }
     }
 
-    
+
     /**
      * If the thread holds optimistic lock, release it
      */
     private void releaseOptimisticLock()
     {
         Transaction curTxn = getCurTxn();
-        
+
         if ( curTxn.isOptimisticLockHeld() )
         {
             if ( optimisticLock.isWriteLockedByCurrentThread() )
@@ -583,11 +576,12 @@ class DefaultTxnManager implements TxnMa
             {
                 optimisticLock.readLock().unlock();
             }
-            
+
             curTxn.clearOptimisticLockHeld();
         }
     }
 
+
     /**
      * Begins a read only txn. A read only txn does not put any log edits
      * to the txn log.Its start time is the latest committed txn's commit time. 
@@ -600,7 +594,7 @@ class DefaultTxnManager implements TxnMa
         optimisticLock.readLock().lock();
         txn.setOptimisticLockHeld();
         txn.setLogicalDataVersion( logicalDataVersion );
-        
+
         /*
          * Set the start time as the latest committed txn's commit time. We need to make
sure that
          * any change after our start time is not flushed to the partitions. Say we have
txn1 as the
@@ -661,7 +655,7 @@ class DefaultTxnManager implements TxnMa
         {
             optimisticLock.writeLock().lock();
         }
-        
+
         txn.setOptimisticLockHeld();
         txn.setLogicalDataVersion( logicalDataVersion );
 
@@ -695,9 +689,9 @@ class DefaultTxnManager implements TxnMa
         catch ( Exception e )
         {
             // Release optimistic lock if held
-            setCurTxn(txn);
+            setCurTxn( txn );
             releaseOptimisticLock();
-            
+
         }
         finally
         {
@@ -910,12 +904,14 @@ class DefaultTxnManager implements TxnMa
      *  only if flushing it will not cause a pending txn to see changes beyond its
      *  start time.
      *  throws Exception thrown if anything goes wrong during flush.
+     *  
+     *  @param shutdown is TxnManager is shutting down
      *
      */
-    private void flushTxns() throws Exception
+    private void flushTxns( boolean shutdown ) throws Exception
     {
-    	UserLogRecord lastLogRecord = null;
-    	
+        UserLogRecord lastLogRecord = null;
+
         // If flushing failed already, dont do anything anymore
         if ( flushFailed )
         {
@@ -943,7 +939,7 @@ class DefaultTxnManager implements TxnMa
                 txnToFlush.flushLogEdits( flushedToPartitions );
 
                 latestFlushedTxnLSN.set( txnToFlush.getCommitTime() );
-                
+
                 lastLogRecord = txnToFlush.getUserLogRecord();
             }
 
@@ -985,138 +981,143 @@ class DefaultTxnManager implements TxnMa
         {
             partitionIt.next().sync();
         }
-        
+
         numFlushes++;
-        
+
         if ( lastLogRecord != null )
         {
-        	lastFlushedLogAnchor.resetLogAnchor(lastLogRecord.getLogAnchor());
+            lastFlushedLogAnchor.resetLogAnchor( lastLogRecord.getLogAnchor() );
+        }
+
+        if ( shutdown )
+        {
+            advanceCheckPoint( lastFlushedLogAnchor );
         }
-        
-        if (numFlushes % DEFAULT_FLUSH_ROUNDS == 0 )
+        else if ( numFlushes % DEFAULT_FLUSH_ROUNDS == 0 )
         {
-        	advanceCheckPoint( lastFlushedLogAnchor );
+            advanceCheckPoint( lastFlushedLogAnchor );
         }
 
     }
-    
-    
+
+
     private void advanceCheckPoint( LogAnchor checkPoint )
     {
-    	wal.advanceCheckPoint(checkPoint);
+        wal.advanceCheckPoint( checkPoint );
     }
-    
-    
+
+
     private void getTxnsToReover()
     {
-    	LogScanner logScanner = wal.beginScan( initialScanPoint );
-    	UserLogRecord logRecord = new UserLogRecord();
-    	byte userRecord[]; 
-    	
-    	//System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
-    	
-    	try
-    	{
-	    	while ( logScanner.getNextRecord( logRecord ) )
-	        {
-	            userRecord = logRecord.getDataBuffer();
-	            ObjectInputStream in = buildStream( userRecord );
-	            
-	            EditType editType = EditType.values()[in.read()];
-	           
-	            
-	            if (editType == EditType.TXN_MARKER)
-	            {
-	            	TxnStateChange stateChange = new TxnStateChange();
-	            	stateChange.readExternal(in);
-	            	
-	            	if ( stateChange.getTxnState() == ChangeState.TXN_COMMIT )
-	            	{
-	            		//System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe
recovered txns");
-	            		txnsToRecover.add( new Long( stateChange.getTxnID() ) );
-	            	}
-	            }
-	        }
-    	}
-    	catch ( Exception e )
-    	{
-    		e.printStackTrace();
-    		// Ignore
-    	}
+        LogScanner logScanner = wal.beginScan( initialScanPoint );
+        UserLogRecord logRecord = new UserLogRecord();
+        byte userRecord[];
+
+        //System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
+
+        try
+        {
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                ObjectInputStream in = buildStream( userRecord );
+
+                EditType editType = EditType.values()[in.read()];
+
+                if ( editType == EditType.TXN_MARKER )
+                {
+                    TxnStateChange stateChange = new TxnStateChange();
+                    stateChange.readExternal( in );
+
+                    if ( stateChange.getTxnState() == ChangeState.TXN_COMMIT )
+                    {
+                        //System.out.println("Adding txn " + stateChange.getTxnID() + " to
the tobe recovered txns");
+                        txnsToRecover.add( new Long( stateChange.getTxnID() ) );
+                    }
+                }
+            }
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();
+            // Ignore
+        }
     }
-    
-    
+
+
     // Walk over the txn log records from the latest checkpoint and apply the
     // log records to the partition
     public void recoverPartition( Partition partition )
     {
-    	Dn partitionSuffix = partition.getSuffixDn();
-    	
-    	//System.out.println("Recover partition " + partitionSuffix);
-    	
-    	LogScanner logScanner = wal.beginScan( initialScanPoint );
-    	UserLogRecord logRecord = new UserLogRecord();
-    	byte userRecord[]; 
-    	
-    	boolean recoveredChanges = false;
-    	
-    	try
-    	{
-	    	while ( logScanner.getNextRecord( logRecord ) )
-	        {
-	            userRecord = logRecord.getDataBuffer();
-	            ObjectInputStream in = buildStream( userRecord );
-	            
-	            EditType editType = EditType.values()[in.read()];
-	            
-	            if (editType == EditType.DATA_CHANGE)
-	            {
-	            	DataChangeContainer dataChangeContainer = new DataChangeContainer();
-	            	dataChangeContainer.readExternal(in);
-	            	
-	            	//System.out.println("Data change container for " + dataChangeContainer.getPartitionDn()
+ 
-	            		//	" txn id " + dataChangeContainer.getTxnID() );
-	            	
-	            	// If this change is for the partition we are tyring to recover 
-	                // and belongs to a txn that committed, then 
-	            	Long txnID = new Long( dataChangeContainer.getTxnID() );
-	         
-	            	if ( txnsToRecover.contains( txnID ) )
-	                {
-	            		if(	dataChangeContainer.getPartitionDn().equals( partitionSuffix ) )
-	            		{
-	            			//System.out.println("Apply change to partition " + partitionSuffix);
-	            			dataChangeContainer.setPartition( partition );
-	            			dataChangeContainer.apply( true );
-	            			recoveredChanges = true;
-	            		}
-	                }
-	            }
-	        }
-	    	
-	    	if ( recoveredChanges && partition instanceof SchemaPartition )
-	    	{
-	    		( (SchemaPartition) partition ).getSchemaManager().reloadAllEnabled();
-	    	}
-    	}
-    	catch ( Exception e )
-    	{
-    		e.printStackTrace();
-    		// Ignore for now
-    	}
+        Dn partitionSuffix = partition.getSuffixDn();
+
+        //System.out.println("Recover partition " + partitionSuffix);
+
+        LogScanner logScanner = wal.beginScan( initialScanPoint );
+        UserLogRecord logRecord = new UserLogRecord();
+        byte userRecord[];
+
+        boolean recoveredChanges = false;
+
+        try
+        {
+            while ( logScanner.getNextRecord( logRecord ) )
+            {
+                userRecord = logRecord.getDataBuffer();
+                ObjectInputStream in = buildStream( userRecord );
+
+                EditType editType = EditType.values()[in.read()];
+
+                if ( editType == EditType.DATA_CHANGE )
+                {
+                    DataChangeContainer dataChangeContainer = new DataChangeContainer();
+                    dataChangeContainer.readExternal( in );
+
+                    //System.out.println("Data change container for " + dataChangeContainer.getPartitionDn()
+ 
+                    //	" txn id " + dataChangeContainer.getTxnID() );
+
+                    // If this change is for the partition we are tyring to recover 
+                    // and belongs to a txn that committed, then 
+                    Long txnID = new Long( dataChangeContainer.getTxnID() );
+
+                    if ( txnsToRecover.contains( txnID ) )
+                    {
+                        if ( dataChangeContainer.getPartitionDn().equals( partitionSuffix
) )
+                        {
+                            //System.out.println("Apply change to partition " + partitionSuffix);
+                            dataChangeContainer.setPartition( partition );
+                            dataChangeContainer.apply( true );
+                            recoveredChanges = true;
+                        }
+                    }
+                }
+            }
+
+            if ( recoveredChanges && partition instanceof SchemaPartition )
+            {
+                ( ( SchemaPartition ) partition ).getSchemaManager().reloadAllEnabled();
+            }
+        }
+        catch ( Exception e )
+        {
+            e.printStackTrace();
+            // Ignore for now
+        }
     }
-    
-    
+
+
     public void setDoNotFlush()
     {
         doNotFlush = true;
     }
-    
+
+
     public void unsetDoNotFlush()
     {
         doNotFlush = false;
     }
-    
+
+
     private ObjectInputStream buildStream( byte[] buffer ) throws IOException
     {
         ObjectInputStream oIn = null;
@@ -1135,7 +1136,7 @@ class DefaultTxnManager implements TxnMa
     }
 
     class LogSyncer extends Thread
-    {	
+    {
         @Override
         public void run()
         {
@@ -1144,12 +1145,12 @@ class DefaultTxnManager implements TxnMa
             try
             {
                 while ( true )
-                {	
+                {
                     flushCondition.await( flushInterval, TimeUnit.MILLISECONDS );
-                    
+
                     if ( !doNotFlush )
                     {
-                        flushTxns();
+                        flushTxns( false );
                     }
                 }
             }



Mime
View raw message