Author: gtully
Date: Mon Mar 30 18:04:20 2009
New Revision: 760075
URL: http://svn.apache.org/viewvc?rev=760075&view=rev
Log:
Fix duplicate detection of messages recovered when the space limit is reached, and fix cursor cache
re-enablement when free space becomes available (AMQ-2149).
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java Mon Mar 30 18:04:20 2009
@@ -91,7 +91,7 @@
}
/**
- * Checks if this message has beeb seen before
+ * Checks if this message has been seen before
*
* @param message
* @return true if the message is a duplicate
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Mar 30 18:04:20 2009
@@ -271,11 +271,22 @@
this.useCache = useCache;
}
- public synchronized boolean isDuplicate(MessageId messageId) {
+ public synchronized boolean isDuplicate(MessageId messageId) {
+ boolean unique = recordUniqueId(messageId);
+ rollback(messageId);
+ return !unique;
+ }
+
+ /**
+ * records a message id and checks if it is a duplicate
+ * @param messageId
+ * @return true if id is unique, false otherwise.
+ */
+ public synchronized boolean recordUniqueId(MessageId messageId) {
if (!enableAudit || audit==null) {
- return false;
+ return true;
}
- return audit.isDuplicate(messageId);
+ return !audit.isDuplicate(messageId);
}
public synchronized void rollback(MessageId id) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Mar 30 18:04:20 2009
@@ -37,7 +37,7 @@
protected final Destination regionDestination;
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private Iterator<Entry<MessageId, Message>> iterator = null;
- protected boolean cacheEnabled=false;
+ private boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
protected int size;
@@ -73,7 +73,7 @@
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
boolean recovered = false;
- if (!isDuplicate(message.getMessageId())) {
+ if (recordUniqueId(message.getMessageId())) {
if (!cached) {
message.setRegionDestination(regionDestination);
if( message.getMemoryUsage()==null ) {
@@ -157,6 +157,9 @@
} else {
if (cacheEnabled) {
cacheEnabled=false;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " disabling cache on size:" + size);
+ }
// sync with store on disabling the cache
if (lastCachedId != null) {
setBatch(lastCachedId);
@@ -176,12 +179,15 @@
public final synchronized void remove() {
size--;
- if (size==0 && isStarted() && useCache) {
- cacheEnabled=true;
- }
if (iterator!=null) {
iterator.remove();
}
+ if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " enabling cache on last remove");
+ }
+ cacheEnabled=true;
+ }
}
public final synchronized void remove(MessageReference node) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Mon Mar 30 18:04:20 2009
@@ -17,14 +17,11 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
-import java.io.InterruptedIOException;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.amq.AMQMessageStore;
-import org.apache.activemq.store.kahadaptor.KahaReferenceStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Mon Mar 30 18:04:20 2009
@@ -26,5 +26,10 @@
boolean recoverMessage(Message message) throws Exception;
boolean recoverMessageReference(MessageId ref) throws Exception;
boolean hasSpace();
+ /**
+ * check if ref is a duplicate but do not record the reference
+ * @param ref
+ * @return true if ref is a duplicate
+ */
boolean isDuplicate(MessageId ref);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Mar 30 18:04:20 2009
@@ -381,6 +381,9 @@
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("adding message ref:" + entry.getKey());
+ }
size++;
} else {
if (LOG.isDebugEnabled()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Mar 30 18:04:20 2009
@@ -119,7 +119,7 @@
do {
ReferenceRecord msg = messageContainer.getValue(entry);
if (msg != null ) {
- if ( recoverReference(listener, msg)) {
+ if (recoverReference(listener, msg)) {
count++;
lastBatchId = msg.getMessageId();
} else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
@@ -180,14 +180,6 @@
lock.unlock();
}
}
-
- public void addReferenceFileIdsInUse() {
- for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
- .getNext(entry)) {
- ReferenceRecord msg = (ReferenceRecord)messageContainer.getValue(entry);
- addInterest(msg);
- }
- }
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
removeMessage(ack.getLastMessageId());
@@ -274,6 +266,9 @@
lock.lock();
try {
batchEntry = messageContainer.getEntry(startAfter);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("setBatch: " + startAfter);
+ }
} finally {
lock.unlock();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Mon Mar 30 18:04:20 2009
@@ -184,10 +184,6 @@
onLimitChange();
}
- /*
- * Sets the minimum number of percentage points the usage has to change
- * before a UsageListener event is fired by the manager.
- */
public int getPercentUsage() {
synchronized (usageMutex) {
return percentUsage;
@@ -243,8 +239,9 @@
private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
if (debug) {
- LOG.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
- }
+ LOG.info("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
+ }
+
if (started.get()) {
// Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=760075&r1=760074&r2=760075&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Mon Mar 30 18:04:20 2009
@@ -30,17 +30,20 @@
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.broker.util.LoggingBrokerPlugin;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -63,16 +66,18 @@
private final String SEQ_NUM_PROPERTY = "seqNum";
- final int MESSAGE_LENGTH_BYTES = 75000;
+ final int MESSAGE_LENGTH_BYTES = 75 * 1024;
final int MAX_TO_SEND = 1500;
final long SLEEP_BETWEEN_SEND_MS = 3;
final int NUM_SENDERS_AND_RECEIVERS = 10;
final Object brokerLock = new Object();
-
+
BrokerService broker;
Vector<Throwable> exceptions = new Vector<Throwable>();
private File dataDirFile;
+ final LoggingBrokerPlugin[] plugins = new LoggingBrokerPlugin[]{new LoggingBrokerPlugin()};
+
public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
@@ -112,7 +117,7 @@
private class Receiver implements MessageListener {
- private final String queueName;
+ private final javax.jms.Destination dest;
private final Connection connection;
@@ -124,13 +129,17 @@
private String lastId = null;
- public Receiver(String queueName) throws JMSException {
- this.queueName = queueName;
+ public Receiver(javax.jms.Destination dest) throws JMSException {
+ this.dest = dest;
connection = new ActiveMQConnectionFactory(BROKER_URL)
.createConnection();
+ connection.setClientID(dest.toString());
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- messageConsumer = session.createConsumer(new ActiveMQQueue(
- queueName));
+ if (ActiveMQDestination.transform(dest).isTopic()) {
+ messageConsumer = session.createDurableSubscriber((Topic) dest, dest.toString());
+ } else {
+ messageConsumer = session.createConsumer(dest);
+ }
messageConsumer.setMessageListener(this);
connection.start();
}
@@ -147,22 +156,22 @@
try {
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
if ((seqNum % 500) == 0) {
- LOG.info(queueName + " received " + seqNum);
+ LOG.info(dest + " received " + seqNum);
}
if (seqNum != nextExpectedSeqNum) {
- LOG.warn(queueName + " received " + seqNum
+ LOG.warn(dest + " received " + seqNum
+ " in msg: " + message.getJMSMessageID()
+ " expected "
+ nextExpectedSeqNum
+ ", lastId: " + lastId
+ ", message:" + message);
- fail(queueName + " received " + seqNum + " expected "
+ fail(dest + " received " + seqNum + " expected "
+ nextExpectedSeqNum);
}
++nextExpectedSeqNum;
lastId = message.getJMSMessageID();
} catch (Throwable e) {
- LOG.error(queueName + " onMessage error", e);
+ LOG.error(dest + " onMessage error", e);
exceptions.add(e);
}
}
@@ -171,7 +180,7 @@
private class Sender implements Runnable {
- private final String queueName;
+ private final javax.jms.Destination dest;
private final Connection connection;
@@ -181,13 +190,12 @@
private volatile long nextSequenceNumber = 0;
- public Sender(String queueName) throws JMSException {
- this.queueName = queueName;
+ public Sender(javax.jms.Destination dest) throws JMSException {
+ this.dest = dest;
connection = new ActiveMQConnectionFactory(BROKER_URL)
.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- messageProducer = session.createProducer(new ActiveMQQueue(
- queueName));
+ messageProducer = session.createProducer(dest);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
}
@@ -203,14 +211,14 @@
++nextSequenceNumber;
messageProducer.send(message);
} catch (Exception e) {
- LOG.error(queueName + " send error", e);
+ LOG.error(dest + " send error", e);
exceptions.add(e);
}
if (SLEEP_BETWEEN_SEND_MS > 0) {
try {
Thread.sleep(SLEEP_BETWEEN_SEND_MS);
} catch (InterruptedException e) {
- LOG.warn(queueName + " sleep interrupted", e);
+ LOG.warn(dest + " sleep interrupted", e);
}
}
}
@@ -240,7 +248,7 @@
public void configure(BrokerService broker) throws Exception {
SystemUsage usage = new SystemUsage();
MemoryUsage memoryUsage = new MemoryUsage();
- memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 5 * NUM_SENDERS_AND_RECEIVERS);
+ memoryUsage.setLimit(MESSAGE_LENGTH_BYTES * 10 * NUM_SENDERS_AND_RECEIVERS);
usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage);
@@ -252,7 +260,8 @@
verifyStats(false);
}
- public void testOrderWithRestartAndVMIndex() throws Exception {
+ // no need to run this unless there are some issues with the others
+ public void noProblem_testOrderWithRestartAndVMIndex() throws Exception {
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory =
@@ -288,7 +297,10 @@
});
final Timer timer = new Timer();
- schedualRestartTask(timer, null);
+ schedualRestartTask(timer, new Configurer() {
+ public void configure(BrokerService broker) throws Exception {
+ }
+ });
try {
verifyOrderedMessageReceipt();
@@ -300,29 +312,27 @@
}
- public void testOrderWithRestartAndNoCache() throws Exception {
+ public void x_testTopicOrderWithRestart() throws Exception {
+ plugins[0].setLogAll(true);
+ plugins[0].setLogInternalEvents(false);
+
- PolicyEntry noCache = new PolicyEntry();
- noCache.setUseCache(false);
- final PolicyMap policyMap = new PolicyMap();
- policyMap.setDefaultEntry(noCache);
-
createBroker(new Configurer() {
public void configure(BrokerService broker) throws Exception {
- broker.setDestinationPolicy(policyMap);
- broker.deleteAllMessages();
+ broker.deleteAllMessages();
+ broker.setPlugins(plugins);
}
});
final Timer timer = new Timer();
schedualRestartTask(timer, new Configurer() {
public void configure(BrokerService broker) throws Exception {
- broker.setDestinationPolicy(policyMap);
+ broker.setPlugins(plugins);
}
});
try {
- verifyOrderedMessageReceipt();
+ verifyOrderedMessageReceipt(ActiveMQDestination.TOPIC_TYPE);
} finally {
timer.cancel();
}
@@ -339,6 +349,7 @@
AMQPersistenceAdapterFactory persistenceFactory =
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setForceRecoverReferenceStore(true);
+ broker.setPlugins(plugins);
broker.deleteAllMessages();
}
});
@@ -349,6 +360,7 @@
AMQPersistenceAdapterFactory persistenceFactory =
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
persistenceFactory.setForceRecoverReferenceStore(true);
+ broker.setPlugins(plugins);
}
});
@@ -408,19 +420,24 @@
}
private void verifyOrderedMessageReceipt() throws Exception {
+ verifyOrderedMessageReceipt(ActiveMQDestination.QUEUE_TYPE);
+ }
+
+ private void verifyOrderedMessageReceipt(byte destinationType) throws Exception {
Vector<Thread> threads = new Vector<Thread>();
Vector<Receiver> receivers = new Vector<Receiver>();
for (int i = 0; i < NUM_SENDERS_AND_RECEIVERS; ++i) {
- final String queueName = "test.queue." + i;
- receivers.add(new Receiver(queueName));
- Thread thread = new Thread(new Sender(queueName));
+ final javax.jms.Destination destination =
+ ActiveMQDestination.createDestination("test.dest." + i, destinationType);
+ receivers.add(new Receiver(destination));
+ Thread thread = new Thread(new Sender(destination));
thread.start();
threads.add(thread);
}
- final long expiry = System.currentTimeMillis() + 1000 * 60 * 20;
+ final long expiry = System.currentTimeMillis() + 1000 * 60 * 30;
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
Thread sendThread = threads.firstElement();
sendThread.join(1000*10);
|