activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r759304 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
Date Fri, 27 Mar 2009 18:55:00 GMT
Author: chirino
Date: Fri Mar 27 18:55:00 2009
New Revision: 759304

URL: http://svn.apache.org/viewvc?rev=759304&view=rev
Log:
Implemented the unit of work feature in the kaha db store.
Implemented Flushing and also the onFlush callbacks.


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=759304&r1=759303&r2=759304&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri Mar 27 18:55:00 2009
@@ -24,6 +24,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -43,6 +45,7 @@
 import org.apache.activemq.broker.store.kahadb.Data.Type.TypeCreatable;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 import org.apache.activemq.protobuf.MessageBuffer;
 import org.apache.activemq.protobuf.PBMessage;
 import org.apache.commons.logging.Log;
@@ -59,27 +62,36 @@
 
 public class KahaDBStore implements Store {
 
+    private static final int BEGIN_UNIT_OF_WORK = -1;
+    private static final int END_UNIT_OF_WORK = -2;
+    private static final int FLUSH = -3;
+
     private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+    private static final ByteSequence BEGIN_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { BEGIN_UNIT_OF_WORK }); // must carry the BEGIN marker byte that recover() tests for
+    private static final ByteSequence END_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { END_UNIT_OF_WORK });
+    private static final ByteSequence FLUSH_DATA = new ByteSequence(new byte[] { FLUSH });
+
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
-    
+
     protected PageFile pageFile;
     protected Journal journal;
-    
+
     protected RootEntity rootEntity = new RootEntity();
 
     protected boolean failIfDatabaseIsLocked;
     protected boolean deleteAllMessages;
     protected File directory;
     protected Thread checkpointThread;
-    protected boolean enableJournalDiskSyncs=true;
-    long checkpointInterval = 5*1000;
-    long cleanupInterval = 30*1000;
+    protected boolean enableJournalDiskSyncs = true;
+    long checkpointInterval = 5 * 1000;
+    long cleanupInterval = 30 * 1000;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     boolean enableIndexWriteAsync = false;
-    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
-    
+    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
@@ -89,12 +101,18 @@
     protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
     private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
 
-    ///////////////////////////////////////////////////////////////////
+    private static class UoWOperation {
+        public TypeCreatable bean;
+        public Location location;
+        public ByteSequence data;
+    }
+
+    // /////////////////////////////////////////////////////////////////
     // Lifecylce methods
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     public void start() throws Exception {
         if (started.compareAndSet(false, true)) {
-        	load();
+            load();
         }
     }
 
@@ -104,10 +122,10 @@
         }
     }
 
-	private void loadPageFile() throws IOException {
-	    indexLock.writeLock().lock();
-		try {
-		    final PageFile pageFile = getPageFile();
+    private void loadPageFile() throws IOException {
+        indexLock.writeLock().lock();
+        try {
+            final PageFile pageFile = getPageFile();
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -126,114 +144,113 @@
         } finally {
             indexLock.writeLock().unlock();
         }
-	}
-	
-	
-    
-	/**
-	 * @throws IOException
-	 */
-	public void open() throws IOException {
-		if( opened.compareAndSet(false, true) ) {
-		    if( directory ==null ) {
-		        throw new IllegalArgumentException("The directory property must be set.");
-		    }
-		    
+    }
+
+    /**
+     * @throws IOException
+     */
+    public void open() throws IOException {
+        if (opened.compareAndSet(false, true)) {
+            if (directory == null) {
+                throw new IllegalArgumentException("The directory property must be set.");
+            }
+
             File lockFileName = new File(directory, "lock");
             lockFile = new LockFile(lockFileName, true);
-	        if (failIfDatabaseIsLocked) {
-	            lockFile.lock();
-	        } else {
-	            while (true) {
-	                try {
-	                    lockFile.lock();
-	                    break;
-	                } catch (IOException e) {
-	                    LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.");
-	                    try {
-	                        Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
-	                    } catch (InterruptedException e1) {
-	                    }
-	                }
-	            }
-	        }
-	        
+            if (failIfDatabaseIsLocked) {
+                lockFile.lock();
+            } else {
+                while (true) {
+                    try {
+                        lockFile.lock();
+                        break;
+                    } catch (IOException e) {
+                        LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.");
+                        try {
+                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
+                        } catch (InterruptedException e1) {
+                        }
+                    }
+                }
+            }
+
             getJournal().start();
-            
-	        loadPageFile();
-	        
-	        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-	            public void run() {
-	                try {
-	                    long lastCleanup = System.currentTimeMillis();
-	                    long lastCheckpoint = System.currentTimeMillis();
-	                    
-	                    // Sleep for a short time so we can periodically check 
-	                    // to see if we need to exit this thread.
-	                    long sleepTime = Math.min(checkpointInterval, 500);
-	                    while (opened.get()) {
-	                        Thread.sleep(sleepTime);
-	                        long now = System.currentTimeMillis();
-	                        if( now - lastCleanup >= cleanupInterval ) {
-	                            checkpointCleanup(true);
-	                            lastCleanup = now;
-	                            lastCheckpoint = now;
-	                        } else if( now - lastCheckpoint >= checkpointInterval ) {
-	                            checkpointCleanup(false);
-	                            lastCheckpoint = now;
-	                        }
-	                    }
-	                } catch (InterruptedException e) {
-	                    // Looks like someone really wants us to exit this thread...
-	                }
-	            }
-	        };
-	        checkpointThread.start();
+
+            loadPageFile();
+
+            checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+                public void run() {
+                    try {
+                        long lastCleanup = System.currentTimeMillis();
+                        long lastCheckpoint = System.currentTimeMillis();
+
+                        // Sleep for a short time so we can periodically check
+                        // to see if we need to exit this thread.
+                        long sleepTime = Math.min(checkpointInterval, 500);
+                        while (opened.get()) {
+                            Thread.sleep(sleepTime);
+                            long now = System.currentTimeMillis();
+                            if (now - lastCleanup >= cleanupInterval) {
+                                checkpointCleanup(true);
+                                lastCleanup = now;
+                                lastCheckpoint = now;
+                            } else if (now - lastCheckpoint >= checkpointInterval) {
+                                checkpointCleanup(false);
+                                lastCheckpoint = now;
+                            }
+                        }
+                    } catch (InterruptedException e) {
+                        // Looks like someone really wants us to exit this
+                        // thread...
+                    }
+                }
+            };
+            checkpointThread.start();
             recover();
-		}
-	}
-	
+        }
+    }
+
     public void load() throws IOException {
-    	indexLock.writeLock().lock();
+        indexLock.writeLock().lock();
         try {
-	    	open();
-	    	
-	        if (deleteAllMessages) {
-	            journal.delete();
-	
-	            pageFile.unload();
-	            pageFile.delete();
-	            rootEntity = new RootEntity();
-	            
-	            LOG.info("Persistence store purged.");
-	            deleteAllMessages = false;
-	            
-	            loadPageFile();
-	        }
-	        store( new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
+            open();
+
+            if (deleteAllMessages) {
+                journal.delete();
+
+                pageFile.unload();
+                pageFile.delete();
+                rootEntity = new RootEntity();
+
+                LOG.info("Persistence store purged.");
+                deleteAllMessages = false;
+
+                loadPageFile();
+            }
+            store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
         } finally {
             indexLock.writeLock().unlock();
         }
 
     }
 
-	public void close() throws IOException, InterruptedException {
-		if( opened.compareAndSet(true, false)) {
-		    
-	        indexLock.writeLock().lock();
-	        try {
-	            pageFile.unload();
-	            rootEntity = new RootEntity();
-	        } finally {
-	            indexLock.writeLock().unlock();
-	        }
-	        journal.close();
-	        checkpointThread.join();
-	        lockFile.unlock();
-	        lockFile=null;
-		}
-	}
-	
+    public void close() throws IOException, InterruptedException {
+        if (opened.compareAndSet(true, false)) {
+
+            indexLock.writeLock().lock();
+            try {
+                pageFile.unload();
+                rootEntity = new RootEntity();
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+            journal.close();
+            checkpointThread.join();
+            lockFile.unlock();
+            lockFile = null;
+        }
+    }
+
     public void unload() throws IOException, InterruptedException {
         if (pageFile.isLoaded()) {
             indexLock.writeLock().lock();
@@ -251,9 +268,9 @@
         }
     }
 
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     // Recovery methods
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
 
     /**
      * Move all the messages that were in the journal into long term storage. We
@@ -267,30 +284,59 @@
     private void recover() throws IllegalStateException, IOException {
         indexLock.writeLock().lock();
         try {
-	        long start = System.currentTimeMillis();
-	        
-	        Location recoveryPosition = getRecoveryPosition();
-	        if( recoveryPosition!=null ) {
-		        int redoCounter = 0;
-		        while (recoveryPosition != null) {
-		            final TypeCreatable message = load(recoveryPosition);
-		            final Location location = recoveryPosition;
-		            rootEntity.setLastUpdate(recoveryPosition);
-		            
-	                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-	                    public void execute(Transaction tx) throws IOException {
-	                        updateIndex(tx, message.toType(), (MessageBuffer)message, location);
-	                    }
-	                });		            
-		            
-		            redoCounter++;
-		            recoveryPosition = journal.getNextLocation(recoveryPosition);
-		        }
-		        long end = System.currentTimeMillis();
-	        	LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
-	        }
-	     
-	        // We may have to undo some index updates.
+            long start = System.currentTimeMillis();
+
+            ArrayList<UoWOperation> uow = null;
+            Location recoveryPosition = getRecoveryPosition();
+            if (recoveryPosition != null) {
+                int redoCounter = 0;
+                while (recoveryPosition != null) {
+
+                    ByteSequence data = journal.read(recoveryPosition);
+                    if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
+                        uow = new ArrayList<UoWOperation>();
+                    } else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
+                        if (uow != null) {
+                            final ArrayList<UoWOperation> list = uow;
+                            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                                public void execute(Transaction tx) throws IOException {
+                                    for (UoWOperation op : list) {
+                                        updateIndex(tx, op.bean.toType(), (MessageBuffer) op.bean, op.location);
+                                        rootEntity.setLastUpdate(op.location);
+                                    }
+                                }
+                            });
+                            redoCounter += uow.size();
+                            uow = null;
+                        }
+                    } else if (data.length == 1 && data.data[0] == FLUSH) {
+                    } else {
+                        final TypeCreatable message = load(recoveryPosition);
+                        final Location location = recoveryPosition;
+                        if (uow != null) {
+                            UoWOperation op = new UoWOperation();
+                            op.bean = message;
+                            op.data = data;
+                            op.location = recoveryPosition;
+                            uow.add(op);
+                        } else {
+                            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                                public void execute(Transaction tx) throws IOException {
+                                    updateIndex(tx, message.toType(), (MessageBuffer) message, location);
+                                    rootEntity.setLastUpdate(location);
+                                }
+                            });
+                            redoCounter++;
+                        }
+                    }
+
+                    recoveryPosition = journal.getNextLocation(recoveryPosition);
+                }
+                long end = System.currentTimeMillis();
+                LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+            }
+
+            // We may have to undo some index updates.
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     recoverIndex(tx);
@@ -300,28 +346,28 @@
             indexLock.writeLock().unlock();
         }
     }
-    
+
     public void incrementalRecover() throws IOException {
         indexLock.writeLock().lock();
         try {
-            if( nextRecoveryPosition == null ) {
-                if( lastRecoveryPosition==null ) {
+            if (nextRecoveryPosition == null) {
+                if (lastRecoveryPosition == null) {
                     nextRecoveryPosition = getRecoveryPosition();
                 } else {
                     nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
-                }           
+                }
             }
             while (nextRecoveryPosition != null) {
                 lastRecoveryPosition = nextRecoveryPosition;
                 rootEntity.setLastUpdate(lastRecoveryPosition);
                 final TypeCreatable message = load(lastRecoveryPosition);
                 final Location location = lastRecoveryPosition;
-                
+
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        updateIndex(tx, message.toType(), (MessageBuffer)message, location);
+                        updateIndex(tx, message.toType(), (MessageBuffer) message, location);
                     }
-                });                 
+                });
 
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
@@ -329,67 +375,72 @@
             indexLock.writeLock().unlock();
         }
     }
-    
-	protected void recoverIndex(Transaction tx) throws IOException {
+
+    protected void recoverIndex(Transaction tx) throws IOException {
         long start = System.currentTimeMillis();
-        // It is possible index updates got applied before the journal updates.. 
-        // in that case we need to removed references to messages that are not in the journal
+        // It is possible index updates got applied before the journal updates..
+        // in that case we need to remove references to messages that are not
+        // in the journal
         final Location lastAppendLocation = journal.getLastAppendLocation();
-        long undoCounter=0;
+        long undoCounter = 0;
 
-// TODO        
-//        // Go through all the destinations to see if they have messages past the lastAppendLocation
-//        for (StoredDestinationState sd : storedDestinations.values()) {
-//        	
-//            final ArrayList<Long> matches = new ArrayList<Long>();
-//            // Find all the Locations that are >= than the last Append Location.
-//            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
-//				@Override
-//				protected void matched(Location key, Long value) {
-//					matches.add(value);
-//				}
-//            });
-//            
-//            
-//            for (Long sequenceId : matches) {
-//                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-//                sd.locationIndex.remove(tx, keys.location);
-//                sd.messageIdIndex.remove(tx, keys.messageId);
-//                undoCounter++;
-//                // TODO: do we need to modify the ack positions for the pub sub case?
-//			}
-//        }
+        // TODO
+        // // Go through all the destinations to see if they have messages past
+        // the lastAppendLocation
+        // for (StoredDestinationState sd : storedDestinations.values()) {
+        //        	
+        // final ArrayList<Long> matches = new ArrayList<Long>();
+        // // Find all the Locations that are >= than the last Append Location.
+        // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
+        // Long>(lastAppendLocation) {
+        // @Override
+        // protected void matched(Location key, Long value) {
+        // matches.add(value);
+        // }
+        // });
+        //            
+        //            
+        // for (Long sequenceId : matches) {
+        // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+        // sd.locationIndex.remove(tx, keys.location);
+        // sd.messageIdIndex.remove(tx, keys.messageId);
+        // undoCounter++;
+        // // TODO: do we need to modify the ack positions for the pub sub case?
+        // }
+        // }
         long end = System.currentTimeMillis();
-        if( undoCounter > 0 ) {
-        	// The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
-        	// should do sync writes to the journal.
-	        LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
+        if (undoCounter > 0) {
+            // The rolledback operations are basically in flight journal writes.
+            // To avoid getting these the end user
+            // should do sync writes to the journal.
+            LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
         }
-	}
-	
+    }
+
     public Location getLastUpdatePosition() throws IOException {
         return rootEntity.getLastUpdate();
     }
-    
-	private Location getRecoveryPosition() throws IOException {
-		
-        if( rootEntity.getLastUpdate()!=null) {
-            // Start replay at the record after the last one recorded in the index file.
+
+    private Location getRecoveryPosition() throws IOException {
+
+        if (rootEntity.getLastUpdate() != null) {
+            // Start replay at the record after the last one recorded in the
+            // index file.
             return journal.getNextLocation(rootEntity.getLastUpdate());
         }
-        
+
         // This loads the first position.
         return journal.getNextLocation(null);
-	}
+    }
 
     protected void checkpointCleanup(final boolean cleanup) {
         try {
-        	long start = System.currentTimeMillis();
+            long start = System.currentTimeMillis();
             indexLock.writeLock().lock();
             try {
-            	if( !opened.get() ) {
-            		return;
-            	}
+                if (!opened.get()) {
+                    return;
+                }
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
                         checkpointUpdate(tx, cleanup);
@@ -398,17 +449,16 @@
             } finally {
                 indexLock.writeLock().unlock();
             }
-        	long end = System.currentTimeMillis();
-        	if( end-start > 100 ) { 
-        		LOG.warn("KahaDB Cleanup took "+(end-start));
-        	}
+            long end = System.currentTimeMillis();
+            if (end - start > 100) {
+                LOG.warn("KahaDB Cleanup took " + (end - start));
+            }
         } catch (IOException e) {
-        	e.printStackTrace();
+            e.printStackTrace();
         }
     }
 
-    
-	public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
+    public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
         indexLock.writeLock().lock();
         try {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -420,8 +470,8 @@
         } finally {
             indexLock.writeLock().unlock();
         }
-	}
-    
+    }
+
     /**
      * @param tx
      * @throws IOException
@@ -429,112 +479,124 @@
     private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
 
         LOG.debug("Checkpoint started.");
-        
+
         rootEntity.setState(OPEN_STATE);
         rootEntity.store(tx);
         pageFile.flush();
 
-        if( cleanup ) {
-        	
-        	final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
-        	
-        	// Don't GC files under replication
-        	if( journalFilesBeingReplicated!=null ) {
-        		gcCandidateSet.removeAll(journalFilesBeingReplicated);
-        	}
-        	
-        	// Don't GC files after the first in progress tx
-        	Location firstTxLocation = rootEntity.getLastUpdate();
-            
-            if( firstTxLocation!=null ) {
-            	while( !gcCandidateSet.isEmpty() ) {
-            		Integer last = gcCandidateSet.last();
-            		if( last >= firstTxLocation.getDataFileId() ) {
-            			gcCandidateSet.remove(last);
-            		} else {
-            			break;
-            		}
-            	}
-            }
-
-//            // Go through all the destinations to see if any of them can remove GC candidates.
-//            for (StoredDestinationState sd : storedDestinations.values()) {
-//            	if( gcCandidateSet.isEmpty() ) {
-//                	break;
-//                }
-//                
-//                // Use a visitor to cut down the number of pages that we load
-//                dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-//                    int last=-1;
-//                    public boolean isInterestedInKeysBetween(Location first, Location second) {
-//                    	if( first==null ) {
-//                    		SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
-//                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-//                    			subset.remove(second.getDataFileId());
-//                    		}
-//							return !subset.isEmpty();
-//                    	} else if( second==null ) {
-//                    		SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
-//                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-//                    			subset.remove(first.getDataFileId());
-//                    		}
-//							return !subset.isEmpty();
-//                    	} else {
-//                    		SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
-//                    		if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
-//                    			subset.remove(first.getDataFileId());
-//                    		}
-//                    		if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
-//                    			subset.remove(second.getDataFileId());
-//                    		}
-//							return !subset.isEmpty();
-//                    	}
-//                    }
-//    
-//                    public void visit(List<Location> keys, List<Long> values) {
-//                    	for (Location l : keys) {
-//                            int fileId = l.getDataFileId();
-//							if( last != fileId ) {
-//                        		gcCandidateSet.remove(fileId);
-//                                last = fileId;
-//                            }
-//						}                        
-//                    }
-//    
-//                });
-//            }
-
-            if( !gcCandidateSet.isEmpty() ) {
-	            LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
-	            journal.removeDataFiles(gcCandidateSet);
+        if (cleanup) {
+
+            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
+
+            // Don't GC files under replication
+            if (journalFilesBeingReplicated != null) {
+                gcCandidateSet.removeAll(journalFilesBeingReplicated);
+            }
+
+            // Don't GC files after the first in progress tx
+            Location firstTxLocation = rootEntity.getLastUpdate();
+
+            if (firstTxLocation != null) {
+                while (!gcCandidateSet.isEmpty()) {
+                    Integer last = gcCandidateSet.last();
+                    if (last >= firstTxLocation.getDataFileId()) {
+                        gcCandidateSet.remove(last);
+                    } else {
+                        break;
+                    }
+                }
+            }
+
+            // // Go through all the destinations to see if any of them can
+            // remove GC candidates.
+            // for (StoredDestinationState sd : storedDestinations.values()) {
+            // if( gcCandidateSet.isEmpty() ) {
+            // break;
+            // }
+            //                
+            // // Use a visitor to cut down the number of pages that we load
+            // dbstate.locationIndex.visit(tx, new BTreeVisitor<Location,
+            // Long>() {
+            // int last=-1;
+            // public boolean isInterestedInKeysBetween(Location first, Location
+            // second) {
+            // if( first==null ) {
+            // SortedSet<Integer> subset =
+            // gcCandidateSet.headSet(second.getDataFileId()+1);
+            // if( !subset.isEmpty() && subset.last() == second.getDataFileId()
+            // ) {
+            // subset.remove(second.getDataFileId());
+            // }
+            // return !subset.isEmpty();
+            // } else if( second==null ) {
+            // SortedSet<Integer> subset =
+            // gcCandidateSet.tailSet(first.getDataFileId());
+            // if( !subset.isEmpty() && subset.first() == first.getDataFileId()
+            // ) {
+            // subset.remove(first.getDataFileId());
+            // }
+            // return !subset.isEmpty();
+            // } else {
+            // SortedSet<Integer> subset =
+            // gcCandidateSet.subSet(first.getDataFileId(),
+            // second.getDataFileId()+1);
+            // if( !subset.isEmpty() && subset.first() == first.getDataFileId()
+            // ) {
+            // subset.remove(first.getDataFileId());
+            // }
+            // if( !subset.isEmpty() && subset.last() == second.getDataFileId()
+            // ) {
+            // subset.remove(second.getDataFileId());
+            // }
+            // return !subset.isEmpty();
+            // }
+            // }
+            //    
+            // public void visit(List<Location> keys, List<Long> values) {
+            // for (Location l : keys) {
+            // int fileId = l.getDataFileId();
+            // if( last != fileId ) {
+            // gcCandidateSet.remove(fileId);
+            // last = fileId;
+            // }
+            // }
+            // }
+            //    
+            // });
+            // }
+
+            if (!gcCandidateSet.isEmpty()) {
+                LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
+                journal.removeDataFiles(gcCandidateSet);
             }
         }
-        
+
         LOG.debug("Checkpoint done.");
     }
-    
+
     public HashSet<Integer> getJournalFilesBeingReplicated() {
-		return journalFilesBeingReplicated;
-	}
-    
-    ///////////////////////////////////////////////////////////////////
+        return journalFilesBeingReplicated;
+    }
+
+    // /////////////////////////////////////////////////////////////////
     // Store interface
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
     long messageSequence;
 
     public Location store(TypeCreatable data) throws IOException {
-        return store(data, false);
+        return store(data, null);
     }
-    
+
     /**
      * All updated are are funneled through this method. The updates a converted
-     * to a PBMessage which is logged to the journal and then the data from
-     * the PBMessage is used to update the index just like it would be done
-     * during a recovery process.
-     * @throws IOException 
+     * to a PBMessage which is logged to the journal and then the data from the
+     * PBMessage is used to update the index just like it would be done during a
+     * recovery process.
+     * 
+     * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    public Location store(final TypeCreatable data, boolean sync) throws IOException {
+    public Location store(final TypeCreatable data, Runnable onFlush) throws IOException {
         final MessageBuffer message = ((PBMessage) data).freeze();
         int size = message.serializedSizeUnframed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -543,11 +605,11 @@
 
         long start = System.currentTimeMillis();
         final Location location;
-        synchronized(journal) {
-            location = journal.write(os.toByteSequence(), sync);
+        synchronized (journal) {
+            location = journal.write(os.toByteSequence(), onFlush);
         }
         long start2 = System.currentTimeMillis();
-        
+
         try {
             indexLock.writeLock().lock();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -561,67 +623,66 @@
         }
 
         long end = System.currentTimeMillis();
-        if( end-start > 100 ) { 
-            LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+        if (end - start > 100) {
+            LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
         }
         return location;
     }
-    
-    
+
     public void store(List<TypeCreatable> batch) throws IOException {
-        store(batch, false);
+        store(batch, null);
     }
-    
+
     // ArrayList<TypeCreatable>
     /**
      * All updates are funneled through this method. The updates are converted
-     * to a PBMessage which is logged to the journal and then the data from
-     * the PBMessage is used to update the index just like it would be done
-     * during a recovery process.
-     * @throws IOException 
+     * to a PBMessage which is logged to the journal and then the data from the
+     * PBMessage is used to update the index just like it would be done during a
+     * recovery process.
+     * 
+     * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    public void store(final List<TypeCreatable> batch, boolean sync) throws IOException {
-        if( batch.isEmpty() ) {
+    public void store(final List<TypeCreatable> batch, Runnable onFlush) throws IOException {
+        if (batch.isEmpty()) {
             return;
         }
-        if( batch.size()==1 ) {
-            store(batch.get(0), sync);
+        if (batch.size() == 1) {
+            store(batch.get(0), onFlush);
             return;
         }
-        
-        
-        ArrayList<ByteSequence> encodedData = new ArrayList<ByteSequence>(batch.size());
-        for (TypeCreatable data : batch) {
-            final MessageBuffer message = ((PBMessage) data).freeze();
+
+        final ArrayList<UoWOperation> uow = new ArrayList<UoWOperation>(batch.size());
+        for (TypeCreatable bean : batch) {
+            final MessageBuffer message = ((PBMessage) bean).freeze();
             int size = message.serializedSizeUnframed();
             DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-            os.writeByte(data.toType().getNumber());
+            os.writeByte(bean.toType().getNumber());
             message.writeUnframed(os);
-            encodedData.add(os.toByteSequence());
+            UoWOperation op = new UoWOperation();
+            op.bean = bean;
+            op.data = os.toByteSequence();
+            uow.add(op);
         }
-        
 
-        final ArrayList<Location> locations = new ArrayList<Location>(batch.size());
         long start = System.currentTimeMillis();
-        synchronized(journal) {
-            for (ByteSequence bs : encodedData) {
-                Location location = journal.write(bs, sync);
-                locations.add(location);
+        synchronized (journal) {
+            journal.write(BEGIN_UNIT_OF_WORK_DATA, false);
+            for (UoWOperation op : uow) {
+                op.location = journal.write(op.data, false);
             }
+            journal.write(END_UNIT_OF_WORK_DATA, onFlush);
         }
         long start2 = System.currentTimeMillis();
-        
+
         try {
             indexLock.writeLock().lock();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    for( int i=0; i < batch.size(); i++) {
-                        TypeCreatable data = batch.get(i);
-                        MessageBuffer message = ((PBMessage) data).freeze();
-                        Location location = locations.get(i);
-                        updateIndex(tx, data.toType(), message, location);
-                        rootEntity.setLastUpdate(location);
+                    for (UoWOperation op : uow) {
+                        MessageBuffer message = ((PBMessage) op.bean).freeze();
+                        updateIndex(tx, op.bean.toType(), message, op.location);
+                        rootEntity.setLastUpdate(op.location);
                     }
                 }
             });
@@ -630,8 +691,8 @@
         }
 
         long end = System.currentTimeMillis();
-        if( end-start > 100 ) { 
-            LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+        if (end - start > 100) {
+            LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
         }
     }
 
@@ -645,35 +706,39 @@
     @SuppressWarnings("unchecked")
     public TypeCreatable load(Location location) throws IOException {
         ByteSequence data = journal.read(location);
+        return load(location, data);
+    }
+
+    private TypeCreatable load(Location location, ByteSequence data) throws IOException, InvalidProtocolBufferException {
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
         Type type = Type.valueOf(readByte);
-        if( type == null ) {
-            throw new IOException("Could not load journal record. Invalid location: "+location);
+        if (type == null) {
+            throw new IOException("Could not load journal record. Invalid location: " + location);
         }
-        MessageBuffer message = type.parseUnframed(new Buffer(data.data, data.offset+1, data.length-1));
-        return (TypeCreatable)message;
+        MessageBuffer message = type.parseUnframed(new Buffer(data.data, data.offset + 1, data.length - 1));
+        return (TypeCreatable) message;
     }
 
     @SuppressWarnings("unchecked")
     public void updateIndex(Transaction tx, Type type, MessageBuffer command, Location location) throws IOException {
         switch (type) {
         case MESSAGE_ADD:
-            messageAdd(tx, (MessageAdd)command, location);
+            messageAdd(tx, (MessageAdd) command, location);
             return;
         case QUEUE_ADD:
-            queueAdd(tx, (QueueAdd)command, location);
+            queueAdd(tx, (QueueAdd) command, location);
             return;
         case QUEUE_REMOVE:
-            queueRemove(tx, (QueueRemove)command, location);
+            queueRemove(tx, (QueueRemove) command, location);
             return;
         case QUEUE_ADD_MESSAGE:
-            queueAddMessage(tx, (QueueAddMessage)command, location);
+            queueAddMessage(tx, (QueueAddMessage) command, location);
             return;
         case QUEUE_REMOVE_MESSAGE:
-            queueRemoveMessage(tx, (QueueRemoveMessage)command, location);
+            queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
             return;
-            
+
         case TRANSACTION_BEGIN:
         case TRANSACTION_ADD_MESSAGE:
         case TRANSACTION_REMOVE_MESSAGE:
@@ -694,24 +759,25 @@
     private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
         rootEntity.messageAdd(tx, command, location);
     }
-    
+
     private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
         rootEntity.queueAdd(tx, command.getQueueName());
     }
-    
+
     private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
         rootEntity.queueRemove(tx, command.getQueueName());
     }
-    
+
     private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
         DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
-        if( destination!=null ) {
+        if (destination != null) {
             destination.add(tx, command);
         }
     }
+
     private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
         DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
-        if( destination!=null ) {
+        if (destination != null) {
             destination.remove(tx, command.getQueueKey());
         }
     }
@@ -719,54 +785,55 @@
     class KahaDBSession implements Session {
         ArrayList<TypeCreatable> updates = new ArrayList<TypeCreatable>();
 
-        private Transaction tx; 
+        private Transaction tx;
+
         private Transaction tx() {
-            if( tx ==null ) {
+            if (tx == null) {
                 indexLock.readLock().lock();
                 tx = pageFile.tx();
             }
             return tx;
         }
-        
+
         public void close() {
             try {
-                if( tx!=null ) {
+                if (tx != null) {
                     tx.rollback();
                 }
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             } finally {
-                if( tx!=null ) {
+                if (tx != null) {
                     indexLock.readLock().unlock();
-                    tx=null;
+                    tx = null;
                 }
             }
         }
 
-        public void commit() {
+        public void commit(Runnable onFlush) {
             try {
-                if( tx!=null ) {
+                if (tx != null) {
                     tx.commit();
                 }
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             } finally {
-                if( tx!=null ) {
+                if (tx != null) {
                     indexLock.readLock().unlock();
-                    tx=null;
+                    tx = null;
                 }
             }
-            
+
             try {
-                store(updates);
+                store(updates, onFlush);
             } catch (IOException e) {
                 throw new FatalStoreException(e);
             }
         }
-        
-        ///////////////////////////////////////////////////////////////
+
+        // /////////////////////////////////////////////////////////////
         // Message related methods.
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         public Long messageAdd(MessageRecord message) {
             Long id = rootEntity.nextMessageKey();
             MessageAddBean bean = new MessageAddBean();
@@ -774,25 +841,25 @@
             bean.setMessageId(message.getMessageId());
             bean.setEncoding(message.getEncoding());
             Buffer buffer = message.getBuffer();
-            if( buffer!=null ) {
+            if (buffer != null) {
                 bean.setBuffer(buffer);
-            }            
+            }
             Long streamKey = message.getStreamKey();
-            if( streamKey !=null ) {
+            if (streamKey != null) {
                 bean.setStreamKey(streamKey);
             }
             updates.add(bean);
             return id;
         }
-        
+
         public Long messageGetKey(AsciiBuffer messageId) {
             return rootEntity.messageGetKey(tx(), messageId);
         }
-        
+
         public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
             Location location = rootEntity.messageGetLocation(tx(), key);
-            if( location ==null ) {
-                throw new KeyNotFoundException("message key: "+key);
+            if (location == null) {
+                throw new KeyNotFoundException("message key: " + key);
             }
             try {
                 MessageAdd bean = (MessageAdd) load(location);
@@ -800,10 +867,10 @@
                 rc.setKey(bean.getMessageKey());
                 rc.setMessageId(bean.getMessageId());
                 rc.setEncoding(bean.getEncoding());
-                if( bean.hasBuffer() ) {
+                if (bean.hasBuffer()) {
                     rc.setBuffer(bean.getBuffer());
                 }
-                if( bean.hasStreamKey() ) {
+                if (bean.hasStreamKey()) {
                     rc.setStreamKey(bean.getStreamKey());
                 }
                 return rc;
@@ -812,44 +879,49 @@
             }
         }
 
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         // Queue related methods.
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         public void queueAdd(AsciiBuffer queueName) {
             updates.add(new QueueAddBean().setQueueName(queueName));
         }
+
         public void queueRemove(AsciiBuffer queueName) {
             updates.add(new QueueRemoveBean().setQueueName(queueName));
         }
+
         public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
             return rootEntity.queueList(tx(), firstQueueName, max);
         }
+
         public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
             DestinationEntity destination = rootEntity.getDestination(queueName);
-            if( destination ==null ) {
-                throw new KeyNotFoundException("queue key: "+queueName);
+            if (destination == null) {
+                throw new KeyNotFoundException("queue key: " + queueName);
             }
             Long queueKey = destination.nextQueueKey();
             QueueAddMessageBean bean = new QueueAddMessageBean();
             bean.setQueueName(queueName);
             bean.setQueueKey(queueKey);
             bean.setMessageKey(record.getMessageKey());
-            if( record.getAttachment()!=null ) {
+            if (record.getAttachment() != null) {
                 bean.setAttachment(record.getAttachment());
             }
             updates.add(bean);
             return queueKey;
         }
+
         public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
             QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
             bean.setQueueKey(queueKey);
             bean.setQueueName(queueName);
             updates.add(bean);
         }
+
         public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
             DestinationEntity destination = rootEntity.getDestination(queueName);
-            if( destination ==null ) {
-                throw new KeyNotFoundException("queue key: "+queueName);
+            if (destination == null) {
+                throw new KeyNotFoundException("queue key: " + queueName);
             }
             try {
                 return destination.listMessages(tx(), firstQueueKey, max);
@@ -857,73 +929,87 @@
                 throw new FatalStoreException(e);
             }
         }
-        
-        
-        ///////////////////////////////////////////////////////////////
+
+        // /////////////////////////////////////////////////////////////
         // Map related methods.
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         public boolean mapAdd(AsciiBuffer map) {
             return false;
         }
+
         public boolean mapRemove(AsciiBuffer map) {
             return false;
         }
+
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
             return null;
         }
+
         public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
             return null;
         }
+
         public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
             return null;
         }
+
         public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
             return null;
         }
+
         public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
             return null;
         }
 
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         // Stream related methods.
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         public Long streamOpen() {
             return null;
         }
+
         public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
         }
+
         public void streamClose(Long streamKey) throws KeyNotFoundException {
         }
+
         public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
             return null;
         }
+
         public boolean streamRemove(Long streamKey) {
             return false;
         }
 
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         // Transaction related methods.
-        ///////////////////////////////////////////////////////////////
+        // /////////////////////////////////////////////////////////////
         public void transactionAdd(Buffer txid) {
         }
+
         public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
         }
+
         public void transactionCommit(Buffer txid) throws KeyNotFoundException {
         }
+
         public Iterator<Buffer> transactionList(Buffer first, int max) {
             return null;
         }
+
         public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
         }
+
         public void transactionRollback(Buffer txid) throws KeyNotFoundException {
         }
     }
-    
+
     public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
         KahaDBSession session = new KahaDBSession();
         try {
             R rc = callback.execute(session);
-            session.commit();
+            session.commit(onFlush);
             return rc;
         } finally {
             session.close();
@@ -931,11 +1017,31 @@
     }
 
     public void flush() {
+        try {
+            final CountDownLatch done = new CountDownLatch(1);
+            synchronized (journal) {
+                journal.write(FLUSH_DATA, new Runnable() {
+                    public void run() {
+                        done.countDown();
+                    }
+                });
+            }
+            
+            // Keep waiting for the flush to complete unless the store
+            // has been stopped.
+            while(started.get()) {
+                if( done.await(100, TimeUnit.MILLISECONDS) ) {
+                    return;
+                }
+            }
+        } catch (Exception e) {
+            throw new FatalStoreException(e);
+        }
     }
-    
-    ///////////////////////////////////////////////////////////////////
+
+    // /////////////////////////////////////////////////////////////////
     // IoC Properties.
-    ///////////////////////////////////////////////////////////////////
+    // /////////////////////////////////////////////////////////////////
 
     protected PageFile createPageFile() {
         PageFile index = new PageFile(directory, "db");
@@ -966,7 +1072,7 @@
     public void setDeleteAllMessages(boolean deleteAllMessages) {
         this.deleteAllMessages = deleteAllMessages;
     }
-    
+
     public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
         this.setIndexWriteBatchSize = setIndexWriteBatchSize;
     }
@@ -974,15 +1080,15 @@
     public int getIndexWriteBatchSize() {
         return setIndexWriteBatchSize;
     }
-    
+
     public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
         this.enableIndexWriteAsync = enableIndexWriteAsync;
     }
-    
+
     boolean isEnableIndexWriteAsync() {
         return enableIndexWriteAsync;
     }
-    
+
     public boolean isEnableJournalDiskSyncs() {
         return enableJournalDiskSyncs;
     }
@@ -1010,11 +1116,11 @@
     public void setJournalMaxFileLength(int journalMaxFileLength) {
         this.journalMaxFileLength = journalMaxFileLength;
     }
-    
+
     public int getJournalMaxFileLength() {
         return journalMaxFileLength;
     }
-    
+
     public PageFile getPageFile() {
         if (pageFile == null) {
             pageFile = createPageFile();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=759304&r1=759303&r2=759304&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Fri Mar 27 18:55:00 2009
@@ -22,7 +22,6 @@
 
 import junit.framework.TestCase;
 
-import org.apache.activemq.broker.store.Store.Callback;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
@@ -35,7 +34,10 @@
 
 public abstract class StorePerformanceBase extends TestCase {
 
-    private static final int PERFORMANCE_SAMPLES = 30000;
+    private static int PERFORMANCE_SAMPLES = 3;
+    private static boolean SYNC_TO_DISK = false;
+    
+    
     private Store store;
     private AsciiBuffer queueName;
 
@@ -99,6 +101,14 @@
                     messageRecord.setEncoding(new AsciiBuffer("encoding"));
                     messageRecord.setBuffer(buffer);
 
+                    Runnable onFlush = new Runnable(){
+                        public void run() {
+                            rate.increment();
+                            synchronized(wakeupMutex){
+                                wakeupMutex.notify();
+                            }
+                        }
+                    };
                     store.execute(new VoidCallback<Exception>() {
                         @Override
                         public void run(Session session) throws Exception {
@@ -107,11 +117,12 @@
                             queueRecord.setMessageKey(messageKey);
                             session.queueAddMessage(queueName, queueRecord);
                         }
-                    }, null);
-                    rate.increment();
-                    synchronized(wakeupMutex){
-                        wakeupMutex.notify();
+                    }, onFlush);
+                    
+                    if( SYNC_TO_DISK ) {
+                        store.flush();
                     }
+
                     
                     if( sleep>0 ) {
                         Thread.sleep(sleep);
@@ -146,23 +157,33 @@
         public void run() {
             try {
                 while( !stopped.get() ) {
-                    ArrayList<MessageRecord> records = store.execute(new Callback<ArrayList<MessageRecord>, Exception>() {
-                        public ArrayList<MessageRecord> execute(Session session) throws Exception {
-                            ArrayList<MessageRecord> rc = new ArrayList<MessageRecord>(1000);
+                    final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);;
+                    Runnable onFlush = new Runnable(){
+                        public void run() {
+                            rate.increment(records.size());
+                            if( records.isEmpty() ) {
+                                synchronized(wakeupMutex){
+                                    try {
+                                        wakeupMutex.wait(500);
+                                    } catch (InterruptedException e) {
+                                    }
+                                }
+                            }
+                        }
+                    };
+                    store.execute(new VoidCallback<Exception>() {
+                        @Override
+                        public void run(Session session) throws Exception {
                             Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueName, null, 1000);
                             for (Iterator<QueueRecord> iterator = queueRecords; iterator.hasNext();) {
                                 QueueRecord r = iterator.next();
-                                rc.add(session.messageGetRecord(r.getMessageKey()));
+                                records.add(session.messageGetRecord(r.getMessageKey()));
                                 session.queueRemoveMessage(queueName, r.queueKey);
                             }
-                            return rc;
-                        }
-                    }, null);
-                    rate.increment(records.size());
-                    if( records.isEmpty() ) {
-                        synchronized(wakeupMutex){
-                            wakeupMutex.wait(500);
                         }
+                    }, onFlush);
+                    if( SYNC_TO_DISK ) {
+                        store.flush();
                     }
                 }
             } catch (Exception e) {



Mime
View raw message