activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: [AMQ-6667] gate cursor cache enablement on a single pending send and tidy up setbatch to always check outstanding async future list. Fix and test
Date Wed, 03 May 2017 10:36:30 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 57795bafc -> a0ba0bf4c


[AMQ-6667] gate cursor cache enablement on a single pending send and tidy up setbatch to always
check outstanding async future list. Fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0ba0bf4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0ba0bf4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0ba0bf4

Branch: refs/heads/master
Commit: a0ba0bf4c6beaf50ce5e021ef5e4d493119bb1ef
Parents: 57795ba
Author: gtully <gary.tully@gmail.com>
Authored: Wed May 3 11:36:06 2017 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed May 3 11:36:06 2017 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  13 +-
 .../region/cursors/AbstractStoreCursor.java     |  91 +++--
 .../region/cursors/QueueStorePrefetch.java      |   7 +
 .../cursors/StoreQueueCursorOrderTest.java      |  12 +-
 .../activemq/bugs/DuplicateFromStoreTest.java   | 345 +++++++++++++++++++
 5 files changed, 430 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 3ead89d..c6241b0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -38,6 +38,7 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -114,6 +115,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     // Messages that are paged in but have not yet been targeted at a subscription
     private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
     protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList();
+    private AtomicInteger pendingSends = new AtomicInteger(0);
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory();
@@ -149,7 +151,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
 
     private final Object iteratingMutex = new Object();
 
-
+    // gate on enabling cursor cache to ensure no outstanding sync
+    // send before async sends resume
+    public boolean singlePendingSend() {
+        return pendingSends.get() <= 1;
+    }
 
     class TimeoutMessage implements Delayed {
 
@@ -825,6 +831,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         ListenableFuture<Object> result = null;
 
         producerExchange.incrementSend();
+        pendingSends.incrementAndGet();
         do {
             checkUsage(context, producerExchange, message);
             message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
@@ -845,6 +852,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     // we may have a store in inconsistent state, so reset the cursor
                     // before restarting normal broker operations
                     resetNeeded = true;
+                    pendingSends.decrementAndGet();
                     throw e;
                 }
             }
@@ -1837,6 +1845,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     }
 
     final void messageSent(final ConnectionContext context, final Message msg) throws Exception
{
+        pendingSends.decrementAndGet();
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         destinationStatistics.getMessageSize().addSize(msg.getSize());
@@ -1983,7 +1992,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                         // store should have trapped duplicate in it's index, or cursor audit trapped insert
                         // or producerBrokerExchange suppressed send.
                         // note: jdbc store will not trap unacked messages as a duplicate b/c it gives each message a unique sequence id
-                        LOG.warn("{}, duplicate message {} from cursor, is cursor audit disabled
or too constrained? Redirecting to dlq", this, ref.getMessage());
+                        LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit
disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
                         if (store != null) {
                             ConnectionContext connectionContext = createConnectionContext();
                             dropMessage(ref);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 0295b33..aef7528 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -232,12 +232,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public synchronized boolean tryAddMessageLast(MessageReference node, long wait) throws
Exception {
         boolean disableCache = false;
         if (hasSpace()) {
-            if (!isCacheEnabled() && size==0 && isStarted() && useCache)
{
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("{} - enabling cache for empty store {} {}", this, node.getMessageId(),
node.getMessageId().getFutureOrSequenceLong());
-                }
-                setCacheEnabled(true);
-            }
             if (isCacheEnabled()) {
                 if (recoverMessage(node.getMessage(),true)) {
                     trackLastCached(node);
@@ -261,41 +255,68 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         return true;
     }
 
+    @Override
+    public synchronized boolean isCacheEnabled() {
+        return super.isCacheEnabled() || enableCacheNow();
+    }
+
+    protected boolean enableCacheNow() {
+        boolean result = false;
+        if (canEnableCash()) {
+            setCacheEnabled(true);
+            result = true;
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("{} enabling cache on empty store", this);
+            }
+        }
+        return result;
+    }
+
+    protected boolean canEnableCash() {
+        return useCache && size==0 && hasSpace() && isStarted();
+    }
+
     private void syncWithStore(Message currentAdd) throws Exception {
         pruneLastCached();
-        if (lastCachedIds[SYNC_ADD] == null) {
-            // possibly only async adds, lets wait on the potential last add and reset from there
-            for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size());
it.hasPrevious(); ) {
-                MessageId lastPending = it.previous();
-                Object futureOrLong = lastPending.getFutureOrSequenceLong();
-                if (futureOrLong instanceof Future) {
-                    Future future = (Future) futureOrLong;
-                    if (future.isCancelled()) {
-                        continue;
-                    }
-                    try {
-                        future.get(5, TimeUnit.SECONDS);
-                        setLastCachedId(ASYNC_ADD, lastPending);
-                    } catch (CancellationException ok) {
-                        continue;
-                    } catch (TimeoutException potentialDeadlock) {
-                        LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
-                    } catch (Exception worstCaseWeReplay) {
-                        LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
-                    }
-                } else {
+        for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size());
it.hasPrevious(); ) {
+            MessageId lastPending = it.previous();
+            Object futureOrLong = lastPending.getFutureOrSequenceLong();
+            if (futureOrLong instanceof Future) {
+                Future future = (Future) futureOrLong;
+                if (future.isCancelled()) {
+                    continue;
+                }
+                try {
+                    future.get(5, TimeUnit.SECONDS);
                     setLastCachedId(ASYNC_ADD, lastPending);
+                } catch (CancellationException ok) {
+                    continue;
+                } catch (TimeoutException potentialDeadlock) {
+                    LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
+                } catch (Exception worstCaseWeReplay) {
+                    LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
                 }
-                break;
+            } else {
+                setLastCachedId(ASYNC_ADD, lastPending);
             }
-            if (lastCachedIds[ASYNC_ADD] != null) {
-                // ensure we don't skip current possibly sync add b/c we waited on the future
-                if (isAsync(currentAdd) || Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()),
((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) > 0) {
-                    setBatch(lastCachedIds[ASYNC_ADD]);
+            break;
+        }
+
+        MessageId candidate = lastCachedIds[ASYNC_ADD];
+        if (candidate != null) {
+            // ensure we don't skip current possibly sync add b/c we waited on the future
+            if (!isAsync(currentAdd) && Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()),
((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) < 0) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("no set batch from async:" + candidate.getFutureOrSequenceLong()
+ " >= than current: " + currentAdd.getMessageId().getFutureOrSequenceLong() + ", " + this);
                 }
+                candidate = null;
             }
-        } else {
-            setBatch(lastCachedIds[SYNC_ADD]);
+        }
+        if (candidate == null) {
+            candidate = lastCachedIds[SYNC_ADD];
+        }
+        if (candidate != null) {
+            setBatch(candidate);
         }
         // cleanup
         lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
@@ -355,6 +376,8 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             } else if (candidateOrSequenceLong != null &&
                     Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong))
> 0) {
                 lastCachedIds[index] = candidate;
+            } if (LOG.isTraceEnabled()) {
+                LOG.trace("no set last cached[" + index + "] current:" + lastCacheFutureOrSequenceLong
+ " <= than candidate: " + candidateOrSequenceLong+ ", " + this);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index b10b2e2..1a13a59 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
+    private final Queue queue;
     private final Broker broker;
 
     /**
@@ -46,6 +47,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
      */
     public QueueStorePrefetch(Queue queue, Broker broker) {
         super(queue);
+        this.queue = queue;
         this.store = queue.getMessageStore();
         this.broker = broker;
 
@@ -88,6 +90,11 @@ class QueueStorePrefetch extends AbstractStoreCursor {
     }
 
     @Override
+    protected boolean canEnableCash() {
+        return super.canEnableCash() && queue.singlePendingSend();
+    }
+
+    @Override
     protected synchronized boolean isStoreEmpty() {
         try {
             return this.store.isEmpty();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index 90b8428..5a1ab90 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -184,7 +184,7 @@ public class StoreQueueCursorOrderTest {
         msg = getMessage(1);
         messages[0] = msg;
         msg.setMemoryUsage(systemUsage.getMemoryUsage());
-        msg.getMessageId().setFutureOrSequenceLong(1l);
+        msg.getMessageId().setFutureOrSequenceLong(0l);
         underTest.addMessageLast(msg);
 
 
@@ -354,7 +354,7 @@ public class StoreQueueCursorOrderTest {
         msg = getMessage(3);
         messages[2] = msg;
         msg.setMemoryUsage(systemUsage.getMemoryUsage());
-        msg.getMessageId().setFutureOrSequenceLong(3l);
+        msg.getMessageId().setFutureOrSequenceLong(2l);
         underTest.addMessageLast(msg);
 
 
@@ -375,6 +375,14 @@ public class StoreQueueCursorOrderTest {
         }
         underTest.release();
         assertEquals(count, dequeueCount);
+
+        msg = getMessage(4);
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        msg.getMessageId().setFutureOrSequenceLong(4l);
+        underTest.addMessageLast(msg);
+
+        assertTrue("cache enabled on empty store",  underTest.isCacheEnabled());
+
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0ba0bf4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
new file mode 100644
index 0000000..4f4004c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+public class DuplicateFromStoreTest {
+    static Logger LOG = LoggerFactory.getLogger(DuplicateFromStoreTest.class);
+    String activemqURL;
+    BrokerService broker;
+
+    protected final static String DESTNAME = "TEST";
+    protected final static int NUM_PRODUCERS = 100;
+    protected final static int NUM_CONSUMERS = 20;
+
+    protected final static int NUM_MSGS = 40000;
+    protected final static int CONSUMER_SLEEP = 0;
+    protected final static int PRODUCER_SLEEP = 10;
+
+    public static CountDownLatch producersFinished = new CountDownLatch(NUM_PRODUCERS);
+    public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS );
+
+    public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
+    public AtomicInteger totalReceived = new AtomicInteger(0);
+
+    public int messageSize = 16*1000;
+
+
+    @Before
+    public void startBroker() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://0.0.0.0:0");
+
+        // Create <policyEntry>
+        PolicyEntry policy = new PolicyEntry();
+        ActiveMQDestination dest = new ActiveMQQueue(">");
+        policy.setDestination(dest);
+        policy.setMemoryLimit(10 * 1024 * 1024); // 10 MB
+        policy.setExpireMessagesPeriod(0);
+        policy.setEnableAudit(false); // allow any duplicates from the store to bubble up to the q impl
+        PolicyMap policies = new PolicyMap();
+        policies.put(dest, policy);
+        broker.setDestinationPolicy(policies);
+
+        // configure <systemUsage>
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setPercentOfJvmHeap(70);
+
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb
+
+        SystemUsage memoryManager = new SystemUsage();
+        memoryManager.setMemoryUsage(memoryUsage);
+        memoryManager.setStoreUsage(storeUsage);
+        broker.setSystemUsage(memoryManager);
+
+        // configure KahaDB persistence
+        PersistenceAdapter kahadb = new KahaDBStore();
+        ((KahaDBStore) kahadb).setConcurrentStoreAndDispatchQueues(true);
+        broker.setPersistenceAdapter(kahadb);
+
+        // start broker
+        broker.start();
+        broker.waitUntilStarted();
+
+        activemqURL = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    @Test
+    public void testDuplicateMessage() throws Exception {
+        LOG.info("Testing for duplicate messages.");
+
+        //create producer and consumer threads
+        ExecutorService producers = Executors.newFixedThreadPool(NUM_PRODUCERS);
+        ExecutorService consumers = Executors.newFixedThreadPool(NUM_CONSUMERS);
+
+        createOpenwireClients(producers, consumers);
+
+        LOG.info("All producers and consumers got started. Awaiting their termination");
+        producersFinished.await(100, TimeUnit.MINUTES);
+        LOG.info("All producers have terminated.");
+
+        consumersFinished.await(100, TimeUnit.MINUTES);
+        LOG.info("All consumers have terminated.");
+
+        producers.shutdownNow();
+        consumers.shutdownNow();
+
+        assertEquals("no messages pending, i.e. dlq empty", 0l, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+
+        // validate cache can be enabled if disabled
+
+    }
+
+
+    protected void createOpenwireClients(ExecutorService producers, ExecutorService consumers)
{
+        for (int i = 0; i < NUM_CONSUMERS; i++) {
+            LOG.trace("Creating consumer for destination " + DESTNAME);
+            Consumer consumer = new Consumer(DESTNAME, false);
+            consumers.submit(consumer);
+            // wait for consumer to signal it has fully initialized
+            synchronized(consumer.init) {
+                try {
+                    consumer.init.wait();
+                } catch (InterruptedException e) {
+                    LOG.error(e.toString(), e);
+                }
+            }
+        }
+
+        for (int i = 0; i < NUM_PRODUCERS; i++) {
+            LOG.trace("Creating producer for destination " + DESTNAME );
+            Producer producer = new Producer(DESTNAME, false, 0);
+            producers.submit(producer);
+        }
+    }
+
+    class Producer implements Runnable {
+
+        Logger log = LOG;
+        protected String destName = "TEST";
+        protected boolean isTopicDest = false;
+
+
+        public Producer(String dest, boolean isTopic, int ttl) {
+            this.destName = dest;
+            this.isTopicDest = isTopic;
+        }
+
+
+        /**
+         * Connect to broker and constantly send messages
+         */
+        public void run() {
+
+            Connection connection = null;
+            Session session = null;
+            MessageProducer producer = null;
+
+            try {
+                ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL);
+                connection = amq.createConnection();
+
+                connection.setExceptionListener(new javax.jms.ExceptionListener() {
+                    public void onException(javax.jms.JMSException e) {
+                        e.printStackTrace();
+                    }
+                });
+                connection.start();
+
+                // Create a Session
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination;
+                if (isTopicDest) {
+                    // Create the destination (Topic or Queue)
+                    destination = session.createTopic(destName);
+                } else {
+                    destination = session.createQueue(destName);
+                }
+                // Create a MessageProducer from the Session to the Topic or Queue
+                producer = session.createProducer(destination);
+
+                // Create message
+                long counter = 0;
+                //enlarge msg to 16 kb
+                int msgSize = 16 * 1024;
+                StringBuilder stringBuilder = new StringBuilder();
+                stringBuilder.setLength(msgSize + 15);
+                stringBuilder.append("Message: ");
+                stringBuilder.append(counter);
+                for (int j = 0; j < (msgSize / 10); j++) {
+                    stringBuilder.append("XXXXXXXXXX");
+                }
+                String text = stringBuilder.toString();
+                TextMessage message = session.createTextMessage(text);
+
+                // send message
+                while (totalMessagesToSend.decrementAndGet() >= 0) {
+                    producer.send(message);
+                    log.debug("Sent message: " + counter);
+                    counter++;
+
+                    if ((counter % 10000) == 0)
+                        log.info("sent " + counter + " messages");
+
+                    Thread.sleep(PRODUCER_SLEEP);
+                }
+            } catch (Exception ex) {
+                log.error(ex.getMessage());
+                return;
+            } finally {
+                try {
+                    if (connection != null) {
+                        connection.close();
+                    }
+                } catch (Exception ignored) {
+                } finally {
+                    producersFinished.countDown();
+                }
+            }
+            log.debug("Closing producer for " + destName);
+        }
+    }
+
+    class Consumer implements Runnable {
+
+        public Object init = new Object();
+        protected String queueName = "TEST";
+        boolean isTopic = false;
+
+        Logger log = LOG;
+
+        public Consumer(String destName, boolean topic) {
+            this.isTopic = topic;
+            this.queueName = destName;
+        }
+
+        /**
+         * connect to broker and receive messages
+         */
+        public void run() {
+
+            Connection connection = null;
+            Session session = null;
+            MessageConsumer consumer = null;
+
+            try {
+                ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL);
+                connection = amq.createConnection();
+                connection.setExceptionListener(new javax.jms.ExceptionListener() {
+                    public void onException(javax.jms.JMSException e) {
+                        e.printStackTrace();
+                    }
+                });
+                connection.start();
+                // Create a Session
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                // Create the destination (Topic or Queue)
+                Destination destination = null;
+                if (isTopic)
+                    destination = session.createTopic(queueName);
+                else
+                    destination = session.createQueue(queueName);
+
+                //Create a MessageConsumer from the Session to the Topic or Queue
+                consumer = session.createConsumer(destination);
+
+                synchronized (init) {
+                    init.notifyAll();
+                }
+
+                // Wait for a message
+                long counter = 0;
+                while (totalReceived.get() < NUM_MSGS) {
+                    Message message2 = consumer.receive(5000);
+
+                    if (message2 instanceof TextMessage) {
+                        TextMessage textMessage = (TextMessage) message2;
+                        String text = textMessage.getText();
+                        log.debug("Received: " + text.substring(0, 50));
+                    } else {
+                        if (totalReceived.get() < NUM_MSGS) {
+                            log.error("Received message of unsupported type. Expecting TextMessage.
" + message2);
+                        }
+                        break;
+                    }
+                    if (message2 != null) {
+                        counter++;
+                        totalReceived.incrementAndGet();
+                        if ((counter % 10000) == 0)
+                            log.info("received " + counter + " messages");
+
+                        Thread.sleep(CONSUMER_SLEEP);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error in Consumer: " + e.getMessage());
+                return;
+            } finally {
+                try {
+                    if (connection != null) {
+                        connection.close();
+                    }
+                } catch (Exception ignored) {
+                } finally {
+                    consumersFinished.countDown();
+                }
+            }
+        }
+    }
+}


Mime
View raw message