activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject [1/2] git commit: https://issues.apache.org/jira/browse/AMQ-4952 - deal with duplicates by redirecting to dlq when they are detected by the cursors such that they don't linger for redispatch after a restart. Networks are the main culprit for such duplica
Date Wed, 08 Jan 2014 12:48:57 GMT
Updated Branches:
  refs/heads/trunk 3af70ba28 -> 0f0c0d676


https://issues.apache.org/jira/browse/AMQ-4952 - deal with duplicates by redirecting to dlq
when they are detected by the cursors such that they don't linger for redispatch after a restart.
Networks are the main culprit for such duplicates b/c the producer audit traps regular failover
resends


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f92d45be
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f92d45be
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f92d45be

Branch: refs/heads/trunk
Commit: f92d45bec1be76b1e38d3be2d9700bbdd9054d17
Parents: 3af70ba
Author: gtully <gary.tully@gmail.com>
Authored: Tue Jan 7 16:43:56 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Tue Jan 7 16:52:39 2014 +0000

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |  36 +-
 .../activemq/broker/region/Destination.java     |   2 +
 .../broker/region/DestinationFilter.java        |   5 +
 .../apache/activemq/broker/region/Queue.java    |   2 +
 .../apache/activemq/broker/region/Topic.java    |  11 +
 .../region/cursors/AbstractStoreCursor.java     |  37 +-
 .../region/cursors/QueueStorePrefetch.java      |   1 +
 .../region/cursors/TopicStorePrefetch.java      |   7 +-
 .../activemq/store/kahadb/MessageDatabase.java  |   2 +-
 .../kahadb/disk/journal/DataFileAccessor.java   |   2 +-
 .../org/apache/activemq/bugs/AMQ4952Test.java   | 475 +++++++++++++++++++
 .../activemq/bugs/ConnectionPerMessageTest.java |   7 +-
 .../usecases/ThreeBrokerTopicNetworkTest.java   |  15 +-
 13 files changed, 578 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index f350098..7bbe360 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import javax.jms.ResourceAllocationException;
 import org.apache.activemq.advisory.AdvisorySupport;
@@ -32,6 +33,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
@@ -761,7 +763,7 @@ public abstract class BaseDestination implements Destination {
         return hasRegularConsumers;
     }
 
-    protected ConnectionContext createConnectionContext() {
+    public ConnectionContext createConnectionContext() {
         ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
         answer.setBroker(this.broker);
         answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
@@ -790,4 +792,36 @@ public abstract class BaseDestination implements Destination {
         return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination());
     }
 
+    public void duplicateFromStore(Message message, Subscription durableSub) {
+        ConnectionContext connectionContext = createConnectionContext();
+
+        TransactionId transactionId = message.getTransactionId();
+        if (transactionId != null && transactionId.isXATransaction()) {
+            try {
+                List<TransactionId> preparedTx = Arrays.asList(broker.getRoot().getPreparedTransactions(connectionContext));
+                getLog().trace("processing duplicate in {}, prepared {} ", transactionId,
preparedTx);
+                if (!preparedTx.contains(transactionId)) {
+                    // duplicates from past transactions expected after org.apache.activemq.broker.region.Destination#clearPendingMessages
+                    // till they are acked
+                    getLog().debug("duplicate message from store {}, from historical transaction
{}, ignoring", message.getMessageId(), transactionId);
+                    return;
+                }
+            } catch (Exception ignored) {
+                getLog().debug("failed to determine state of transaction {} on duplicate
message {}", transactionId, message.getMessageId(), ignored);
+            }
+        }
+
+        getLog().warn("duplicate message from store {}, redirecting for dlq processing",
message.getMessageId());
+        Throwable cause = new Throwable("duplicate from store for " + destination);
+        message.setRegionDestination(this);
+        broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
+        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
+        messageAck.setPoisonCause(cause);
+        try {
+            acknowledge(connectionContext, durableSub, messageAck, message);
+        } catch (IOException e) {
+            getLog().error("Failed to acknowledge duplicate message {} from {} with {}",
message.getMessageId(), destination, messageAck);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 93502e7..ddcba84 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -239,4 +239,6 @@ public interface Destination extends Service, Task, Message.MessageDestination
{
     public void clearPendingMessages();
 
     public boolean isDLQ();
+
+    void duplicateFromStore(Message message, Subscription subscription);
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index ecf6cf7..7e21bcc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -388,6 +388,11 @@ public class DestinationFilter implements Destination {
         return next.isDLQ();
     }
 
+    @Override
+    public void duplicateFromStore(Message message, Subscription subscription) {
+        next.duplicateFromStore(message, subscription);
+    }
+
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws
Exception {
         if (next instanceof DestinationFilter) {
             DestinationFilter filter = (DestinationFilter) next;

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 52d7282..18a4a69 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1747,6 +1747,7 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
 
     protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference
reference,
             MessageAck ack) throws IOException {
+        LOG.trace("ack of {} with {}", reference.getMessageId(), ack);
         reference.setAcked(true);
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
@@ -2049,6 +2050,7 @@ public class Queue extends BaseDestination implements Task, UsageListener
{
                         if (dispatchSelector.canSelect(s, node) && assignMessageGroup(s,
(QueueMessageReference)node) && !((QueueMessageReference) node).isAcked() ) {
                             // Dispatch it.
                             s.add(node);
+                            LOG.trace("assigned {} to consumer {}", node.getMessageId(),
s.getConsumerInfo().getConsumerId());
                             iterator.remove();
                             target = s;
                             break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index ab0f8ce..f85ed66 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -805,6 +805,17 @@ public class Topic extends BaseDestination implements Task {
         }
     }
 
+    private void rollback(MessageId poisoned) {
+        dispatchLock.readLock().lock();
+        try {
+            for (DurableTopicSubscription durableTopicSubscription : durableSubscribers.values())
{
+                durableTopicSubscription.getPending().rollback(poisoned);
+            }
+        } finally {
+            dispatchLock.readLock().unlock();
+        }
+    }
+
     public Map<SubscriptionKey, DurableTopicSubscription> getDurableTopicSubs() {
         return durableSubscribers;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 495d365..bb77afb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -17,8 +17,10 @@
 package org.apache.activemq.broker.region.cursors;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.TransactionId;
@@ -93,17 +95,25 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
             recovered = true;
             storeHasMessages = true;
         } else {
-            /*
-             * we should expect to get these - as the message is recorded as it before it
goes into
-             * the cache. If subsequently, we pull out that message from the store (before
its deleted)
-             * it will be a duplicate - but should be ignored
-             */
-            LOG.trace("{} - cursor got duplicate: {}, {}", new Object[]{ this, message.getMessageId(),
message.getPriority() });
+            LOG.warn("{} - cursor got duplicate: {}, {}", new Object[]{ this, message.getMessageId(),
message.getPriority() });
+            duplicate(message);
         }
         return recovered;
     }
-    
-    
+
+    // track for processing outside of store index lock so we can dlq
+    final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>();
+    private void duplicate(Message message) {
+        duplicatesFromStore.add(message);
+    }
+
+    void dealWithDuplicates() {
+        for (Message message : duplicatesFromStore) {
+            regionDestination.duplicateFromStore(message, getSubscription());
+        }
+        duplicatesFromStore.clear();
+    }
+
     public final synchronized void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -180,9 +190,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
                     lastCachedId = node.getMessageId();
                     lastTx = node.getMessage().getTransactionId();
                 } else {
-                    // failed to recover, possible duplicate from concurrent dispatchPending,
-                    // lets not recover further in case of out of order
-                    disableCache = true;
+                    LOG.debug(this + " duplicate add {}", node.getMessage(), new Throwable("duplicated
detected"));
+                    dealWithDuplicates();
                 }
             }
         } else {
@@ -251,7 +260,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
     }
 
     protected final synchronized void fillBatch() {
-        LOG.trace("{} - fillBatch", this);
+        //LOG.trace("{} - fillBatch", this);
         if (batchResetNeeded) {
             resetSize();
             setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
@@ -302,4 +311,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor
i
     protected abstract int getStoreSize();
     
     protected abstract boolean isStoreEmpty();
+
+    public Subscription getSubscription() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index b7fd289..c89b648 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -104,6 +104,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         hadSpace = this.hasSpace();
         if (!broker.getBrokerService().isPersistent() || hadSpace) {
             this.store.recoverNextMessages(this.maxBatchSize, this);
+            dealWithDuplicates(); // without the index lock
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index aec5c06..9e02e4e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -69,7 +69,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         
     @Override
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception
{
-        LOG.trace("recover: {}, priority: {}", message.getMessageId(), message.getPriority());
+        LOG.trace("{} recover: {}, priority: {}", this, message.getMessageId(), message.getPriority());
         boolean recovered = false;
         MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
         messageEvaluationContext.setMessageReference(message);
@@ -123,6 +123,11 @@ class TopicStorePrefetch extends AbstractStoreCursor {
     }
 
     @Override
+    public Subscription getSubscription() {
+        return subscription;
+    }
+
+    @Override
     public String toString() {
         return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId()
+ " - " + super.toString();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8c8fe0f..2d2dd55 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1291,7 +1291,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
             } else {
                 // If the message ID as indexed, then the broker asked us to
                 // store a DUP message. Bad BOY! Don't do it, and log a warning.
-                LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName()
+ ", Message id: " + command.getMessageId());
+                LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message
id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
                 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                 sd.locationIndex.remove(tx, location);
                 rollbackStatsOnDuplicate(command.getDestination());

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 7781b7e..2896196 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -90,7 +90,7 @@ final class DataFileAccessor {
             return new ByteSequence(data, 0, data.length);
 
         } catch (RuntimeException e) {
-            throw new IOException("Invalid location: " + location + ", : " + e);
+            throw new IOException("Invalid location: " + location + ", : " + e, e);
         }
     }
     

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
new file mode 100644
index 0000000..06f60e2
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
@@ -0,0 +1,475 @@
+
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.*;
+import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.Wait;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.sql.DataSource;
+import scala.actors.threadpool.Arrays;
+import java.net.URI;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Test creates a broker network with two brokers -
+ * producerBroker (with a message producer attached) and consumerBroker (with consumer attached)
+ * <p/>
+ * Simulates network duplicate message by stopping and restarting the consumerBroker after
message (with message ID ending in
+ * 120) is persisted to consumerBrokerstore BUT BEFORE ack sent to the producerBroker over
the network connection.
+ * When the network connection is reestablished the producerBroker resends
+ * message (with messageID ending in 120).
+ * <p/>
+ * Expectation:
+ * <p/>
+ * With the following policy entries set,  would  expect the duplicate message to be read
from the store
+ * and dispatched to the consumer - where the duplicate could be detected by consumer.
+ * <p/>
+ * PolicyEntry policy = new PolicyEntry();
+ * policy.setQueue(">");
+ * policy.setEnableAudit(false);
+ * policy.setUseCache(false);
+ * policy.setExpireMessagesPeriod(0);
+ * <p/>
+ * <p/>
+ * Note 1: Network needs to use replaywhenNoConsumers so enabling the networkAudit to avoid
this scenario is not feasible.
+ * <p/>
+ * NOTE 2: Added a custom plugin to the consumerBroker so that the consumerBroker shutdown
will occur after a message has been
+ * persisted to consumerBroker store but before an ACK is sent back to ProducerBroker. This
is just a hack to ensure producerBroker will resend
+ * the message after shutdown.
+ */
+
+public class AMQ4952Test extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class);
+
+    protected static final int MESSAGE_COUNT = 1;
+
+    protected BrokerService consumerBroker;
+    protected BrokerService producerBroker;
+
+    protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store");
+
+    private final CountDownLatch stopConsumerBroker = new CountDownLatch(1);
+    private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1);
+    private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1);
+
+    private EmbeddedDataSource localDataSource;
+
+
+    public void testConsumerBrokerRestart() throws Exception {
+
+        Callable consumeMessageTask = new Callable() {
+            @Override
+            public Object call() throws Exception {
+
+                int receivedMessageCount = 0;
+
+                ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false");
+                Connection consumerConnection = consumerFactory.createConnection();
+
+                try {
+
+                    consumerConnection.setClientID("consumer");
+                    consumerConnection.start();
+
+                    Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                    MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME);
+
+
+                    while (true) {
+                        TextMessage textMsg = (TextMessage) messageConsumer.receive(5000);
+
+                        if (textMsg == null) {
+                            return receivedMessageCount;
+                        }
+
+                        receivedMessageCount++;
+                        LOG.info("*** receivedMessageCount {} message has MessageID {} ",
receivedMessageCount, textMsg.getJMSMessageID());
+
+                        // on first delivery ensure the message is pending an ack when it
is resent from the producer broker
+                        if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount
== 1) {
+                            LOG.info("Waiting for restart...");
+                            consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS);
+                        }
+
+                        textMsg.acknowledge();
+
+                    }
+                } finally {
+                    consumerConnection.close();
+                }
+            }
+        };
+
+        Runnable consumerBrokerResetTask = new Runnable() {
+            public void run() {
+
+                try {
+                    // wait for signal
+                    stopConsumerBroker.await();
+
+
+                    LOG.info("********* STOPPING CONSUMER BROKER");
+
+                    consumerBroker.stop();
+                    consumerBroker.waitUntilStopped();
+
+
+                    LOG.info("***** STARTING CONSUMER BROKER");
+                    // do not delete  messages on startup
+                    consumerBroker = createConsumerBroker(false);
+
+                    LOG.info("***** CONSUMER BROKER STARTED!!");
+                    consumerBrokerRestarted.countDown();
+
+                    assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition()
{
+                        @Override
+                        public boolean isSatisified() throws Exception {
+                            LOG.info("ProducerBroker totalMessageCount: " + producerBroker.getAdminView().getTotalMessageCount());
+                            return producerBroker.getAdminView().getTotalMessageCount() ==
0;
+                        }
+                    }));
+                    consumerRestartedAndMessageForwarded.countDown();
+
+
+                } catch (Exception e) {
+                    LOG.error("Exception when stopping/starting the consumerBroker ", e);
+                }
+
+
+            }
+        };
+
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+
+        //start consumerBroker start/stop task
+        executor.execute(consumerBrokerResetTask);
+
+        //start consuming messages
+        Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask);
+
+
+        produceMessages();
+
+        //Wait for consumer to finish
+        int totalMessagesConsumed = numberOfConsumedMessage.get();
+
+        StringBuffer contents = new StringBuffer();
+        boolean messageInStore = isMessageInJDBCStore(localDataSource, contents);
+        LOG.debug("****number of messages received " + totalMessagesConsumed);
+
+        assertEquals("number of messages received", 2, totalMessagesConsumed);
+        assertEquals("messages left in store", true, messageInStore);
+        assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ"));
+
+    }
+
+    private void produceMessages() throws JMSException {
+
+        ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false");
+        Connection producerConnection = producerFactory.createConnection();
+
+        try {
+            producerConnection.setClientID("producer");
+            producerConnection.start();
+
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            final MessageProducer remoteProducer = producerSession.createProducer(QUEUE_NAME);
+
+            int i = 0;
+            while (MESSAGE_COUNT > i) {
+                String payload = "test msg " + i;
+                TextMessage msg = producerSession.createTextMessage(payload);
+                remoteProducer.send(msg);
+                i++;
+            }
+
+        } finally {
+            producerConnection.close();
+        }
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        doSetUp(true);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        doTearDown();
+        super.tearDown();
+    }
+
+    protected void doTearDown() throws Exception {
+
+        try {
+            consumerBroker.stop();
+        } catch (Exception ex) {
+        }
+        try {
+            consumerBroker.stop();
+        } catch (Exception ex) {
+        }
+    }
+
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        producerBroker = createProducerBroker();
+        consumerBroker = createConsumerBroker(true);
+    }
+
+
+    /**
+     * Producer broker
+     * listens on  localhost:2003
+     * networks to consumerBroker - localhost:2006
+     *
+     * @return
+     * @throws Exception
+     */
+
+    protected BrokerService createProducerBroker() throws Exception {
+
+
+        String networkToPorts[] = new String[]{"2006"};
+        HashMap<String, String> networkProps = new HashMap<String, String>();
+
+        networkProps.put("networkTTL", "10");
+        networkProps.put("conduitSubscriptions", "true");
+        networkProps.put("decreaseNetworkConsumerPriority", "true");
+        networkProps.put("dynamicOnly", "true");
+
+        BrokerService broker = new BrokerService();
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setBrokerName("BP");
+        broker.setAdvisorySupport(false);
+
+        // lazy init listener on broker start
+        TransportConnector transportConnector = new TransportConnector();
+        transportConnector.setUri(new URI("tcp://localhost:2003"));
+        List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+        transportConnectors.add(transportConnector);
+        broker.setTransportConnectors(transportConnectors);
+
+
+        //network to consumerBroker
+
+        if (networkToPorts != null && networkToPorts.length > 0) {
+            StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false");
+            NetworkConnector nc = broker.addNetworkConnector(builder.toString());
+            if (networkProps != null) {
+                IntrospectionSupport.setProperties(nc, networkProps);
+            }
+            nc.setStaticallyIncludedDestinations(Arrays.asList(new ActiveMQQueue[]{QUEUE_NAME}));
+        }
+
+        //Persistence adapter
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource remoteDataSource = new EmbeddedDataSource();
+        remoteDataSource.setDatabaseName("derbyDBRemoteBroker");
+        remoteDataSource.setCreateDatabase("create");
+        jdbc.setDataSource(remoteDataSource);
+        broker.setPersistenceAdapter(jdbc);
+
+        //set Policy entries
+        PolicyEntry policy = new PolicyEntry();
+
+        policy.setQueue(">");
+        policy.setEnableAudit(false);
+        policy.setUseCache(false);
+        policy.setExpireMessagesPeriod(0);
+
+        // set replay with no consumers
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
+                new ConditionalNetworkBridgeFilterFactory();
+        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
+        policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        return broker;
+    }
+
+
+    /**
+     * consumerBroker
+     * - listens on localhost:2006
+     *
+     * @param deleteMessages - drop messages when broker instance is created
+     * @return
+     * @throws Exception
+     */
+
+    protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception
{
+
+        String scheme = "tcp";
+        String listenPort = "2006";
+
+        BrokerService broker = new BrokerService();
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setDeleteAllMessagesOnStartup(deleteMessages);
+        broker.setBrokerName("BC");
+        // lazy init listener on broker start
+        TransportConnector transportConnector = new TransportConnector();
+        transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort));
+        List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+        transportConnectors.add(transportConnector);
+        broker.setTransportConnectors(transportConnectors);
+
+        //policy entries
+
+        PolicyEntry policy = new PolicyEntry();
+
+        policy.setQueue(">");
+        policy.setUseCache(false);
+        policy.setExpireMessagesPeriod(0);
+
+        // set replay with no consumers
+        ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory =
+                new ConditionalNetworkBridgeFilterFactory();
+        conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
+        policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
+
+        PolicyMap pMap = new PolicyMap();
+
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+
+        // Persistence adapter
+        JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter();
+        EmbeddedDataSource localDataSource = new EmbeddedDataSource();
+        localDataSource.setDatabaseName("derbyDBLocalBroker");
+        localDataSource.setCreateDatabase("create");
+        localJDBCPersistentAdapter.setDataSource(localDataSource);
+        broker.setPersistenceAdapter(localJDBCPersistentAdapter);
+
+        if (deleteMessages) {
+            // no plugin on restart
+            broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()});
+        }
+
+        this.localDataSource = localDataSource;
+
+        broker.start();
+        broker.waitUntilStarted();
+
+        return broker;
+    }
+
+
+    /**
+     * Query JDBC Store to see if messages are left
+     *
+     * @param dataSource
+     * @return
+     * @throws SQLException
+     */
+
+    private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer)
+            throws SQLException {
+
+        boolean tableHasData = false;
+        String query = "select * from ACTIVEMQ_MSGS";
+
+        java.sql.Connection conn = dataSource.getConnection();
+        PreparedStatement s = conn.prepareStatement(query);
+
+        ResultSet set = null;
+
+
+
+        try {
+            StringBuffer headers = new StringBuffer();
+            set = s.executeQuery();
+            ResultSetMetaData metaData = set.getMetaData();
+            for (int i = 1; i <= metaData.getColumnCount(); i++) {
+
+                if (i == 1) {
+                    headers.append("||");
+                }
+                headers.append(metaData.getColumnName(i) + "||");
+            }
+            LOG.error(headers.toString());
+
+
+            while (set.next()) {
+                tableHasData = true;
+
+                for (int i = 1; i <= metaData.getColumnCount(); i++) {
+                    if (i == 1) {
+                        stringBuffer.append("|");
+                    }
+                    stringBuffer.append(set.getString(i) + "|");
+                }
+                LOG.error(stringBuffer.toString());
+            }
+        } finally {
+            try {
+                set.close();
+            } catch (Throwable ignore) {
+            }
+            try {
+                s.close();
+            } catch (Throwable ignore) {
+            }
+
+            conn.close();
+        }
+
+        return tableHasData;
+    }
+
+
+    /**
+     * plugin used to ensure consumerbroker is restared before the network message from producerBroker
is acked
+     */
+    class MyTestPlugin implements BrokerPlugin {
+
+        public Broker installPlugin(Broker broker) throws Exception {
+            return new MyTestBroker(broker);
+        }
+
+    }
+
+    class MyTestBroker extends BrokerFilter {
+
+        public MyTestBroker(Broker next) {
+            super(next);
+        }
+
+        public void send(ProducerBrokerExchange producerExchange, org.apache.activemq.command.Message
messageSend) throws Exception {
+
+            super.send(producerExchange, messageSend);
+            LOG.error("Stopping broker on send:  " +messageSend.getMessageId().getProducerSequenceId());
+            stopConsumerBroker.countDown();
+            producerExchange.getConnectionContext().setDontSendReponse(true);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
index e7ee4d9..8c580a9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
@@ -35,19 +35,19 @@ import org.slf4j.LoggerFactory;
 public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ConnectionPerMessageTest.class);
-	private static final int COUNT = 20000;
+	private static final int COUNT = 2000;
 	protected String bindAddress;
 
 	public void testConnectionPerMessage() throws Exception {
 		final String topicName = "test.topic";
 
-		LOG.info("Initializing pooled connection factory for JMS to URL: "
+		LOG.info("Initializing connection factory for JMS to URL: "
 				+ bindAddress);
 		final ActiveMQConnectionFactory normalFactory = new ActiveMQConnectionFactory();
 		normalFactory.setBrokerURL(bindAddress);
 		for (int i = 0; i < COUNT; i++) {
 
-			if (i % 1000 == 0) {
+			if (i % 100 == 0) {
 				LOG.info(new Integer(i).toString());
 			}
 
@@ -86,6 +86,7 @@ public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport
{
 
 	protected BrokerService createBroker() throws Exception {
 		BrokerService answer = new BrokerService();
+        answer.setDeleteAllMessagesOnStartup(true);
 		answer.setUseJmx(false);
 		answer.setPersistent(isPersistent());
 		answer.addConnector(bindAddress);

http://git-wip-us.apache.org/repos/asf/activemq/blob/f92d45be/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
index 5bd9b8a..33963b7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
@@ -58,7 +58,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
         MessageConsumer clientB = createConsumer("BrokerB", dest);
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
-//      let consumers propogate around the network
+        //let consumers propagate around the network
         Thread.sleep(2000);
         // Send messages
         sendMessages("BrokerA", dest, MESSAGE_COUNT);
@@ -143,7 +143,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
         MessageConsumer clientB = createConsumer("BrokerB", dest);
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
-//      let consumers propogate around the network
+        //let consumers propagate around the network
         Thread.sleep(2000);
         // Send messages
         sendMessages("BrokerA", dest, MESSAGE_COUNT);
@@ -182,7 +182,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
         MessageConsumer clientB = createConsumer("BrokerB", dest);
         MessageConsumer clientC = createConsumer("BrokerC", dest);
 
-//      let consumers propogate around the network
+        //let consumers propagate around the network
         Thread.sleep(2000);
 
         // Send messages
@@ -254,7 +254,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
         // default (true) is present in a matching destination policy entry
         int networkTTL = 2;
         boolean conduitSubs = true;
-        // Setup broker networks
+        // Setup ring broker networks
         bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
         bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
         bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
@@ -307,7 +307,7 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
     public void testAllConnectedBrokerNetworkDurableSubTTL() throws Exception {
         int networkTTL = 2;
         boolean conduitSubs = true;
-        // Setup broker networks
+        // Setup ring broker network
         bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
         bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL, conduitSubs);
         bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
@@ -396,6 +396,11 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
         createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
     }
 
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.setBrokerId(broker.getBrokerName());
+    }
+
     public static Test suite() {
         return suite(ThreeBrokerTopicNetworkTest.class);
     }


Mime
View raw message