activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r696308 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/index/ main/java/org/apache/kahadb/journal/ main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/store/ test/java/org/apache/kahadb/store/perf/
Date Wed, 17 Sep 2008 13:59:38 GMT
Author: chirino
Date: Wed Sep 17 06:59:36 2008
New Revision: 696308

URL: http://svn.apache.org/viewvc?rev=696308&view=rev
Log:
- Checking the amount of disk space used is now much faster and more accurate.
- Removed ref counting of journal files since it was unused.
- Fixed NPE that would occur in the BTree when getFirst/getLast was used and the tree depth
was > 1
- We now checkpoint and clean up at two different intervals.  We now checkpoint at a much higher
rate so that recovery is much faster.
- Made recovery error handling more robust.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Wed Sep 17
06:59:36 2008
@@ -541,7 +541,7 @@
             node = node.getChild(tx, 0);
         }
         if( node.values.length>0 ) {
-            return new KeyValueEntry(keys[0], values[0]);
+            return new KeyValueEntry(node.keys[0], node.values[0]);
         } else {
             return null;
         }
@@ -553,8 +553,8 @@
             node = node.getChild(tx, children.length-1);
         }
         if( node.values.length>0 ) {
-            int idx = values.length-1;
-            return new KeyValueEntry(keys[idx], values[idx]);
+            int idx = node.values.length-1;
+            return new KeyValueEntry(node.keys[idx], node.values[idx]);
         } else {
             return null;
         }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Wed Sep
17 06:59:36 2008
@@ -33,14 +33,10 @@
 
     protected final File file;
     protected final Integer dataFileId;
-    protected final int preferedSize;
-
     protected int length;
-    protected int referenceCount;
 
     DataFile(File file, int number, int preferedSize) {
         this.file = file;
-        this.preferedSize = preferedSize;
         this.dataFileId = Integer.valueOf(number);
         length = (int)(file.exists() ? file.length() : 0);
     }
@@ -65,44 +61,15 @@
         length += size;
     }
 
-    public synchronized int increment() {
-        return ++referenceCount;
-    }
-
-    public synchronized int decrement() {
-        return --referenceCount;
-    }
-    
-    public synchronized int getReferenceCount(){
-    	return referenceCount;
-    }
-
-    public synchronized boolean isUnused() {
-        return referenceCount <= 0;
-    }
-
     public synchronized String toString() {
-        String result = file.getName() + " number = " + dataFileId + " , length = " + length
+ " refCount = " + referenceCount;
-        return result;
+        return file.getName() + " number = " + dataFileId + " , length = " + length;
     }
 
-    public synchronized RandomAccessFile openRandomAccessFile(boolean appender) throws IOException
{
-        RandomAccessFile rc = new RandomAccessFile(file, "rw");
-        // When we start to write files size them up so that the OS has a chance
-        // to allocate the file contigously.
-        if (appender) {
-            if (length < preferedSize) {
-                rc.setLength(preferedSize);
-            }
-        }
-        return rc;
+    public synchronized RandomAccessFile openRandomAccessFile() throws IOException {
+        return new RandomAccessFile(file, "rw");
     }
 
     public synchronized void closeRandomAccessFile(RandomAccessFile file) throws IOException
{
-        // On close set the file size to the real size.
-        if (length != file.length()) {
-            file.setLength(getLength());
-        }
         file.close();
     }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
Wed Sep 17 06:59:36 2008
@@ -46,7 +46,7 @@
     public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {
         this.dataFile = dataFile;
         this.inflightWrites = dataManager.getInflightWrites();
-        this.file = dataFile.openRandomAccessFile(false);
+        this.file = dataFile.openRandomAccessFile();
     }
 
     public DataFile getDataFile() {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Wed Sep 17 06:59:36 2008
@@ -326,10 +326,14 @@
                 WriteBatch wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
+                        file.setLength(dataFile.getLength());
                         dataFile.closeRandomAccessFile(file);
                     }
                     dataFile = wb.dataFile;
-                    file = dataFile.openRandomAccessFile(true);
+                    file = dataFile.openRandomAccessFile();
+                    if( file.length() < dataManager.preferedFileLength ) {
+                        file.setLength(dataManager.preferedFileLength);
+                    }
                 }
 
                 WriteCommand write = wb.writes.getHead();

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Wed Sep 17
06:59:36 2008
@@ -101,17 +101,9 @@
     protected Location mark;
     protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
     protected Runnable cleanupTask;
-    protected final AtomicLong storeSize;
+    protected final AtomicLong totalLength = new AtomicLong();
     protected boolean archiveDataLogs;
 
-    public Journal(AtomicLong storeSize) {
-        this.storeSize = storeSize;
-    }
-
-    public Journal() {
-        this(new AtomicLong());
-    }
-
     @SuppressWarnings("unchecked")
     public synchronized void start() throws IOException {
         if (started) {
@@ -147,7 +139,7 @@
                     int num = Integer.parseInt(numStr);
                     DataFile dataFile = new DataFile(file, num, preferedFileLength);
                     fileMap.put(dataFile.getDataFileId(), dataFile);
-                    storeSize.addAndGet(dataFile.getLength());
+                    totalLength.addAndGet(dataFile.getLength());
                 } catch (NumberFormatException e) {
                     // Ignore file that do not match the pattern.
                 }
@@ -270,32 +262,19 @@
             File file = new File(directory, fileName);
             DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
             // actually allocate the disk space
-            nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
             fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
             fileByFileMap.put(file, nextWriteFile);
             dataFiles.addLast(nextWriteFile);
-            
-            DataFile previous = dataFiles.getTail().getPrevious();
-            if (previous!=null && previous.isUnused()) {
-                removeDataFile(previous);
-            }
         }
         DataFile currentWriteFile = dataFiles.getTail();
         location.setOffset(currentWriteFile.getLength());
         location.setDataFileId(currentWriteFile.getDataFileId().intValue());
         int size = location.getSize();
         currentWriteFile.incrementLength(size);
-        currentWriteFile.increment();
-        storeSize.addAndGet(size);
+        totalLength.addAndGet(size);
         return currentWriteFile;
     }
 
-    public synchronized void removeLocation(Location location) throws IOException {
-
-        DataFile dataFile = getDataFile(location);
-        dataFile.decrement();
-    }
-
     synchronized DataFile getDataFile(Location item) throws IOException {
         Integer key = Integer.valueOf(item.getDataFileId());
         DataFile dataFile = fileMap.get(key);
@@ -350,7 +329,7 @@
         boolean result = true;
         for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
             DataFile dataFile = i.next();
-            storeSize.addAndGet(-dataFile.getLength());
+            totalLength.addAndGet(-dataFile.getLength());
             result &= dataFile.delete();
         }
         fileMap.clear();
@@ -369,40 +348,6 @@
         return result;
     }
 
-    public synchronized void addInterestInFile(int file) throws IOException {
-        if (file >= 0) {
-            Integer key = Integer.valueOf(file);
-            DataFile dataFile = fileMap.get(key);
-            if (dataFile == null) {
-                throw new IOException("That data file does not exist");
-            }
-            addInterestInFile(dataFile);
-        }
-    }
-
-    synchronized void addInterestInFile(DataFile dataFile) {
-        if (dataFile != null) {
-            dataFile.increment();
-        }
-    }
-
-    public synchronized void removeInterestInFile(int file) throws IOException {
-        if (file >= 0) {
-            Integer key = Integer.valueOf(file);
-            DataFile dataFile = fileMap.get(key);
-            removeInterestInFile(dataFile);
-        }
-
-    }
-
-    synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
-        if (dataFile != null) {
-            if (dataFile.decrement() <= 0) {
-                removeDataFile(dataFile);
-            }
-        }
-    }
-
     public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer
lastFile) throws IOException {
         Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
         unUsed.removeAll(inUse);
@@ -422,33 +367,11 @@
         }
     }
 
-    public synchronized void consolidateDataFiles() throws IOException {
-        List<DataFile> purgeList = new ArrayList<DataFile>();
-        for (DataFile dataFile : fileMap.values()) {
-            if (dataFile.isUnused()) {
-                purgeList.add(dataFile);
-            }
-        }
-        for (DataFile dataFile : purgeList) {
-            removeDataFile(dataFile);
-        }
-    }
-
-    private synchronized void removeDataFile(DataFile dataFile) throws IOException {
-
-        // Make sure we don't delete too much data.
-        if (dataFile == dataFiles.getTail() || mark == null || dataFile.getDataFileId() >=
mark.getDataFileId()) {
-            LOG.debug("Won't remove DataFile" + dataFile);
-            return;
-        }
-        forceRemoveDataFile(dataFile);
-    }
-
     private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
         fileByFileMap.remove(dataFile.getFile());
         fileMap.remove(dataFile.getDataFileId());
-        storeSize.addAndGet(-dataFile.getLength());
+        totalLength.addAndGet(-dataFile.getLength());
         dataFile.unlink();
         if (archiveDataLogs) {
             dataFile.move(getDirectoryArchive());
@@ -706,25 +629,20 @@
         return fileByFileMap.keySet();
     }
 
-    synchronized public long getDiskSize() {
-        long rc = 0;
-        DataFile cur = dataFiles.getHead();
-        while (cur != null) {
-            rc += cur.getLength();
-            cur = cur.getNext();
-        }
-        return rc;
-    }
-
-    synchronized public long getDiskSizeUntil(Location startPosition) {
-        long rc = 0;
-        DataFile cur = dataFiles.getHead();
-        while (cur != null) {
-            if (cur.getDataFileId().intValue() >= startPosition.getDataFileId()) {
-                return rc + startPosition.getOffset();
+    public long getDiskSize() {
+        long tailLength=0;
+        synchronized( this ) {
+            if( !dataFiles.isEmpty() ) {
+                tailLength = dataFiles.getTail().getLength();
             }
-            rc += cur.getLength();
-            cur = cur.getNext();
+        }
+        
+        long rc = totalLength.get();
+        
+        // The last file is actually at a minimum preferedFileLength big.
+        if( tailLength < preferedFileLength ) {
+            rc -= tailLength;
+            rc += preferedFileLength;
         }
         return rc;
     }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
Wed Sep 17 06:59:36 2008
@@ -84,10 +84,14 @@
                 WriteBatch wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
+                        file.setLength(dataFile.getLength());
                         dataFile.closeRandomAccessFile(file);
                     }
                     dataFile = wb.dataFile;
-                    file = dataFile.openRandomAccessFile(true);
+                    file = dataFile.openRandomAccessFile();
+                    if( file.length() < dataManager.preferedFileLength ) {
+                        file.setLength(dataManager.preferedFileLength);
+                    }
                     channel = file.getChannel();
                 }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
Wed Sep 17 06:59:36 2008
@@ -32,17 +32,8 @@
         super(file, number, preferedSize);
     }
     
-    
     public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
-        RandomAccessFile rc = new RandomAccessFile(file, "r");
-        // When we start to write files size them up so that the OS has a chance
-        // to allocate the file contigously.
-        if (appender) {
-            if (length < preferedSize) {
-                rc.setLength(preferedSize);
-            }
-        }
-        return rc;
+        return new RandomAccessFile(file, "r");
     }
 
     public void closeRandomAccessFile(RandomAccessFile file) throws IOException {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java Wed
Sep 17 06:59:36 2008
@@ -67,7 +67,7 @@
                 int num = Integer.parseInt(numStr);
                 DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
                 fileMap.put(dataFile.getDataFileId(), dataFile);
-                storeSize.addAndGet(dataFile.getLength());
+                totalLength.addAndGet(dataFile.getLength());
             } catch (NumberFormatException e) {
                 // Ignore file that do not match the pattern.
             }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Wed Sep 17
06:59:36 2008
@@ -111,7 +111,7 @@
     // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
     private boolean enableSyncedWrites=true;
     // Will writes be done in an async thread?
-    private boolean enableAsyncWrites=true;
+    private boolean enableAsyncWrites=false;
 
     // These are used if enableAsyncWrites==true 
     private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -122,7 +122,7 @@
     private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
 
     // Keeps track of free pages.
-    private long nextFreePageId;
+    private final AtomicLong nextFreePageId = new AtomicLong();
     private SequenceSet freeList = new SequenceSet();
     
     private AtomicLong nextTxid = new AtomicLong();
@@ -345,7 +345,7 @@
             if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                 writeFile.setLength(PAGE_FILE_HEADER_SIZE);
             }
-            nextFreePageId=(writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize;
+            nextFreePageId.set((writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize);
             startWriter();
                 
         } else {
@@ -662,14 +662,14 @@
     }
 
     public long getDiskSize() throws IOException {
-        return readFile.length();
+        return toOffset(nextFreePageId.get());
     }
     
     /**
      * @return the number of pages allocated in the PageFile
      */
     public long getPageCount() {
-        return nextFreePageId;
+        return nextFreePageId.get();
     }
 
     public int getRecoveryFileMinPageCount() {
@@ -732,7 +732,7 @@
             Page<T> first = null;
             int c = count;
             while (c > 0) {
-                Page<T> page = new Page<T>(nextFreePageId++);
+                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
                 page.makeFree(getNextWriteTransactionId());
 
                 if (first == null) {
@@ -1039,9 +1039,10 @@
                 checksum.update(data, 0, pageSize);
                 batch.put(offset, data);
             }
-        } catch (IllegalStateException e) {
+        } catch (Exception e) {
             // If an error occurred it was cause the redo buffer was not full written out
correctly.. so don't redo it. 
             // as the pages should still be consistent.
+            LOG.debug("Redo buffer was not fully intact: ", e);
             return nextTxId;
         }
         

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
Wed Sep 17 06:59:36 2008
@@ -460,11 +460,7 @@
             return 0;
         }
         try {
-            long diskSize; 
-            synchronized(indexMutex) {
-                diskSize = pageFile.getDiskSize();
-            }
-            return asyncDataManager.getDiskSize() + diskSize;
+            return asyncDataManager.getDiskSize() + pageFile.getDiskSize();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Wed
Sep 17 06:59:36 2008
@@ -151,7 +151,8 @@
     protected boolean recovering;
     protected Thread checkpointThread;
     protected boolean syncWrites;
-    int checkpointInterval = 30*1000;
+    int checkpointInterval = 1000;
+    int cleanupInterval = 30*1000;
     
     protected AtomicBoolean started = new AtomicBoolean();
 
@@ -233,6 +234,7 @@
                     metadata.destinations.load(tx);
                 }
             });
+            pageFile.flush();
             
             // Load up all the destinations since we need to scan all the indexes to figure
out which journal files can be deleted.
             // Perhaps we should just keep an index of file 
@@ -256,13 +258,22 @@
         checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
             public void run() {
                 try {
-                    long start = System.currentTimeMillis();
+                    long lastCleanup = System.currentTimeMillis();
+                    long lastCheckpoint = System.currentTimeMillis();
+                    
+                    // Sleep for a short time so we can periodically check 
+                    // to see if we need to exit this thread.
+                    long sleepTime = Math.min(checkpointInterval, 500);
                     while (started.get()) {
-                        Thread.sleep(500);
+                        Thread.sleep(sleepTime);
                         long now = System.currentTimeMillis();
-                        if( now - start >= checkpointInterval ) {
-                            checkpoint();
-                            start = now;
+                        if( now - lastCleanup >= cleanupInterval ) {
+                            checkpoint(true);
+                            lastCleanup = now;
+                            lastCheckpoint = now;
+                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+                            checkpoint(false);
+                            lastCheckpoint = now;
                         }
                     }
                 } catch (InterruptedException e) {
@@ -363,12 +374,12 @@
         LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start)
/ 1000.0f) + " seconds.");
     }
 
-    private void checkpoint() {
+    private void checkpoint(final boolean cleanup) {
         try {
             synchronized (indexMutex) {
                 pageFile.tx().execute(new Transaction.Closure<IOException>() {
                     public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx);
+                        checkpointUpdate(tx, cleanup);
                     }
                 });
                 pageFile.flush();
@@ -674,58 +685,57 @@
      * @param tx
      * @throws IOException
      */
-    private void checkpointUpdate(Transaction tx) throws IOException {
+    private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
 
         LOG.debug("Checkpoint started.");
-
-        // Find empty journal files to remove.
-        final HashSet<Integer> inUseFiles = new HashSet<Integer>();
-        
         
-        for (StoredDestination sd : storedDestinations.values()) {
-            // Use a visitor to cut down the number of pages that we load
-            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
-                int last=-1;
-                public boolean isInterestedInKeysBetween(Location first, Location second)
{
-                    if( second!=null ) {
-                        if( last+1 == second.getDataFileId() ) {
-                            last++;
-                            inUseFiles.add(last);
-                        }
-                        if( last == second.getDataFileId() ) {
-                            return false;
-                        }
-                    }
-                    return true;
-                }
-
-                public void visit(List<Location> keys, List<Long> values) {
-                    for (int i = 0; i < keys.size(); i++) {
-                        if( last != keys.get(i).getDataFileId() ) {
-                            inUseFiles.add(keys.get(i).getDataFileId());
-                            last = keys.get(i).getDataFileId();
-                        }
-                    }
-                    
-                }
-
-            });
-        }
-                
         metadata.state = OPEN_STATE;
         metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
         tx.store(metadata.page, metadataMarshaller, true);
+        pageFile.flush();
 
-        Location l = metadata.lastUpdate;
-        if( metadata.firstInProgressTransactionLocation!=null ) {
-            l = metadata.firstInProgressTransactionLocation;
+        if( cleanup ) {
+            // Find empty journal files to remove.
+            final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+            for (StoredDestination sd : storedDestinations.values()) {
+                // Use a visitor to cut down the number of pages that we load
+                sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+                    int last=-1;
+                    public boolean isInterestedInKeysBetween(Location first, Location second)
{
+                        if( second!=null ) {
+                            if( last+1 == second.getDataFileId() ) {
+                                last++;
+                                inUseFiles.add(last);
+                            }
+                            if( last == second.getDataFileId() ) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    }
+    
+                    public void visit(List<Location> keys, List<Long> values)
{
+                        for (int i = 0; i < keys.size(); i++) {
+                            if( last != keys.get(i).getDataFileId() ) {
+                                inUseFiles.add(keys.get(i).getDataFileId());
+                                last = keys.get(i).getDataFileId();
+                            }
+                        }
+                        
+                    }
+    
+                });
+            }
+            
+            Location l = metadata.lastUpdate;
+            if( metadata.firstInProgressTransactionLocation!=null ) {
+                l = metadata.firstInProgressTransactionLocation;
+            }
+            
+            LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
+            asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
         }
         
-        LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
-
-        pageFile.flush();
-        asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
-        
         LOG.debug("Checkpoint done.");
     }
 
@@ -1131,4 +1141,12 @@
         this.checkpointInterval = checkpointInterval;
     }
 
+    public int getCleanupInterval() {
+        return cleanupInterval;
+    }
+
+    public void setCleanupInterval(int cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
 }

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java?rev=696308&r1=696307&r2=696308&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
(original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaStoreDurableTopicTest.java
Wed Sep 17 06:59:36 2008
@@ -29,7 +29,7 @@
     protected void configureBroker(BrokerService answer,String uri) throws Exception {
         File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
         dataFileDir.mkdirs();
-        answer.setDeleteAllMessagesOnStartup(true);
+        // answer.setDeleteAllMessagesOnStartup(true);
                
          KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
          adaptor.setDirectory(dataFileDir);



Mime
View raw message