Author: chirino
Date: Thu Jan 4 01:34:46 2007
New Revision: 492471
URL: http://svn.apache.org/viewvc?view=rev&rev=492471
Log:
Fixed a ton of Quick store bugs that were found when running the QuickStoreLoadTester.
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java
- copied, changed from r492380, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java
- copied, changed from r492373, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java
Removed:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
Thu Jan 4 01:34:46 2007
@@ -206,7 +206,7 @@
}
}
- private ByteSequence marshallState() throws IOException {
+ private synchronized ByteSequence marshallState() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -338,9 +338,7 @@
synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
if(dataFile!=null){
if(dataFile.decrement()<=0){
- if(dataFile!=currentWriteFile){
- removeDataFile(dataFile);
- }
+ removeDataFile(dataFile);
}
}
}
@@ -355,21 +353,18 @@
List<DataFile> purgeList=new ArrayList<DataFile>();
for (Integer key : unUsed) {
DataFile dataFile=(DataFile) fileMap.get(key);
- if( dataFile!=currentWriteFile ) {
- purgeList.add(dataFile);
- }
+ purgeList.add(dataFile);
}
for (DataFile dataFile : purgeList) {
- removeDataFile(dataFile);
+ removeDataFile(dataFile);
}
-
}
public synchronized void consolidateDataFiles() throws IOException{
List<DataFile> purgeList=new ArrayList<DataFile>();
for (DataFile dataFile : fileMap.values()) {
- if(dataFile.isUnused() && dataFile != currentWriteFile){
+ if( dataFile.isUnused() ){
purgeList.add(dataFile);
}
}
@@ -379,12 +374,21 @@
}
private void removeDataFile(DataFile dataFile) throws IOException{
+
+ // Make sure we don't delete too much data.
+ if( dataFile==currentWriteFile || mark==null || dataFile.getDataFileId() >= mark.getDataFileId()
) {
+ return;
+ }
+
+ accessorPool.disposeDataFileAccessors(dataFile);
+
fileMap.remove(dataFile.getDataFileId());
dataFile.unlink();
- accessorPool.disposeDataFileAccessors(dataFile);
boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
+
}
+
/**
* @return the maxFileLength
@@ -479,8 +483,10 @@
return rc;
}
- public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException
{
- mark = location;
+ public void setMark(Location location, boolean sync) throws IOException, IllegalStateException
{
+ synchronized(this) {
+ mark = location;
+ }
storeState(sync);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
Thu Jan 4 01:34:46 2007
@@ -36,9 +36,12 @@
int MAX_OPEN_READERS_PER_FILE=5;
class Pool {
+
private final DataFile file;
private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
private boolean used;
+ private int openCounter;
+ private boolean disposed;
public Pool(DataFile file) {
this.file = file;
@@ -52,12 +55,14 @@
rc = (DataFileAccessor) pool.remove(pool.size()-1);
}
used=true;
+ openCounter++;
return rc;
}
public void closeDataFileReader(DataFileAccessor reader) {
+ openCounter--;
used=true;
- if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) {
+ if(pool.size() >= MAX_OPEN_READERS_PER_FILE || disposed) {
reader.dispose();
} else {
pool.add(reader);
@@ -77,6 +82,11 @@
reader.dispose();
}
pool.clear();
+ disposed=true;
+ }
+
+ public int getOpenCounter() {
+ return openCounter;
}
}
@@ -102,17 +112,17 @@
}
}
- synchronized void disposeDataFileAccessors(DataFile dataFile) throws IOException {
+ synchronized void disposeDataFileAccessors(DataFile dataFile) {
if( closed ) {
- throw new IOException("Closed.");
+ throw new IllegalStateException("Closed.");
}
Pool pool = pools.get(dataFile.getDataFileId());
if( pool != null ) {
- if( !pool.isUsed() ) {
+ if( pool.getOpenCounter()==0 ) {
pool.dispose();
pools.remove(dataFile.getDataFileId());
} else {
- throw new IOException("The data file is still in use: "+dataFile);
+ throw new IllegalStateException("The data file is still in use: "+dataFile+", use count:
"+pool.getOpenCounter());
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
Thu Jan 4 01:34:46 2007
@@ -77,7 +77,7 @@
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry))
{
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
- if(msg.messageId.equals(identity)){
+ if(msg.messageId.equals(identity.toString())){
result=msg;
cache.put(identity,entry);
break;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
Thu Jan 4 01:34:46 2007
@@ -18,12 +18,15 @@
package org.apache.activemq.store.quick;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@@ -38,6 +41,8 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
@@ -66,14 +71,26 @@
private LinkedHashMap<MessageId, ReferenceData> cpAddedMessageIds;
protected Location lastLocation;
+ protected Location lastWrittenLocation;
+
protected HashSet<Location> inFlightTxLocations = new HashSet<Location>();
+ protected final TaskRunner asyncWriteTask;
+ protected CountDownLatch flushLatch;
+ private final AtomicReference<Location> mark = new AtomicReference<Location>();
+
public QuickMessageStore(QuickPersistenceAdapter adapter, ReferenceStore referenceStore,
ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore;
this.destination = destination;
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
+
+ asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task(){
+ public boolean iterate() {
+ asyncWrite();
+ return false;
+ }}, "Checkpoint: "+destination);
}
public void setUsageManager(UsageManager usageManager) {
@@ -123,7 +140,7 @@
}
}
- private void addMessage(final Message message, final Location location) {
+ private void addMessage(final Message message, final Location location) throws InterruptedIOException
{
ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());
@@ -132,6 +149,11 @@
lastLocation = location;
messages.put(message.getMessageId(), data);
}
+ try {
+ asyncWriteTask.wakeup();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
}
public void replayAddMessage(ConnectionContext context, Message message, Location location)
{
@@ -193,15 +215,24 @@
}
}
- private void removeMessage(final MessageAck ack, final Location location) {
- synchronized (this) {
+ private void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException
{
+ ReferenceData data;
+ synchronized (this) {
lastLocation = location;
MessageId id = ack.getLastMessageId();
- ReferenceData data = messages.remove(id);
+ data = messages.remove(id);
if (data == null) {
messageAcks.add(ack);
}
}
+
+ if (data == null) {
+ try {
+ asyncWriteTask.wakeup();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
}
public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
@@ -216,34 +247,77 @@
log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
+ "'. Message may have already been acknowledged. reason: " + e);
}
}
+
+ /**
+ * Waits till the lastest data has landed on the referenceStore
+ * @throws InterruptedIOException
+ */
+ public void flush() throws InterruptedIOException {
+ log.debug("flush");
+ CountDownLatch countDown;
+ synchronized(this) {
+ if( lastWrittenLocation == lastLocation ) {
+ return;
+ }
+ if( flushLatch== null ) {
+ flushLatch = new CountDownLatch(1);
+ }
+ countDown = flushLatch;
+ }
+ try {
+ asyncWriteTask.wakeup();
+ countDown.await();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ }
/**
* @return
* @throws IOException
*/
- public Location checkpoint() throws IOException {
- return checkpoint(null);
+ private void asyncWrite() {
+ try {
+ CountDownLatch countDown;
+ synchronized(this) {
+ countDown = flushLatch;
+ flushLatch = null;
+ }
+
+ mark.set(doAsyncWrite());
+
+ if ( countDown != null ) {
+ countDown.countDown();
+ }
+ } catch (IOException e) {
+ log.error("Checkpoint failed: "+e, e);
+ }
}
/**
* @return
* @throws IOException
*/
- public Location checkpoint(final Callback postCheckpointTest) throws IOException {
+ protected Location doAsyncWrite() throws IOException {
final ArrayList<MessageAck> cpRemovedMessageLocations;
final ArrayList<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
-
+ final Location lastLocation;
+
// swap out the message hash maps..
synchronized (this) {
cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations=new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
- this.messageAcks = new ArrayList<MessageAck>();
+ this.messageAcks = new ArrayList<MessageAck>();
+ lastLocation = this.lastLocation;
}
+ if( log.isDebugEnabled() )
+ log.debug("Doing batch update... adding: "+cpAddedMessageIds.size()+" removing:
"+cpRemovedMessageLocations.size()+" ");
+
transactionTemplate.run(new Callback() {
public void execute() throws Exception {
@@ -284,15 +358,15 @@
}
}
- if( postCheckpointTest!= null ) {
- postCheckpointTest.execute();
- }
}
});
+
+ log.debug("Batch update done.");
synchronized (this) {
cpAddedMessageIds = null;
+ lastWrittenLocation = lastLocation;
}
if( cpActiveJournalLocations.size() > 0 ) {
@@ -338,7 +412,7 @@
}
/**
- * Replays the checkpointStore first as those messages are the oldest ones,
+ * Replays the referenceStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
@@ -346,7 +420,7 @@
* @throws Exception
*/
public void recover(final MessageRecoveryListener listener) throws Exception {
- peristenceAdapter.checkpoint(true);
+ flush();
referenceStore.recover(new RecoveryListenerAdapter(this, listener));
}
@@ -355,6 +429,7 @@
}
public void stop() throws Exception {
+ asyncWriteTask.shutdown();
referenceStore.stop();
}
@@ -369,7 +444,7 @@
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException {
- peristenceAdapter.checkpoint(true);
+ flush();
referenceStore.removeAllMessages(context);
}
@@ -391,13 +466,12 @@
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount() throws IOException{
- peristenceAdapter.checkpoint(true);
+ flush();
return referenceStore.getMessageCount();
}
-
- public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws
Exception{
- peristenceAdapter.checkpoint(true);
+ public void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws
Exception{
+ flush();
referenceStore.recoverNextMessages(maxReturned,new RecoveryListenerAdapter(this,
listener));
}
@@ -407,5 +481,9 @@
referenceStore.resetBatching();
}
+
+ public Location getMark() {
+ return mark.get();
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
Thu Jan 4 01:34:46 2007
@@ -19,19 +19,12 @@
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.Journal;
@@ -94,12 +87,14 @@
private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager;
- private long checkpointInterval = 1000 * 30;
+
+ private long cleanupInterval = 1000 * 10;
+ private long checkpointInterval = 1000 * 10;
+
private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024*4;
private QuickTransactionStore transactionStore = new QuickTransactionStore(this);
- private ThreadPoolExecutor checkpointExecutor;
private TaskRunner checkpointTask;
private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
@@ -111,6 +106,7 @@
private boolean deleteAllMessages;
private File directory = new File("activemq-data/quick");
+
public synchronized void start() throws Exception {
if( !started.compareAndSet(false, true) )
@@ -152,7 +148,13 @@
Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
for (Integer fileId : files) {
- asyncDataManager.addInterestInFile(fileId);
+ try {
+ asyncDataManager.addInterestInFile(fileId);
+ } catch (IOException e) {
+ // We can expect these since referenceStoreAdapter is a litle behind in updates
+ // and it might think it has references to data files that have allready come and gone..
+ // This should get resolved once recovery kicks in.
+ }
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
@@ -161,15 +163,7 @@
return false;
}
}, "ActiveMQ Journal Checkpoint Worker");
-
- checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers,
30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runable) {
- Thread t = new Thread(runable, "Journal checkpoint worker");
- t.setPriority(7);
- return t;
- }
- });
-
+
createTransactionStore();
recover();
@@ -187,7 +181,7 @@
cleanup();
}
};
- Scheduler.executePeriodically(periodicCleanupTask, checkpointInterval);
+ Scheduler.executePeriodically(periodicCleanupTask, cleanupInterval);
}
@@ -200,11 +194,22 @@
this.usageManager.removeUsageListener(this);
Scheduler.cancel(periodicCheckpointTask);
+
+ Iterator<QuickMessageStore> iterator = queues.values().iterator();
+ while (iterator.hasNext()) {
+ QuickMessageStore ms = iterator.next();
+ ms.stop();
+ }
+
+ iterator = topics.values().iterator();
+ while (iterator.hasNext()) {
+ final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
+ ms.stop();
+ }
+
// Take one final checkpoint and stop checkpoint processing.
checkpoint(true);
- checkpointTask.shutdown();
- log.debug("Checkpoint task shutdown");
- checkpointExecutor.shutdown();
+ checkpointTask.shutdown();
queues.clear();
topics.clear();
@@ -268,54 +273,23 @@
log.debug("Checkpoint started.");
Location newMark = null;
- ArrayList<FutureTask> futureTasks = new ArrayList<FutureTask>(queues.size()+topics.size());
-
- //
Iterator<QuickMessageStore> iterator = queues.values().iterator();
while (iterator.hasNext()) {
- try {
- final QuickMessageStore ms = iterator.next();
- FutureTask<Location> task = new FutureTask<Location>(new
Callable<Location>() {
- public Location call() throws Exception {
- return ms.checkpoint();
- }});
- futureTasks.add(task);
- checkpointExecutor.execute(task);
- }
- catch (Exception e) {
- log.error("Failed to checkpoint a message store: " + e, e);
+ final QuickMessageStore ms = iterator.next();
+ Location mark = (Location) ms.getMark();
+ if (mark != null && (newMark == null || newMark.compareTo(mark) <
0)) {
+ newMark = mark;
}
}
iterator = topics.values().iterator();
while (iterator.hasNext()) {
- try {
- final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
- FutureTask<Location> task = new FutureTask<Location>(new
Callable<Location>() {
- public Location call() throws Exception {
- return ms.checkpoint();
- }});
- futureTasks.add(task);
- checkpointExecutor.execute(task);
- }
- catch (Exception e) {
- log.error("Failed to checkpoint a message store: " + e, e);
- }
- }
-
- try {
- for (Iterator<FutureTask> iter = futureTasks.iterator(); iter.hasNext();)
{
- FutureTask ft = iter.next();
- Location mark = (Location) ft.get();
- // We only set a newMark on full checkpoints.
- if (mark != null && (newMark == null || newMark.compareTo(mark)
< 0)) {
- newMark = mark;
- }
+ final QuickTopicMessageStore ms = (QuickTopicMessageStore) iterator.next();
+ Location mark = (Location) ms.getMark();
+ if (mark != null && (newMark == null || newMark.compareTo(mark) <
0)) {
+ newMark = mark;
}
- } catch (Throwable e) {
- log.error("Failed to checkpoint a message store: " + e, e);
}
-
try {
if (newMark != null) {
@@ -354,10 +328,8 @@
public void cleanup() {
try {
-
Set<Integer> inUse = referenceStoreAdapter.getReferenceFileIdsInUse();
asyncDataManager.consolidateDataFilesNotIn(inUse);
-
} catch (IOException e) {
log.error("Could not cleanup data files: "+e, e);
}
@@ -386,6 +358,11 @@
if (store == null) {
ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
store = new QuickMessageStore(this, checkpointStore, destination);
+ try {
+ store.start();
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
queues.put(destination, store);
}
return store;
@@ -396,6 +373,11 @@
if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new QuickTopicMessageStore(this, checkpointStore, destinationName);
+ try {
+ store.start();
+ } catch (Exception e) {
+ throw IOExceptionSupport.create(e);
+ }
topics.put(destinationName, store);
}
return store;
@@ -445,7 +427,7 @@
* @throws InvalidLocationException
* @throws IllegalStateException
*/
- private void recover() throws IllegalStateException, IOException, IOException {
+ private void recover() throws IllegalStateException, IOException {
Location pos = null;
int transactionCounter = 0;
@@ -594,8 +576,7 @@
newPercentUsage = ((newPercentUsage)/10)*10;
oldPercentUsage = ((oldPercentUsage)/10)*10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
- boolean sync = newPercentUsage >= 90;
- checkpoint(sync);
+ checkpoint(false);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
Thu Jan 4 01:34:46 2007
@@ -18,13 +18,13 @@
package org.apache.activemq.store.quick;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
-import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
@@ -49,18 +49,18 @@
private TopicReferenceStore topicReferenceStore;
private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey,
MessageId>();
- public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore checkpointStore,
ActiveMQTopic destinationName) {
- super(adapter, checkpointStore, destinationName);
- this.topicReferenceStore = checkpointStore;
+ public QuickTopicMessageStore(QuickPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore,
ActiveMQTopic destinationName) {
+ super(adapter, topicReferenceStore, destinationName);
+ this.topicReferenceStore = topicReferenceStore;
}
public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener
listener) throws Exception {
- this.peristenceAdapter.checkpoint(true);
+ flush();
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this,
listener));
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
final MessageRecoveryListener listener) throws Exception{
- this.peristenceAdapter.checkpoint(true);
+ flush();
topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned,
new RecoveryListenerAdapter(this, listener));
}
@@ -69,14 +69,10 @@
}
public void addSubsciption(String clientId, String subscriptionName, String selector,
boolean retroactive) throws IOException {
- this.peristenceAdapter.checkpoint(true);
+ flush();
topicReferenceStore.addSubsciption(clientId, subscriptionName, selector, retroactive);
}
- public void addMessage(ConnectionContext context, Message message) throws IOException
{
- super.addMessage(context, message);
- }
-
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
final MessageId messageId) throws IOException {
@@ -141,27 +137,35 @@
* @param messageId
* @param location
* @param key
+ * @throws InterruptedIOException
*/
- private void acknowledge(MessageId messageId, Location location, SubscriptionKey key)
{
+ private void acknowledge(MessageId messageId, Location location, SubscriptionKey key)
throws InterruptedIOException {
synchronized(this) {
lastLocation = location;
ackedLastAckLocations.put(key, messageId);
}
+ try {
+ asyncWriteTask.wakeup();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
}
- public Location checkpoint() throws IOException {
-
- final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
+ @Override
+ protected Location doAsyncWrite() throws IOException {
+
+ final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
// swap out the hash maps..
synchronized (this) {
cpAckedLastAckLocations = this.ackedLastAckLocations;
this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
}
-
- return super.checkpoint( new Callback() {
+
+ Location location = super.doAsyncWrite();
+
+ transactionTemplate.run(new Callback() {
public void execute() throws Exception {
-
// Checkpoint the acknowledged messages.
Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
while (iterator.hasNext()) {
@@ -169,12 +173,12 @@
MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
subscriptionKey.subscriptionName, identity);
}
-
}
- });
-
+ } );
+
+ return location;
}
-
+
/**
* @return Returns the longTermStore.
*/
@@ -192,7 +196,7 @@
public int getMessageCount(String clientId,String subscriberName) throws IOException{
- this.peristenceAdapter.checkpoint(true);
+ flush();
return topicReferenceStore.getMessageCount(clientId,subscriberName);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/RecoveryListenerAdapter.java
Thu Jan 4 01:34:46 2007
@@ -21,9 +21,12 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
final class RecoveryListenerAdapter implements MessageRecoveryListener {
-
+ static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
+
private final MessageStore store;
private final MessageRecoveryListener listener;
@@ -45,6 +48,12 @@
}
public void recoverMessageReference(MessageId ref) throws Exception {
- listener.recoverMessage( this.store.getMessage(ref) );
+ Message message = this.store.getMessage(ref);
+ if( message !=null ){
+ listener.recoverMessage( message );
+ } else {
+ log.error("Message id "+ref+" could not be recovered from the data store!");
+ }
+
}
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
Thu Jan 4 01:34:46 2007
@@ -17,6 +17,7 @@
*/
package org.apache.activemq;
+import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -32,8 +33,6 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
@@ -105,6 +104,12 @@
protected void setUp() throws Exception {
super.setUp();
+
+ if(System.getProperty("basedir")==null){
+ File file=new File(".");
+ System.setProperty("basedir",file.getAbsolutePath());
+ }
+
broker = createBroker();
broker.start();
factory = createConnectionFactory();
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java?view=diff&rev=492471&r1=492470&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/LoadTester.java
Thu Jan 4 01:34:46 2007
@@ -45,6 +45,9 @@
*/
public class LoadTester extends JmsTestSupport {
+ protected int MESSAGE_SIZE=1024*64;
+ protected int PRODUCE_COUNT=10000;
+
protected BrokerService createBroker() throws Exception {
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/store/loadtester.xml"));
}
@@ -56,8 +59,6 @@
}
public void testQueueSendThenAddConsumer() throws Exception {
- int MESSAGE_SIZE=1024*64;
- int PRODUCE_COUNT=10000;
ProgressPrinter printer = new ProgressPrinter(PRODUCE_COUNT, 20);
ActiveMQDestination destination = new ActiveMQQueue("TEST");
Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java
(from r492380, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java?view=diff&rev=492471&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java&r1=492380&p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalRecoveryBrokerTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreRecoveryBrokerTest.java
Thu Jan 4 01:34:46 2007
@@ -28,7 +28,7 @@
*
* @version $Revision$
*/
-public class QuickJournalRecoveryBrokerTest extends RecoveryBrokerTest {
+public class QuickStoreRecoveryBrokerTest extends RecoveryBrokerTest {
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
@@ -46,7 +46,7 @@
}
public static Test suite() {
- return suite(QuickJournalRecoveryBrokerTest.class);
+ return suite(QuickStoreRecoveryBrokerTest.class);
}
public static void main(String[] args) {
Copied: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java
(from r492373, incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java?view=diff&rev=492471&p1=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java&r1=492373&p2=incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java&r2=492471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickJournalXARecoveryBrokerTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/store/QuickStoreXARecoveryBrokerTest.java
Thu Jan 4 01:34:46 2007
@@ -21,17 +21,17 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest;
-import org.apache.activemq.store.DefaultPersistenceAdapterFactory;
+import org.apache.activemq.store.quick.QuickPersistenceAdapter;
/**
* Used to verify that recovery works correctly against
*
* @version $Revision$
*/
-public class QuickJournalXARecoveryBrokerTest extends XARecoveryBrokerTest {
+public class QuickStoreXARecoveryBrokerTest extends XARecoveryBrokerTest {
public static Test suite() {
- return suite(QuickJournalXARecoveryBrokerTest.class);
+ return suite(QuickStoreXARecoveryBrokerTest.class);
}
public static void main(String[] args) {
@@ -41,15 +41,15 @@
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
service.setDeleteAllMessagesOnStartup(true);
- DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory();
- factory.setUseQuickJournal(true);
+ QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+ service.setPersistenceAdapter(pa);
return service;
}
protected BrokerService createRestartedBroker() throws Exception {
BrokerService service = new BrokerService();
- DefaultPersistenceAdapterFactory factory = (DefaultPersistenceAdapterFactory) service.getPersistenceFactory();
- factory.setUseQuickJournal(true);
+ QuickPersistenceAdapter pa = new QuickPersistenceAdapter();
+ service.setPersistenceAdapter(pa);
return service;
}
|