activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r492471 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/impl/async/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activemq/store/quick/ test/java/org/apache/activemq/ test/java/org...
Date Thu, 04 Jan 2007 09:34:48 GMT
Author: chirino
Date: Thu Jan  4 01:34:46 2007
New Revision: 492471

URL: http://svn.apache.org/viewvc?view=rev&rev=492471
Log:
Fixed a ton of Quick store bugs that were found when running the QuickStoreLoadTester.

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java
      - copied, changed from r492380, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java
      - copied, changed from r492373, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java
Removed:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Thu Jan  4 01:34:46 2007
@@ -206,7 +206,7 @@
     	}
 	}
     
-    private ByteSequence marshallState() throws IOException {
+    private synchronized ByteSequence marshallState() throws IOException {
     	ByteArrayOutputStream baos = new ByteArrayOutputStream();
     	DataOutputStream dos = new DataOutputStream(baos);
 
@@ -338,9 +338,7 @@
     synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
         if(dataFile!=null){
             if(dataFile.decrement()<=0){
-                if(dataFile!=currentWriteFile){
-                    removeDataFile(dataFile);
-                }
+                removeDataFile(dataFile);
             }
         }
     }
@@ -355,21 +353,18 @@
         List<DataFile> purgeList=new ArrayList<DataFile>();
 		for (Integer key : unUsed) {
             DataFile dataFile=(DataFile) fileMap.get(key);
-            if( dataFile!=currentWriteFile ) {
-                purgeList.add(dataFile);
-            }
+            purgeList.add(dataFile);
 		}
 		
         for (DataFile dataFile : purgeList) {
-            removeDataFile(dataFile);
+			removeDataFile(dataFile);
 		}
-        
 	}
 
     public synchronized void consolidateDataFiles() throws IOException{
         List<DataFile> purgeList=new ArrayList<DataFile>();
         for (DataFile dataFile : fileMap.values()) {
-            if(dataFile.isUnused() && dataFile != currentWriteFile){
+            if( dataFile.isUnused() ){
                 purgeList.add(dataFile);
             }
         }
@@ -379,12 +374,21 @@
     }
 
     private void removeDataFile(DataFile dataFile) throws IOException{
+
+    	// Make sure we don't delete too much data.
+        if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId() ) {
+        	return;
+        }
+
+        accessorPool.disposeDataFileAccessors(dataFile);
+        
         fileMap.remove(dataFile.getDataFileId());
         dataFile.unlink();
-        accessorPool.disposeDataFileAccessors(dataFile);        
         boolean result=dataFile.delete();
         log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
+        
     }
+        
 
     /**
      * @return the maxFileLength
@@ -479,8 +483,10 @@
 		return rc;
 	}
 
-	public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
-		mark = location;
+	public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
+		synchronized(this) {
+			mark = location;
+		}
 		storeState(sync);
 	}
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Thu Jan  4 01:34:46 2007
@@ -36,9 +36,12 @@
 	int MAX_OPEN_READERS_PER_FILE=5;
 	
 	class Pool {
+		
 		private final DataFile file;
 		private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
 		private boolean used; 
+		private int openCounter;
+		private boolean disposed;
 		
 		public Pool(DataFile file) {
 			this.file = file;
@@ -52,12 +55,14 @@
 				rc = (DataFileAccessor) pool.remove(pool.size()-1);
 			}
 			used=true;
+			openCounter++;
 			return rc;
 		}
 
 		public void closeDataFileReader(DataFileAccessor reader) {
+			openCounter--;
 			used=true;
-			if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) {
+			if(pool.size() >= MAX_OPEN_READERS_PER_FILE || disposed) {
 				reader.dispose();
 			} else {
 				pool.add(reader);
@@ -77,6 +82,11 @@
 				reader.dispose();
 			}
 			pool.clear();
+			disposed=true;
+		}
+
+		public int getOpenCounter() {
+			return openCounter;
 		}
 		
 	}
@@ -102,17 +112,17 @@
 		}
 	}
 	
-	synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException {
+	synchronized void disposeDataFileAccessors(DataFile dataFile) {
 		if( closed ) {
-			throw new IOException("Closed.");
+			throw new IllegalStateException("Closed.");
 		}		
 		Pool pool = pools.get(dataFile.getDataFileId());
 		if( pool != null ) {
-			if( !pool.isUsed() ) {
+			if( pool.getOpenCounter()==0 ) {
 				pool.dispose();
 				pools.remove(dataFile.getDataFileId());
 			} else {
-				throw new IOException("The data file is still in use: "+dataFile);
+				throw new IllegalStateException("The data file is still in use: "+dataFile+", use count: "+pool.getOpenCounter());
 			}
 		}
 	}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Thu Jan  4 01:34:46 2007
@@ -77,7 +77,7 @@
         }else{    
             for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
             	ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
-                if(msg.messageId.equals(identity)){
+                if(msg.messageId.equals(identity.toString())){
                     result=msg;
                     cache.put(identity,entry);
                     break;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Thu Jan  4 01:34:46 2007
@@ -18,12 +18,15 @@
 package org.apache.activemq.store.quick;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -38,6 +41,8 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.ReferenceStore;
 import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.TransactionTemplate;
@@ -66,14 +71,26 @@
     private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
     
     protected Location lastLocation;
+    protected Location lastWrittenLocation;
+    
     protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
 
+	protected final TaskRunner asyncWriteTask;
+	protected CountDownLatch flushLatch;
+	private final AtomicReference<Location> mark = new AtomicReference<Location>();
+
     public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
         this.peristenceAdapter = adapter;
         this.transactionStore = adapter.getTransactionStore();
         this.referenceStore = referenceStore;
         this.destination = destination;
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+        
+        asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
+			public boolean iterate() {
+				asyncWrite();
+				return false;
+			}}, "Checkpoint: "+destination);
     }
     
     public void setUsageManager(UsageManager usageManager) {
@@ -123,7 +140,7 @@
         }
     }
 
-    private void addMessage(final Message message, final Location location) {
+    private void addMessage(final Message message, final Location location) throws InterruptedIOException {
         ReferenceData data = new ReferenceData();
     	data.setExpiration(message.getExpiration());
     	data.setFileId(location.getDataFileId());
@@ -132,6 +149,11 @@
             lastLocation = location;
             messages.put(message.getMessageId(), data);
         }
+        try {
+			asyncWriteTask.wakeup();
+		} catch (InterruptedException e) {
+			throw new InterruptedIOException();
+		}
     }
     
     public void replayAddMessage(ConnectionContext context, Message message, Location location) {
@@ -193,15 +215,24 @@
         }
     }
     
-    private void removeMessage(final MessageAck ack, final Location location) {
-        synchronized (this) {
+    private void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
+    	ReferenceData data;
+    	synchronized (this) {
             lastLocation = location;
             MessageId id = ack.getLastMessageId();
-            ReferenceData data = messages.remove(id);
+            data = messages.remove(id);
             if (data == null) {
                 messageAcks.add(ack);
             }
         }
+    	
+        if (data == null) {
+            try {
+    			asyncWriteTask.wakeup();
+    		} catch (InterruptedException e) {
+    			throw new InterruptedIOException();
+    		}
+        }
     }
     
     public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
@@ -216,34 +247,77 @@
             log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
         }
     }
+    
+    /**
+     * Waits till the lastest data has landed on the referenceStore
+     * @throws InterruptedIOException 
+     */
+    public void flush() throws InterruptedIOException {
+    	log.debug("flush");
+    	CountDownLatch countDown;
+    	synchronized(this) {
+    		if( lastWrittenLocation == lastLocation ) {
+    			return;
+    		}
+    		if( flushLatch== null ) {
+    			flushLatch = new CountDownLatch(1);
+    		}
+    		countDown = flushLatch;
+    	}
+    	try {
+        	asyncWriteTask.wakeup();
+	    	countDown.await();
+		} catch (InterruptedException e) {
+			throw new InterruptedIOException();
+		}
+	}
 
     /**
      * @return
      * @throws IOException
      */
-    public Location checkpoint() throws IOException {
-        return checkpoint(null);
+    private void asyncWrite() {
+        try {
+        	CountDownLatch countDown;
+        	synchronized(this) {
+        		countDown = flushLatch;
+        		flushLatch = null;
+        	}
+			
+        	mark.set(doAsyncWrite());
+			
+			if ( countDown != null ) {
+				countDown.countDown();
+			}
+		} catch (IOException e) {
+			log.error("Checkpoint failed: "+e, e);
+		}
     }
     
     /**
      * @return
      * @throws IOException
      */
-    public Location checkpoint(final Callback postCheckpointTest) throws IOException {
+    protected Location doAsyncWrite() throws IOException {
 
     	final ArrayList<MessageAck> cpRemovedMessageLocations;
         final ArrayList<Location> cpActiveJournalLocations;
         final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
-
+        final Location lastLocation;
+        
         // swap out the message hash maps..
         synchronized (this) {
             cpAddedMessageIds = this.messages;
             cpRemovedMessageLocations = this.messageAcks;
             cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
           
             this.messages = new LinkedHashMap<MessageId, ReferenceData>();
-            this.messageAcks = new ArrayList<MessageAck>();            
+            this.messageAcks = new ArrayList<MessageAck>();      
+            lastLocation = this.lastLocation;
         }
 
+        if( log.isDebugEnabled() )
+        	log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing: "+cpRemovedMessageLocations.size()+" ");
+        
         transactionTemplate.run(new Callback() {
             public void execute() throws Exception {
 
@@ -284,15 +358,15 @@
                     }
                 }
                 
-                if( postCheckpointTest!= null ) {
-                    postCheckpointTest.execute();
-                }
             }
 
         });
+        
+        log.debug("Batch update done.");
 
         synchronized (this) {
             cpAddedMessageIds = null;
+            lastWrittenLocation = lastLocation;
         }
         
         if( cpActiveJournalLocations.size() > 0 ) {
@@ -338,7 +412,7 @@
     }
 
     /**
-     * Replays the checkpointStore first as those messages are the oldest ones,
+     * Replays the referenceStore first as those messages are the oldest ones,
      * then messages are replayed from the transaction log and then the cache is
      * updated.
      * 
@@ -346,7 +420,7 @@
      * @throws Exception 
      */
     public void recover(final MessageRecoveryListener listener) throws Exception {
-        peristenceAdapter.checkpoint(true);
+        flush();
         referenceStore.recover(new RecoveryListenerAdapter(this, listener));
     }
 
@@ -355,6 +429,7 @@
     }
 
     public void stop() throws Exception {
+        asyncWriteTask.shutdown();
         referenceStore.stop();
     }
 
@@ -369,7 +444,7 @@
      * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
      */
     public void removeAllMessages(ConnectionContext context) throws IOException {
-        peristenceAdapter.checkpoint(true);
+        flush();
         referenceStore.removeAllMessages(context);
     }
     
@@ -391,13 +466,12 @@
      * @see org.apache.activemq.store.MessageStore#getMessageCount()
      */
     public int getMessageCount() throws IOException{
-        peristenceAdapter.checkpoint(true);
+        flush();
         return referenceStore.getMessageCount();
     }
 
-   
-    public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
-        peristenceAdapter.checkpoint(true);
+	public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
+        flush();
         referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this, listener));
         
     }
@@ -407,5 +481,9 @@
         referenceStore.resetBatching();
         
     }
+
+	public Location getMark() {
+		return mark.get();
+	}
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Thu Jan  4 01:34:46 2007
@@ -19,19 +19,12 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activeio.journal.Journal;
@@ -94,12 +87,14 @@
     private WireFormat wireFormat = new OpenWireFormat();
 
     private UsageManager usageManager;
-    private long checkpointInterval = 1000 * 30;
+
+    private long cleanupInterval = 1000 * 10;
+    private long checkpointInterval = 1000 * 10;
+    
     private int maxCheckpointWorkers = 1;
     private int maxCheckpointMessageAddSize = 1024*4;
 
     private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
-    private ThreadPoolExecutor checkpointExecutor;
     
     private TaskRunner checkpointTask;
     private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
@@ -111,6 +106,7 @@
 	private boolean deleteAllMessages;
 	private File directory = new File("activemq-data/quick");
 
+
     
     public synchronized void start() throws Exception {
         if( !started.compareAndSet(false, true) )
@@ -152,7 +148,13 @@
     	
     	Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
     	for (Integer fileId : files) {
-			asyncDataManager.addInterestInFile(fileId);
+			try {
+				asyncDataManager.addInterestInFile(fileId);
+			} catch (IOException e) {
+				// We can expect these since referenceStoreAdapter is a litle behind in updates
+				// and it might think it has references to data files that have allready come and gone..
+				// This should get resolved once recovery kicks in.
+			}
 		}
         
         checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
@@ -161,15 +163,7 @@
                 return false;
             }
         }, "ActiveMQ Journal Checkpoint Worker");
-        
-        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-            public Thread newThread(Runnable runable) {
-                Thread t = new Thread(runable, "Journal checkpoint worker");
-                t.setPriority(7);
-                return t;
-            }            
-        });
-        
+                
         createTransactionStore();
         recover();
 
@@ -187,7 +181,7 @@
 	        	cleanup();
 	        }
 	    };
-        Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval);
+        Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
 
     }
 
@@ -200,11 +194,22 @@
         this.usageManager.removeUsageListener(this);        
         Scheduler.cancel(periodicCheckpointTask);
 
+        
+        Iterator<QuickMessageStore> iterator = queues.values().iterator();
+        while (iterator.hasNext()) {
+            QuickMessageStore ms = iterator.next();
+            ms.stop();
+        }
+
+        iterator = topics.values().iterator();
+        while (iterator.hasNext()) {
+            final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
+            ms.stop();
+        }
+        
         // Take one final checkpoint and stop checkpoint processing.
         checkpoint(true);
-        checkpointTask.shutdown();        
-        log.debug("Checkpoint task shutdown");
-        checkpointExecutor.shutdown();
+        checkpointTask.shutdown();   
         
         queues.clear();
         topics.clear();
@@ -268,54 +273,23 @@
             log.debug("Checkpoint started.");
             Location newMark = null;
 
-            ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(queues.size()+topics.size());
-            
-            //
             Iterator<QuickMessageStore> iterator = queues.values().iterator();
             while (iterator.hasNext()) {
-                try {
-                    final QuickMessageStore ms = iterator.next();
-                    FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
-                        public Location call() throws Exception {
-                            return ms.checkpoint();
-                        }});
-                    futureTasks.add(task);
-                    checkpointExecutor.execute(task);                        
-                }
-                catch (Exception e) {
-                    log.error("Failed to checkpoint a message store: " + e, e);
+                final QuickMessageStore ms = iterator.next();
+                Location mark = (Location) ms.getMark();
+                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                    newMark = mark;
                 }
             }
 
             iterator = topics.values().iterator();
             while (iterator.hasNext()) {
-                try {
-                    final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
-                    FutureTask<Location> task = new FutureTask<Location>(new Callable<Location>() {
-                        public Location call() throws Exception {
-                            return ms.checkpoint();
-                        }});
-                    futureTasks.add(task);
-                    checkpointExecutor.execute(task);                        
-                }
-                catch (Exception e) {
-                    log.error("Failed to checkpoint a message store: " + e, e);
-                }
-            }
-
-            try {
-                for (Iterator<FutureTask> iter = futureTasks.iterator(); iter.hasNext();) {
-                    FutureTask ft = iter.next();
-                    Location mark = (Location) ft.get();
-                    // We only set a newMark on full checkpoints.
-                    if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
-                        newMark = mark;
-                    }
+                final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
+                Location mark = (Location) ms.getMark();
+                if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
+                    newMark = mark;
                 }
-            } catch (Throwable e) {
-                log.error("Failed to checkpoint a message store: " + e, e);
             }
-            
 
             try {
                 if (newMark != null) {
@@ -354,10 +328,8 @@
     public void cleanup() {
     	
     	try {
-			
     		Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
 			asyncDataManager.consolidateDataFilesNotIn(inUse);
-			
 		} catch (IOException e) {
             log.error("Could not cleanup data files: "+e, e);
 		}
@@ -386,6 +358,11 @@
         if (store == null) {
         	ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
             store = new QuickMessageStore(this, checkpointStore, destination);
+            try {
+				store.start();
+			} catch (Exception e) {
+				throw IOExceptionSupport.create(e);
+			}
             queues.put(destination, store);
         }
         return store;
@@ -396,6 +373,11 @@
         if (store == null) {
         	TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
             store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
+            try {
+				store.start();
+			} catch (Exception e) {
+				throw IOExceptionSupport.create(e);
+			}
             topics.put(destinationName, store);
         }
         return store;
@@ -445,7 +427,7 @@
      * @throws InvalidLocationException
      * @throws IllegalStateException
      */
-    private void recover() throws IllegalStateException, IOException, IOException {
+    private void recover() throws IllegalStateException, IOException {
 
         Location pos = null;
         int transactionCounter = 0;
@@ -594,8 +576,7 @@
         newPercentUsage = ((newPercentUsage)/10)*10;
         oldPercentUsage = ((oldPercentUsage)/10)*10;
         if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
-            boolean sync = newPercentUsage >= 90;
-            checkpoint(sync);
+            checkpoint(false);
         }
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java Thu Jan  4 01:34:46 2007
@@ -18,13 +18,13 @@
 package org.apache.activemq.store.quick;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.HashMap;
 import java.util.Iterator;
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.impl.async.Location;
@@ -49,18 +49,18 @@
     private TopicReferenceStore topicReferenceStore;
 	private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
     
-    public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore, ActiveMQTopic destinationName) {
-        super(adapter, checkpointStore, destinationName);
-        this.topicReferenceStore = checkpointStore;
+    public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
+        super(adapter, topicReferenceStore, destinationName);
+        this.topicReferenceStore = topicReferenceStore;
     }
     
     public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
-        this.peristenceAdapter.checkpoint(true);
+        flush();
         topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
     }
     
     public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned, final MessageRecoveryListener listener) throws Exception{
-        this.peristenceAdapter.checkpoint(true);
+        flush();
         topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, new RecoveryListenerAdapter(this, listener));
     }
 
@@ -69,14 +69,10 @@
     }
 
     public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
-        this.peristenceAdapter.checkpoint(true);
+        flush();
         topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
     }
 
-    public void addMessage(ConnectionContext context, Message message) throws IOException {
-        super.addMessage(context, message);
-    }
-    
     /**
      */
     public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
@@ -141,27 +137,35 @@
      * @param messageId
      * @param location
      * @param key
+     * @throws InterruptedIOException 
      */
-    private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) {
+    private void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
         synchronized(this) {
 		    lastLocation = location;
 		    ackedLastAckLocations.put(key, messageId);
 		}
+        try {
+			asyncWriteTask.wakeup();
+		} catch (InterruptedException e) {
+			throw new InterruptedIOException();
+		}
     }
     
-    public Location checkpoint() throws IOException {
-        
-		final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
+    @Override
+    protected Location doAsyncWrite() throws IOException {
+
+    	final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
 
         // swap out the hash maps..
         synchronized (this) {
             cpAckedLastAckLocations = this.ackedLastAckLocations;
             this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
         }
-
-        return super.checkpoint( new Callback() {
+        
+    	Location location = super.doAsyncWrite();
+    	    	
+        transactionTemplate.run(new Callback() {
             public void execute() throws Exception {
-
                 // Checkpoint the acknowledged messages.
                 Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
                 while (iterator.hasNext()) {
@@ -169,12 +173,12 @@
                     MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
                     topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
                 }
-
             }
-        });
-
+        } );
+        
+        return location;
     }
-
+   
     /**
 	 * @return Returns the longTermStore.
 	 */
@@ -192,7 +196,7 @@
 
     
     public int getMessageCount(String clientId,String subscriberName) throws IOException{
-        this.peristenceAdapter.checkpoint(true);
+        flush();
         return topicReferenceStore.getMessageCount(clientId,subscriberName);
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java Thu Jan  4 01:34:46 2007
@@ -21,9 +21,12 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 final class RecoveryListenerAdapter implements MessageRecoveryListener {
-
+	static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
+	
 	private final MessageStore store;
 	private final MessageRecoveryListener listener;
 
@@ -45,6 +48,12 @@
 	}
 
 	public void recoverMessageReference(MessageId ref) throws Exception {
-		listener.recoverMessage( this.store.getMessage(ref) );
+		Message message = this.store.getMessage(ref);
+		if( message !=null ){
+			listener.recoverMessage( message );
+		} else {
+			log.error("Message id "+ref+" could not be recovered from the data store!");
+		}
+			
 	}
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java Thu Jan  4 01:34:46 2007
@@ -17,6 +17,7 @@
  */
 package org.apache.activemq;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -32,8 +33,6 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -105,6 +104,12 @@
 
     protected void setUp() throws Exception {
         super.setUp();
+
+    	if(System.getProperty("basedir")==null){
+            File file=new File(".");
+            System.setProperty("basedir",file.getAbsolutePath());
+        }
+
         broker = createBroker();
         broker.start();
         factory = createConnectionFactory();

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java Thu Jan  4 01:34:46 2007
@@ -45,6 +45,9 @@
  */
 public class LoadTester extends JmsTestSupport {
 
+	protected int MESSAGE_SIZE=1024*64;
+	protected int PRODUCE_COUNT=10000;
+
     protected BrokerService createBroker() throws Exception {
          return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
     }
@@ -56,8 +59,6 @@
     }
     
     public void testQueueSendThenAddConsumer() throws Exception {
-        int MESSAGE_SIZE=1024*64;
-        int PRODUCE_COUNT=10000;
         ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT, 20);
    
         ActiveMQDestination destination = new ActiveMQQueue("TEST");

Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java (from r492380, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java?view=diff&rev=492471&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java&r1=492380&p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java Thu Jan  4 01:34:46 2007
@@ -28,7 +28,7 @@
  * 
  * @version $Revision$
  */
-public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest {
+public class QuickStoreRecoveryBrokerTest extends RecoveryBrokerTest {
 
     protected BrokerService createBroker() throws Exception {
         BrokerService service = new BrokerService();
@@ -46,7 +46,7 @@
     }
     
     public static Test suite() {
-        return suite(QuickJournalRecoveryBrokerTest.class);
+        return suite(QuickStoreRecoveryBrokerTest.class);
     }
     
     public static void main(String[] args) {

Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java (from r492373, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java?view=diff&rev=492471&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java&r1=492373&p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java Thu Jan  4 01:34:46 2007
@@ -21,17 +21,17 @@
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.XARecoveryBrokerTest;
-import org.apache.activemq.store.DefaultPersistenceAdapterFactory;
+import org.apache.activemq.store.quick.QuickPersistenceAdapter;
 
 /**
  * Used to verify that recovery works correctly against 
  * 
  * @version $Revision$
  */
-public class QuickJournalXARecoveryBrokerTest extends XARecoveryBrokerTest {
+public class QuickStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
 
     public static Test suite() {
-        return suite(QuickJournalXARecoveryBrokerTest.class);
+        return suite(QuickStoreXARecoveryBrokerTest.class);
     }
     
     public static void main(String[] args) {
@@ -41,15 +41,15 @@
     protected BrokerService createBroker() throws Exception {
         BrokerService service = new BrokerService();
         service.setDeleteAllMessagesOnStartup(true);
-        DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory();
-        factory.setUseQuickJournal(true);
+        QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+        service.setPersistenceAdapter(pa);
         return service;
     }
     
     protected BrokerService createRestartedBroker() throws Exception {
         BrokerService service = new BrokerService();
-        DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory();
-        factory.setUseQuickJournal(true);
+        QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+        service.setPersistenceAdapter(pa);
         return service;
     }
     



Mime
View raw message