activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1469013 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
Date Wed, 17 Apr 2013 18:07:24 GMT
Author: tabish
Date: Wed Apr 17 18:07:24 2013
New Revision: 1469013

URL: http://svn.apache.org/r1469013
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4475

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
  (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java?rev=1469013&r1=1469012&r2=1469013&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/util/TimeStampingBrokerPlugin.java
Wed Apr 17 18:07:24 2013
@@ -140,7 +140,7 @@ public class TimeStampingBrokerPlugin ex
         Destination regionDestination = (Destination) message.getRegionDestination();
         if (message != null && regionDestination != null) {
             deadLetterStrategy = regionDestination.getDeadLetterStrategy();
-            if (deadLetterStrategy != null) {
+            if (deadLetterStrategy != null && message.getOriginalDestination() !=
null) {
                 // Cheap copy, since we only need two fields
                 tmp = new ActiveMQMessage();
                 tmp.setDestination(message.getOriginalDestination());

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java?rev=1469013&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
(added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
Wed Apr 17 18:07:24 2013
@@ -0,0 +1,328 @@
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ4475Test {
+
+    private final Log LOG = LogFactory.getLog(AMQ4475Test.class);
+
+    private final int NUM_MSGS = 1000;
+    private final int MAX_THREADS = 20;
+
+    private BrokerService broker;
+    private String connectionUri;
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
+    private final ActiveMQQueue original = new ActiveMQQueue("jms/AQueue");
+    private final ActiveMQQueue rerouted = new ActiveMQQueue("jms/AQueue_proxy");
+
+    @Before
+    public void setUp() throws Exception {
+        TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
+        tsbp.setZeroExpirationOverride(432000000);
+        tsbp.setTtlCeiling(432000000);
+        tsbp.setFutureOnly(true);
+
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setPlugins(new BrokerPlugin[] {tsbp});
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessExpired(true);
+        ((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
+        ((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(true);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void after() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testIndividualDeadLetterAndTimeStampPlugin() {
+        LOG.info("Starting test ..");
+
+        long startTime = System.nanoTime();
+
+        // Produce to network
+        List<Future<ProducerTask>> tasks = new ArrayList<Future<ProducerTask>>();
+
+        for (int index = 0; index < 1; index++) {
+            ProducerTask p = new ProducerTask(connectionUri, original, NUM_MSGS);
+            Future<ProducerTask> future = executor.submit(p, p);
+            tasks.add(future);
+        }
+
+        ForwardingConsumerThread f1 = new ForwardingConsumerThread(original, rerouted, NUM_MSGS);
+        f1.start();
+        ConsumerThread c1 = new ConsumerThread(connectionUri, rerouted, NUM_MSGS);
+        c1.start();
+
+        LOG.info("Waiting on consumers and producers to exit");
+
+        try {
+            for (Future<ProducerTask> future : tasks) {
+                ProducerTask e = future.get();
+                LOG.info("[Completed] " + e.dest.getPhysicalName());
+            }
+            executor.shutdown();
+            LOG.info("Producing threads complete, waiting on ACKs");
+            f1.join(TimeUnit.MINUTES.toMillis(2));
+            c1.join(TimeUnit.MINUTES.toMillis(2));
+        } catch (ExecutionException e) {
+            LOG.warn("Caught unexpected exception: {}", e);
+            throw new RuntimeException(e);
+        } catch (InterruptedException ie) {
+            LOG.warn("Caught unexpected exception: {}", ie);
+            throw new RuntimeException(ie);
+        }
+
+        assertFalse(f1.isFailed());
+        assertFalse(c1.isFailed());
+
+        long estimatedTime = System.nanoTime() - startTime;
+
+        LOG.info("Testcase duration (seconds): " + estimatedTime / 1000000000.0);
+        LOG.info("Consumers and producers exited, all msgs received as expected");
+    }
+
+    public class ProducerTask implements Runnable {
+        private final String uri;
+        private final ActiveMQQueue dest;
+        private final int count;
+
+        public ProducerTask(String uri, ActiveMQQueue dest, int count) {
+            this.uri = uri;
+            this.dest = dest;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+
+            Connection connection = null;
+            try {
+                String destName = "";
+
+                try {
+                    destName = dest.getQueueName();
+                } catch (JMSException e) {
+                    LOG.warn("Caught unexpected exception: {}", e);
+                }
+
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri);
+
+                connection = connectionFactory.createConnection();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(dest);
+                connection.start();
+
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                String msg = "Test Message";
+
+                for (int i = 0; i < count; i++) {
+                    producer.send(session.createTextMessage(msg + dest.getQueueName() + "
" + i));
+                }
+
+                LOG.info("[" + destName + "] Sent " + count + " msgs");
+            } catch (Exception e) {
+                LOG.warn("Caught unexpected exception: {}", e);
+            } finally {
+                try {
+                    connection.close();
+                } catch (Throwable e) {
+                    LOG.warn("Caught unexpected exception: {}", e);
+                }
+            }
+        }
+    }
+
+    public class ForwardingConsumerThread extends Thread {
+
+        private final ActiveMQQueue original;
+        private final ActiveMQQueue forward;
+        private int blockSize = 0;
+        private final int PARALLEL = 1;
+        private boolean failed;
+
+        public ForwardingConsumerThread(ActiveMQQueue original, ActiveMQQueue forward, int
total) {
+            this.original = original;
+            this.forward = forward;
+            this.blockSize = total / PARALLEL;
+        }
+
+        public boolean isFailed() {
+            return failed;
+        }
+
+        @Override
+        public void run() {
+            Connection connection = null;
+            try {
+
+                for (int index = 0; index < PARALLEL; index++) {
+
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+
+                    connection = factory.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(original);
+                    MessageProducer producer = session.createProducer(forward);
+                    connection.start();
+                    int count = 0;
+
+                    while (count < blockSize) {
+
+                        Message msg1 = consumer.receive(10000);
+                        if (msg1 != null) {
+                            if (msg1 instanceof ActiveMQTextMessage) {
+                                if (count % 100 == 0) {
+                                    LOG.info("Consuming -> " + ((ActiveMQTextMessage)
msg1).getDestination() + " count=" + count);
+                                }
+
+                                producer.send(msg1);
+
+                                count++;
+                            } else {
+                                LOG.info("Skipping unknown msg type " + msg1);
+                            }
+                        } else {
+                            break;
+                        }
+                    }
+
+                    LOG.info("[" + original.getQueueName() + "] completed segment (" + index
+ " of " + blockSize + ")");
+                    connection.close();
+                }
+            } catch (Exception e) {
+                LOG.warn("Caught unexpected exception: {}", e);
+            } finally {
+                LOG.debug(getName() + ": is stopping");
+                try {
+                    connection.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+
+    public class ConsumerThread extends Thread {
+
+        private final String uri;
+        private final ActiveMQQueue dest;
+        private int blockSize = 0;
+        private final int PARALLEL = 1;
+        private boolean failed;
+
+        public ConsumerThread(String uri, ActiveMQQueue dest, int total) {
+            this.uri = uri;
+            this.dest = dest;
+            this.blockSize = total / PARALLEL;
+        }
+
+        public boolean isFailed() {
+            return failed;
+        }
+
+        @Override
+        public void run() {
+            Connection connection = null;
+            try {
+
+                for (int index = 0; index < PARALLEL; index++) {
+
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+
+                    connection = factory.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(dest);
+                    connection.start();
+                    int count = 0;
+
+                    while (count < blockSize) {
+
+                        Object msg1 = consumer.receive(10000);
+                        if (msg1 != null) {
+                            if (msg1 instanceof ActiveMQTextMessage) {
+                                if (count % 100 == 0) {
+                                    LOG.info("Consuming -> " + ((ActiveMQTextMessage)
msg1).getDestination() + " count=" + count);
+                                }
+
+                                count++;
+                            } else {
+                                LOG.info("Skipping unknown msg type " + msg1);
+                            }
+                        } else {
+                            failed = true;
+                            break;
+                        }
+                    }
+
+                    LOG.info("[" + dest.getQueueName() + "] completed segment (" + index
+ " of " + blockSize + ")");
+                    connection.close();
+                }
+            } catch (Exception e) {
+                LOG.warn("Caught unexpected exception: {}", e);
+            } finally {
+                LOG.debug(getName() + ": is stopping");
+                try {
+                    connection.close();
+                } catch (Throwable e) {
+                }
+            }
+        }
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message