activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [39/52] [abbrv] [partial] activemq-artemis git commit: ARTEMIS-127 Adding activemq unit test module to Artemis
Date Tue, 09 Jun 2015 16:37:10 GMT
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java
new file mode 100644
index 0000000..6145f3b
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FTPTestSupport.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.ftpserver.FtpServer;
+import org.apache.ftpserver.FtpServerFactory;
+import org.apache.ftpserver.ftplet.Authority;
+import org.apache.ftpserver.ftplet.UserManager;
+import org.apache.ftpserver.listener.ListenerFactory;
+import org.apache.ftpserver.usermanager.PropertiesUserManagerFactory;
+import org.apache.ftpserver.usermanager.impl.BaseUser;
+import org.apache.ftpserver.usermanager.impl.WritePermission;
+import org.jmock.Mockery;
+
+public abstract class FTPTestSupport extends EmbeddedBrokerTestSupport {
+    
+    protected static final String ftpServerListenerName = "default";
+    protected Connection connection;
+    protected FtpServer server;
+    String userNamePass = "activemq";
+
+    Mockery context = null;
+    String ftpUrl;
+    int ftpPort;
+    
+    final File ftpHomeDirFile = new File("target/FTPBlobTest/ftptest");
+    
+    protected void setUp() throws Exception {
+        
+        if (ftpHomeDirFile.getParentFile().exists()) {
+            IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
+        }
+        ftpHomeDirFile.mkdirs();
+        ftpHomeDirFile.getParentFile().deleteOnExit();
+
+        FtpServerFactory serverFactory = new FtpServerFactory();
+        ListenerFactory factory = new ListenerFactory();
+
+        PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
+        UserManager userManager = userManagerFactory.createUserManager();
+
+        BaseUser user = new BaseUser();
+        user.setName("activemq");
+        user.setPassword("activemq");
+        user.setHomeDirectory(ftpHomeDirFile.getParent());
+        
+        // authorize user
+        List<Authority> auths = new ArrayList<Authority>();
+        Authority auth = new WritePermission();
+        auths.add(auth);
+        user.setAuthorities(auths);
+        
+        userManager.save(user);
+
+        BaseUser guest = new BaseUser();
+        guest.setName("guest");
+        guest.setPassword("guest");
+        guest.setHomeDirectory(ftpHomeDirFile.getParent());
+        
+        userManager.save(guest);
+        
+        serverFactory.setUserManager(userManager);
+        factory.setPort(0);
+        serverFactory.addListener(ftpServerListenerName, factory
+                .createListener());
+        server = serverFactory.createServer();
+        server.start();
+        ftpPort = serverFactory.getListener(ftpServerListenerName)
+                .getPort();
+        super.setUp();
+    }
+    
+    public void setConnection() throws Exception {
+        ftpUrl = "ftp://"
+            + userNamePass
+            + ":"
+            + userNamePass
+            + "@localhost:"
+            + ftpPort
+            + "/ftptest/";
+        bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + ftpUrl;
+        
+        connectionFactory = createConnectionFactory();
+        
+        connection = createConnection();
+        connection.start();        
+    }
+    
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.stop();
+        }
+        super.tearDown();
+        if (server != null) {
+            server.stop();
+        }
+        IOHelper.deleteFile(ftpHomeDirFile.getParentFile());
+    }
+
+    
+    
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
new file mode 100644
index 0000000..1754689
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/blob/FilesystemBlobTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.blob;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.BlobMessage;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQBlobMessage;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class FilesystemBlobTest extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(FilesystemBlobTest.class);
+
+    private Connection connection;
+    private final String tmpDir =  System.getProperty("user.dir") + "/target/FilesystemBlobTest";
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        // replace \ with / to let it work on windows too
+        String fileUrl = "file:///" +tmpDir.replaceAll("\\\\", "/");
+        LOG.info("Using file: " + fileUrl);
+        bindAddress = "vm://localhost?jms.blobTransferPolicy.defaultUploadUrl=" + fileUrl;
+
+        connectionFactory = createConnectionFactory();
+
+        connection = createConnection();
+        connection.start();
+    }
+
+    public void testBlobFile() throws Exception {
+        // first create Message
+        File file = File.createTempFile("amq-data-file-", ".dat");
+        // lets write some data
+        String content = "hello world " + System.currentTimeMillis();
+        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
+        writer.append(content);
+        writer.close();
+
+        ActiveMQSession session = (ActiveMQSession) connection.createSession(
+                false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        MessageConsumer consumer = session.createConsumer(destination);
+        BlobMessage message = session.createBlobMessage(file);
+
+        producer.send(message);
+        Thread.sleep(1000);
+
+        // check message send
+        Message msg = consumer.receive(1000);
+        assertTrue(msg instanceof ActiveMQBlobMessage);
+
+        InputStream input = ((ActiveMQBlobMessage) msg).getInputStream();
+        StringBuilder b = new StringBuilder();
+        int i = input.read();
+        while (i != -1) {
+            b.append((char) i);
+            i = input.read();
+        }
+        input.close();
+        File uploaded = new File(tmpDir, msg.getJMSMessageID().toString().replace(":", "_"));
+        assertEquals(content, b.toString());
+        assertTrue(uploaded.exists());
+        ((ActiveMQBlobMessage)msg).deleteFile();
+        assertFalse(uploaded.exists());
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.stop();
+        }
+        super.tearDown();
+
+        IOHelper.deleteFile(new File(tmpDir));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
new file mode 100644
index 0000000..e810f92
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implements the test case attached to:
+ * https://issues.apache.org/jira/browse/AMQ-4351
+ *
+ * This version avoids the spring deps.
+ */
+public class AMQ4351Test extends BrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4351Test.class);
+
+    public static Test suite() {
+        return suite(AMQ4351Test.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+
+        // Lets clean up often.
+        broker.setOfflineDurableSubscriberTaskSchedule(500);
+        broker.setOfflineDurableSubscriberTimeout(2000); // lets delete durable subs much faster.
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        EmbeddedDataSource dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc.setDataSource(dataSource);
+
+        jdbc.deleteAllMessages();
+        broker.setPersistenceAdapter(jdbc);
+        return broker;
+    }
+
+    ActiveMQConnectionFactory connectionFactory;
+    ActiveMQTopic destination = new ActiveMQTopic("TEST");
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+    }
+
+    class ProducingClient implements Runnable {
+        final AtomicLong size = new AtomicLong();
+        final AtomicBoolean done = new AtomicBoolean();
+        CountDownLatch doneLatch = new CountDownLatch(1);
+
+        Connection connection;
+        Session session;
+        MessageProducer producer;
+
+        ProducingClient() throws JMSException {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            producer = session.createProducer(destination);
+        }
+
+        private void sendMessage() {
+            try {
+                producer.send(session.createTextMessage("Test"));
+                long i = size.incrementAndGet();
+                if( (i % 1000) == 0 ) {
+                    LOG.info("produced " + i + ".");
+                }
+            } catch (JMSException e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void start() {
+            new Thread(this, "ProducingClient").start();
+        }
+
+        public void stop() throws InterruptedException {
+            done.set(true);
+            if( !doneLatch.await(20, TimeUnit.MILLISECONDS) ) {
+                try {
+                    connection.close();
+                    doneLatch.await();
+                } catch (JMSException e) {
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                try {
+                    while (!done.get()) {
+                        sendMessage();
+                        Thread.sleep(10);
+                    }
+                } finally {
+                    connection.close();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                done.set(true);
+            } finally {
+                doneLatch.countDown();
+            }
+        }
+    }
+
+    class ConsumingClient implements Runnable {
+        final String name;
+        final AtomicLong size = new AtomicLong();
+        final AtomicBoolean done = new AtomicBoolean();
+        CountDownLatch doneLatch = new CountDownLatch(1);
+        CountDownLatch started;
+        CountDownLatch finished;
+
+
+        public ConsumingClient(String name, CountDownLatch started, CountDownLatch finished) {
+            this.name = name;
+            this.started = started;
+            this.finished = finished;
+        }
+
+        public void start() {
+            LOG.info("Starting JMS listener " + name);
+            new Thread(this, "ConsumingClient: "+name).start();
+        }
+
+        public void stopAsync() {
+            finished.countDown();
+            done.set(true);
+        }
+
+        public void stop() throws InterruptedException {
+            stopAsync();
+            doneLatch.await();
+        }
+
+        @Override
+        public void run() {
+            try {
+                Connection connection = connectionFactory.createConnection();
+                connection.setClientID(name);
+                connection.start();
+                try {
+                    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                    MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false);
+                    started.countDown();
+                    while( !done.get() ) {
+                        Message msg = consumer.receive(100);
+                        if(msg!=null ) {
+                            size.incrementAndGet();
+                            session.commit();
+                        }
+                    }
+                } finally {
+                    connection.close();
+                    LOG.info("Stopped JMS listener " + name);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                done.set(true);
+            } finally {
+                doneLatch.countDown();
+            }
+        }
+
+    }
+
+    public void testAMQ4351() throws InterruptedException, JMSException {
+        LOG.info("Start test.");
+        int subs = 100;
+        CountDownLatch startedLatch = new CountDownLatch(subs - 1);
+        CountDownLatch shutdownLatch = new CountDownLatch(subs - 4);
+
+
+        ProducingClient producer = new ProducingClient();
+        ConsumingClient listener1 = new ConsumingClient("subscriber-1", startedLatch, shutdownLatch);
+        ConsumingClient listener2 = new ConsumingClient("subscriber-2", startedLatch, shutdownLatch);
+        ConsumingClient listener3 = new ConsumingClient("subscriber-3", startedLatch, shutdownLatch);
+        try {
+
+            listener1.start();
+            listener2.start();
+            listener3.start();
+
+            List<ConsumingClient> subscribers = new ArrayList<ConsumingClient>(subs);
+            for (int i = 4; i < subs; i++) {
+                ConsumingClient client = new ConsumingClient("subscriber-" + i, startedLatch, shutdownLatch);
+                subscribers.add(client);
+                client.start();
+            }
+            startedLatch.await(10, TimeUnit.SECONDS);
+
+            LOG.info("All subscribers started.");
+            producer.sendMessage();
+
+            LOG.info("Stopping 97 subscribers....");
+            for (ConsumingClient client : subscribers) {
+                client.stopAsync();
+            }
+            shutdownLatch.await(10, TimeUnit.SECONDS);
+
+            // Start producing messages for 10 minutes, at high rate
+            LOG.info("Starting mass message producer...");
+            producer.start();
+
+            long lastSize = listener1.size.get();
+            for( int i=0 ; i < 10; i++ ) {
+                Thread.sleep(1000);
+                long size = listener1.size.get();
+                LOG.info("Listener 1: consumed: "+(size - lastSize));
+                assertTrue( size > lastSize );
+                lastSize = size;
+            }
+        } finally {
+            LOG.info("Stopping clients");
+            listener1.stop();
+            listener2.stop();
+            listener3.stop();
+            producer.stop();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java
new file mode 100644
index 0000000..3e154f9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerBenchmark.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BrokerBenchmark is used to get an idea of the raw performance of a broker.
+ * Since the broker data structures using in message dispatching are under high
+ * contention from client requests, it's performance should be monitored closely
+ * since it typically is the biggest bottleneck in a high performance messaging
+ * fabric. The benchmarks are run under all the following combinations options:
+ * Queue vs. Topic, 1 vs. 10 producer threads, 1 vs. 10 consumer threads, and
+ * Persistent vs. Non-Persistent messages. Message Acking uses client ack style
+ * batch acking since that typically has the best ack performance.
+ * 
+ * 
+ */
+public class BrokerBenchmark extends BrokerTestSupport {
+    private static final transient Logger LOG = LoggerFactory.getLogger(BrokerBenchmark.class);
+
+    public int produceCount = Integer.parseInt(System.getProperty("PRODUCE_COUNT", "10000"));
+    public ActiveMQDestination destination;
+    public int prodcuerCount;
+    public int consumerCount;
+    public boolean deliveryMode;
+
+    public void initCombosForTestPerformance() {
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
+        addCombinationValues("PRODUCER_COUNT", new Object[] {new Integer("1"), new Integer("10")});
+        addCombinationValues("CONSUMER_COUNT", new Object[] {new Integer("1"), new Integer("10")});
+        addCombinationValues("CONSUMER_COUNT", new Object[] {new Integer("1"), new Integer("10")});
+        addCombinationValues("deliveryMode", new Object[] {Boolean.TRUE});
+    }
+
+    public void testPerformance() throws Exception {
+
+        LOG.info("Running Benchmark for destination=" + destination + ", producers=" + prodcuerCount + ", consumers=" + consumerCount + ", deliveryMode=" + deliveryMode);
+        final int consumeCount = destination.isTopic() ? consumerCount * produceCount : produceCount;
+
+        final Semaphore consumersStarted = new Semaphore(1 - consumerCount);
+        final Semaphore producersFinished = new Semaphore(1 - prodcuerCount);
+        final Semaphore consumersFinished = new Semaphore(1 - consumerCount);
+        final ProgressPrinter printer = new ProgressPrinter(produceCount + consumeCount, 10);
+
+        // Start a producer and consumer
+
+        profilerPause("Benchmark ready.  Start profiler ");
+
+        long start = System.currentTimeMillis();
+
+        final AtomicInteger receiveCounter = new AtomicInteger(0);
+        for (int i = 0; i < consumerCount; i++) {
+            new Thread() {
+                public void run() {
+                    try {
+
+                        // Consume the messages
+                        StubConnection connection = new StubConnection(broker);
+                        ConnectionInfo connectionInfo = createConnectionInfo();
+                        connection.send(connectionInfo);
+
+                        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+                        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+                        consumerInfo.setPrefetchSize(1000);
+                        connection.send(sessionInfo);
+                        connection.send(consumerInfo);
+
+                        consumersStarted.release();
+
+                        while (receiveCounter.get() < consumeCount) {
+
+                            int counter = 0;
+                            // Get a least 1 message.
+                            Message msg = receiveMessage(connection, 2000);
+                            if (msg != null) {
+                                printer.increment();
+                                receiveCounter.incrementAndGet();
+
+                                counter++;
+
+                                // Try to piggy back a few extra message acks if
+                                // they are ready.
+                                Message extra = null;
+                                while ((extra = receiveMessage(connection, 0)) != null) {
+                                    msg = extra;
+                                    printer.increment();
+                                    receiveCounter.incrementAndGet();
+                                    counter++;
+                                }
+                            }
+
+                            if (msg != null) {
+                                connection.send(createAck(consumerInfo, msg, counter, MessageAck.STANDARD_ACK_TYPE));
+                            } else if (receiveCounter.get() < consumeCount) {
+                                LOG.info("Consumer stall, waiting for message #" + receiveCounter.get() + 1);
+                            }
+                        }
+
+                        connection.send(closeConsumerInfo(consumerInfo));
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    } finally {
+                        consumersFinished.release();
+                    }
+                }
+
+            }.start();
+        }
+
+        // Make sure that the consumers are started first to avoid sending
+        // messages
+        // before a topic is subscribed so that those messages are not missed.
+        consumersStarted.acquire();
+
+        // Send the messages in an async thread.
+        for (int i = 0; i < prodcuerCount; i++) {
+            new Thread() {
+                public void run() {
+                    try {
+                        StubConnection connection = new StubConnection(broker);
+                        ConnectionInfo connectionInfo = createConnectionInfo();
+                        connection.send(connectionInfo);
+
+                        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+                        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+                        connection.send(sessionInfo);
+                        connection.send(producerInfo);
+
+                        for (int i = 0; i < produceCount / prodcuerCount; i++) {
+                            Message message = createMessage(producerInfo, destination);
+                            message.setPersistent(deliveryMode);
+                            message.setResponseRequired(false);
+                            connection.send(message);
+                            printer.increment();
+                        }
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    } finally {
+                        producersFinished.release();
+                    }
+                };
+            }.start();
+        }
+
+        producersFinished.acquire();
+        long end1 = System.currentTimeMillis();
+        consumersFinished.acquire();
+        long end2 = System.currentTimeMillis();
+
+        LOG.info("Results for destination=" + destination + ", producers=" + prodcuerCount + ", consumers=" + consumerCount + ", deliveryMode=" + deliveryMode);
+        LOG.info("Produced at messages/sec: " + (produceCount * 1000.0 / (end1 - start)));
+        LOG.info("Consumed at messages/sec: " + (consumeCount * 1000.0 / (end2 - start)));
+        profilerPause("Benchmark done.  Stop profiler ");
+    }
+
+    public static Test suite() {
+        return suite(BrokerBenchmark.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
new file mode 100644
index 0000000..1fc7a6a
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRedeliveryTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.broker.util.RedeliveryPlugin;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests broker-side redelivery as configured through the {@link RedeliveryPlugin}.
+ * <p>
+ * Each test starts an embedded broker with scheduler support and a default broker
+ * {@link RedeliveryPolicy}, sends a single message carrying a {@code "data"} string
+ * property, and then checks how that message is handed back to consumers (or to the
+ * shared dead letter queue) after rollbacks or expiry.
+ */
+public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(BrokerRedeliveryTest.class);
+    BrokerService broker = null;
+
+    final ActiveMQQueue destination = new ActiveMQQueue("Redelivery");
+    final String data = "hi";
+    final long redeliveryDelayMillis = 2000;
+    // The two fields below are not final: testInfiniteRedelivery overrides them
+    // before the broker is started.
+    long initialRedeliveryDelayMillis = 4000;
+    int maxBrokerRedeliveries = 2;
+
+    /**
+     * Validates {@link #maxBrokerRedeliveries} broker redeliveries followed by a DLQ check.
+     */
+    public void testScheduledRedelivery() throws Exception {
+        doTestScheduledRedelivery(maxBrokerRedeliveries, true);
+    }
+
+    /**
+     * Configures the broker policy with no maximum number of redeliveries and validates
+     * {@code RedeliveryPolicy.DEFAULT_MAXIMUM_REDELIVERIES + 1} of them, without a DLQ check.
+     */
+    public void testInfiniteRedelivery() throws Exception {
+        initialRedeliveryDelayMillis = redeliveryDelayMillis;
+        maxBrokerRedeliveries = RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES;
+        doTestScheduledRedelivery(RedeliveryPolicy.DEFAULT_MAXIMUM_REDELIVERIES + 1, false);
+    }
+
+    /**
+     * Receives the test message in a transacted session, rolls it back repeatedly and
+     * verifies that each rollback is followed by a delayed redelivery carrying the
+     * expected {@link RedeliveryPlugin#REDELIVERY_DELAY} property.
+     *
+     * @param maxBrokerRedeliveriesToValidate number of rollback/redelivery rounds to verify
+     * @param validateDLQ when {@code true}, expect the message on the shared dead letter queue
+     *                    afterwards; otherwise expect one more delivery and check its
+     *                    {@code JMSXDeliveryCount}
+     * @throws Exception on any broker, JMS or assertion failure
+     */
+    public void doTestScheduledRedelivery(int maxBrokerRedeliveriesToValidate, boolean validateDLQ) throws Exception {
+
+        startBroker(true);
+        sendMessage(0);
+
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
+        try {
+            // Client-side redelivery is switched off (maximum 0) so that a rollback is not
+            // followed by an immediate client redelivery.
+            RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+            redeliveryPolicy.setInitialRedeliveryDelay(0);
+            redeliveryPolicy.setMaximumRedeliveries(0);
+            consumerConnection.setRedeliveryPolicy(redeliveryPolicy);
+            consumerConnection.start();
+            Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            Message message = consumer.receive(1000);
+            assertNotNull("got message", message);
+            LOG.info("got: " + message);
+            consumerSession.rollback();
+
+            for (int i = 0; i < maxBrokerRedeliveriesToValidate; i++) {
+                // Nothing may arrive straight after the rollback.
+                Message shouldBeNull = consumer.receive(500);
+                assertNull("did not get message after redelivery count exceeded: " + shouldBeNull, shouldBeNull);
+
+                // Give the delayed redelivery time to happen.
+                TimeUnit.SECONDS.sleep(3);
+
+                Message brokerRedeliveryMessage = consumer.receive(500);
+                LOG.info("got: " + brokerRedeliveryMessage);
+                assertNotNull("got message via broker redelivery after delay", brokerRedeliveryMessage);
+                assertEquals("message matches", message.getStringProperty("data"), brokerRedeliveryMessage.getStringProperty("data"));
+                // The first redelivery uses the initial delay, later ones the regular delay.
+                assertEquals("has expiryDelay specified", i == 0 ? initialRedeliveryDelayMillis : redeliveryDelayMillis, brokerRedeliveryMessage.getLongProperty(RedeliveryPlugin.REDELIVERY_DELAY));
+
+                consumerSession.rollback();
+            }
+
+            if (validateDLQ) {
+                MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+                Message dlqMessage = dlqConsumer.receive(2000);
+                assertNotNull("Got message from dql", dlqMessage);
+                assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
+                consumerSession.commit();
+            } else {
+                // consume/commit ok
+                message = consumer.receive(3000);
+                assertNotNull("got message", message);
+                assertEquals("redeliveries accounted for", maxBrokerRedeliveriesToValidate + 2, message.getLongProperty("JMSXDeliveryCount"));
+                consumerSession.commit();
+            }
+        } finally {
+            // Close the connection even when an assertion above fails.
+            consumerConnection.close();
+        }
+    }
+
+    /**
+     * Sends a message with a 1500 ms time to live, lets it expire while it is held
+     * unacknowledged by a consumer, closes that consumer and expects the message on the
+     * shared dead letter queue.
+     */
+    public void testNoScheduledRedeliveryOfExpired() throws Exception {
+        startBroker(true);
+        ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
+        try {
+            consumerConnection.start();
+            Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            sendMessage(1500);
+            Message message = consumer.receive(1000);
+            assertNotNull("got message", message);
+
+            // ensure there is another consumer to redispatch to
+            // (the reference is intentionally unused; only the consumer's existence matters)
+            MessageConsumer redeliverConsumer = consumerSession.createConsumer(destination);
+
+            // allow consumed to expire so it gets redelivered
+            TimeUnit.SECONDS.sleep(2);
+            consumer.close();
+
+            // should go to dlq as it has expired
+            // validate DLQ
+            MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+            Message dlqMessage = dlqConsumer.receive(2000);
+            assertNotNull("Got message from dql", dlqMessage);
+            assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
+        } finally {
+            // The original never closed this connection.
+            consumerConnection.close();
+        }
+    }
+
+    /**
+     * Sends one message with the {@code "data"} property set to {@link #data} to
+     * {@link #destination} over a short-lived connection.
+     *
+     * @param timeToLive producer time to live in milliseconds; values {@code <= 0} leave
+     *                   the producer's default untouched
+     * @throws Exception on any JMS failure
+     */
+    private void sendMessage(int timeToLive) throws Exception {
+        ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
+        try {
+            producerConnection.start();
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(destination);
+            if (timeToLive > 0) {
+                producer.setTimeToLive(timeToLive);
+            }
+            Message message = producerSession.createMessage();
+            message.setStringProperty("data", data);
+            producer.send(message);
+        } finally {
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Creates and starts {@link #broker} with scheduler support and a {@link RedeliveryPlugin}
+     * whose default policy uses {@link #redeliveryDelayMillis},
+     * {@link #initialRedeliveryDelayMillis} and {@link #maxBrokerRedeliveries}.
+     *
+     * @param deleteMessages when {@code true}, the broker is told to delete all messages on startup
+     * @throws Exception if the broker fails to start
+     */
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setSchedulerSupport(true);
+
+        RedeliveryPlugin redeliveryPlugin = new RedeliveryPlugin();
+
+        RedeliveryPolicy brokerRedeliveryPolicy = new RedeliveryPolicy();
+        brokerRedeliveryPolicy.setRedeliveryDelay(redeliveryDelayMillis);
+        brokerRedeliveryPolicy.setInitialRedeliveryDelay(initialRedeliveryDelayMillis);
+        brokerRedeliveryPolicy.setMaximumRedeliveries(maxBrokerRedeliveries);
+
+        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
+        redeliveryPolicyMap.setDefaultEntry(brokerRedeliveryPolicy);
+        redeliveryPlugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
+
+        broker.setPlugins(new BrokerPlugin[]{redeliveryPlugin});
+
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        broker.start();
+    }
+
+    /**
+     * Stops the broker if one was started and clears the reference.
+     */
+    private void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+        broker = null;
+    }
+
+    /**
+     * @return a connection factory for the in-VM transport URI {@code vm://localhost}
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            stopBroker();
+        } finally {
+            // Always run the superclass tear-down, even if stopping the broker failed.
+            super.tearDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
new file mode 100644
index 0000000..c4e3848
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.activemq.util.IOHelper;
+
+/**
+ * Broker test base class that adds the ability to stop the current broker and
+ * replace it with a newly created one in the middle of a test.
+ */
+public class BrokerRestartTestSupport extends BrokerTestSupport {
+
+    /**
+     * Creates the initial broker: removes the children of its data directory (when it
+     * reports one), requests deletion of all messages on startup and applies
+     * {@link #configureBroker(BrokerService)}.
+     *
+     * @return the configured, not yet started broker
+     * @throws Exception if configuring the broker fails
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        final BrokerService freshBroker = new BrokerService();
+        final File dataDirectory = freshBroker.getBrokerDataDirectory();
+        if (dataDirectory != null) {
+            IOHelper.deleteChildren(dataDirectory);
+        }
+        freshBroker.setDeleteAllMessagesOnStartup(true);
+        configureBroker(freshBroker);
+        return freshBroker;
+    }
+
+    /**
+     * Creates the broker used after a restart. Unlike {@link #createBroker()} it neither
+     * clears the data directory nor requests deletion of messages on startup.
+     *
+     * @return the configured, not yet started replacement broker
+     * @throws Exception if configuring the broker fails
+     */
+    protected BrokerService createRestartedBroker() throws Exception {
+        final BrokerService replacement = new BrokerService();
+        configureBroker(replacement);
+        return replacement;
+    }
+
+    /**
+     * Applies the shared configuration: installs {@code policyMap} (not declared in this
+     * class; presumably inherited from {@code BrokerTestSupport}) as the destination policy.
+     *
+     * @param broker the broker to configure
+     * @throws Exception declared for subclasses; nothing here throws a checked exception
+     */
+    protected void configureBroker(BrokerService broker) throws Exception {
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    /**
+     * Simulates a broker restart: stops the running broker, waits until it has stopped,
+     * then replaces it with the result of {@link #createRestartedBroker()} and starts that.
+     *
+     * @throws Exception if stopping, creating or starting a broker fails
+     */
+    protected void restartBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
new file mode 100644
index 0000000..9d55d04
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.TestCase;
+import org.apache.activemq.network.NetworkConnector;
+
+/**
+ * Tests for the BrokerService class: adding and removing transport and network
+ * connectors with and without JMX, and the default system usage limits.
+ *
+ * @author chirino
+ */
+public class BrokerServiceTest extends TestCase {
+
+    public void testAddRemoveTransportsWithJMX() throws Exception {
+        addAndRemoveTransportConnector(true);
+    }
+
+    public void testAddRemoveTransportsWithoutJMX() throws Exception {
+        addAndRemoveTransportConnector(false);
+    }
+
+    public void testAddRemoveNetworkWithJMX() throws Exception {
+        addAndRemoveNetworkConnector(true);
+    }
+
+    public void testAddRemoveNetworkWithoutJMX() throws Exception {
+        addAndRemoveNetworkConnector(false);
+    }
+
+    /**
+     * Checks the limits a freshly constructed (never started) broker reports for
+     * memory, temp and store usage.
+     */
+    public void testSystemUsage()
+    {
+        BrokerService service = new BrokerService();
+        // All expected values are long expressions so that they match the long limits being compared.
+        assertEquals( 1024L * 1024 * 1024, service.getSystemUsage().getMemoryUsage().getLimit() );
+        assertEquals( 1024L * 1024 * 1024 * 50, service.getSystemUsage().getTempUsage().getLimit() );
+        assertEquals( 1024L * 1024 * 1024 * 100, service.getSystemUsage().getStoreUsage().getLimit() );
+    }
+
+    /**
+     * Starts a non-persistent broker with a TCP transport connector on an ephemeral port,
+     * then removes and stops that connector while the broker is running.
+     *
+     * @param useJmx whether the broker is created with JMX enabled
+     * @throws Exception if any broker or connector operation fails
+     */
+    private void addAndRemoveTransportConnector(boolean useJmx) throws Exception {
+        BrokerService service = new BrokerService();
+        service.setUseJmx(useJmx);
+        service.setPersistent(false);
+        TransportConnector connector = service.addConnector("tcp://localhost:0");
+        service.start();
+        try {
+            service.removeConnector(connector);
+            connector.stop();
+        } finally {
+            // Stop the broker even when removing or stopping the connector fails.
+            service.stop();
+        }
+    }
+
+    /**
+     * Starts a non-persistent broker with a multicast network connector (the group name
+     * is made unique with the current time), then removes and stops that connector while
+     * the broker is running.
+     *
+     * @param useJmx whether the broker is created with JMX enabled
+     * @throws Exception if any broker or connector operation fails
+     */
+    private void addAndRemoveNetworkConnector(boolean useJmx) throws Exception {
+        BrokerService service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(useJmx);
+        NetworkConnector connector = service.addNetworkConnector("multicast://default?group=group-"+System.currentTimeMillis());
+        service.start();
+        try {
+            service.removeNetworkConnector(connector);
+            connector.stop();
+        } finally {
+            // Stop the broker even when removing or stopping the connector fails.
+            service.stop();
+        }
+    }
+}


Mime
View raw message