activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r379619 [26/30] - in /incubator/activemq/trunk: ./ activecluster/ activecluster/src/java/org/apache/activecluster/ activecluster/src/java/org/apache/activecluster/election/ activecluster/src/java/org/apache/activecluster/election/impl/ acti...
Date Tue, 21 Feb 2006 23:14:17 GMT
Modified: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageUsingActivemqXMLTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageUsingActivemqXMLTest.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageUsingActivemqXMLTest.java (original)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageUsingActivemqXMLTest.java Tue Feb 21 15:12:56 2006
@@ -1,113 +1,113 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
-
-import java.io.File;
-
-/**
- *
- * Test Publish/Consume queue using the release activemq.xml configuration file
- *
- * @version $Revision: 1.2 $
- */
-public class PublishOnQueueConsumedMessageUsingActivemqXMLTest extends PublishOnTopicConsumedMessageTest {
-    protected static final String JOURNAL_ROOT = "../data/";
-    BrokerService broker;
-
-
-
-     /**
-     * Use the transportConnector uri configured on the activemq.xml
-     *
-     * @return ActiveMQConnectionFactory
-     * @throws Exception
-     */
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("tcp://localhost:61616");
-    }
-
-
-    /**
-     * Sets up a test where the producer and consumer have their own connection.
-     *
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-        ;
-        File journalFile = new File(JOURNAL_ROOT);
-        recursiveDelete(journalFile);
-        // Create broker from resource
-        System.out.print("Creating broker... ");
-        broker = createBroker("org/apache/activemq/usecases/activemq.xml");
-        System.out.println("Success");
-        super.setUp();
-
-    }
-
-
-
-     /*
-     * Stops the Broker
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-         System.out.println("Closing Broker");
-            if (broker != null) {
-               broker.stop();
-            }
-         System.out.println("Broker closed...");
-
-
-    }
-
-
-    /*
-     * clean up the journal
-     */
-
-    protected static void recursiveDelete(File file) {
-        if( file.isDirectory() ) {
-            File[] files = file.listFiles();
-            for (int i = 0; i < files.length; i++) {
-                recursiveDelete(files[i]);
-            }
-        }
-        file.delete();
-    }
-
-    protected BrokerService createBroker(String resource) throws Exception {
-        return createBroker(new ClassPathResource(resource));
-    }
-
-    protected BrokerService createBroker(Resource resource) throws Exception {
-        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
-        factory.afterPropertiesSet();
-
-        BrokerService broker = factory.getBroker();
-
-        //assertTrue("Should have a broker!", broker != null);
-
-
-        return broker;
-    }
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+import java.io.File;
+
+/**
+ *
+ * Test Publish/Consume queue using the release activemq.xml configuration file
+ *
+ * @version $Revision: 1.2 $
+ */
+public class PublishOnQueueConsumedMessageUsingActivemqXMLTest extends PublishOnTopicConsumedMessageTest {
+    protected static final String JOURNAL_ROOT = "../data/";
+    BrokerService broker;
+
+
+
+     /**
+     * Use the transportConnector uri configured on the activemq.xml
+     *
+     * @return ActiveMQConnectionFactory
+     * @throws Exception
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("tcp://localhost:61616");
+    }
+
+
+    /**
+     * Sets up a test where the producer and consumer have their own connection.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        ;
+        File journalFile = new File(JOURNAL_ROOT);
+        recursiveDelete(journalFile);
+        // Create broker from resource
+        System.out.print("Creating broker... ");
+        broker = createBroker("org/apache/activemq/usecases/activemq.xml");
+        System.out.println("Success");
+        super.setUp();
+
+    }
+
+
+
+     /*
+     * Stops the Broker
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+         System.out.println("Closing Broker");
+            if (broker != null) {
+               broker.stop();
+            }
+         System.out.println("Broker closed...");
+
+
+    }
+
+
+    /*
+     * clean up the journal
+     */
+
+    protected static void recursiveDelete(File file) {
+        if( file.isDirectory() ) {
+            File[] files = file.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                recursiveDelete(files[i]);
+            }
+        }
+        file.delete();
+    }
+
+    protected BrokerService createBroker(String resource) throws Exception {
+        return createBroker(new ClassPathResource(resource));
+    }
+
+    protected BrokerService createBroker(Resource resource) throws Exception {
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+
+        BrokerService broker = factory.getBroker();
+
+        //assertTrue("Should have a broker!", broker != null);
+
+
+        return broker;
+    }
+}

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnQueueConsumedMessageUsingActivemqXMLTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTemporaryQueueConsumedMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumedMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumerMessageUsingActivemqXMLTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumerMessageUsingActivemqXMLTest.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumerMessageUsingActivemqXMLTest.java (original)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumerMessageUsingActivemqXMLTest.java Tue Feb 21 15:12:56 2006
@@ -1,115 +1,115 @@
-/**
- *
- * Copyright 2005 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
-import java.io.File;
-
-/**
- *
- * Test Publish/Consume topic  using the release activemq.xml configuration file
- *
- * @version $Revision: 1.2 $
- */
-public class PublishOnTopicConsumerMessageUsingActivemqXMLTest extends PublishOnTopicConsumedMessageTest {
-    protected static final String JOURNAL_ROOT = "../data/";
-    BrokerService broker;
-
-
-
-     /**
-     * Use the transportConnector uri configured on the activemq.xml
-     *
-     * @return ActiveMQConnectionFactory
-     * @throws Exception
-     */
-    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
-        return new ActiveMQConnectionFactory("tcp://localhost:61616");
-    }
-
-
-    /**
-     * Sets up a test where the producer and consumer have their own connection.
-     *
-     * @see junit.framework.TestCase#setUp()
-     */
-    protected void setUp() throws Exception {
-        ;
-        File journalFile = new File(JOURNAL_ROOT);
-        recursiveDelete(journalFile);
-        // Create broker from resource
-        System.out.print("Creating broker... ");
-        broker = createBroker("org/apache/activemq/usecases/activemq.xml");
-        System.out.println("Success");
-        super.setUp();
-
-    }
-
-
-
-     /*
-     * Stops the Broker
-     * @see junit.framework.TestCase#tearDown()
-     */
-    protected void tearDown() throws Exception {
-         System.out.println("Closing Broker");
-            if (broker != null) {
-               broker.stop();
-            }
-         System.out.println("Broker closed...");
-
-
-    }
-
-
-
-
-
-    /*
-     * clean up the journal
-     */
-
-    protected static void recursiveDelete(File file) {
-        if( file.isDirectory() ) {
-            File[] files = file.listFiles();
-            for (int i = 0; i < files.length; i++) {
-                recursiveDelete(files[i]);
-            }
-        }
-        file.delete();
-    }
-
-    protected BrokerService createBroker(String resource) throws Exception {
-        return createBroker(new ClassPathResource(resource));
-    }
-
-    protected BrokerService createBroker(Resource resource) throws Exception {
-        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
-        factory.afterPropertiesSet();
-
-        BrokerService broker = factory.getBroker();
-
-        //assertTrue("Should have a broker!", broker != null);
-
-
-        return broker;
-    }
-}
+/**
+ *
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import java.io.File;
+
+/**
+ *
+ * Test Publish/Consume topic  using the release activemq.xml configuration file
+ *
+ * @version $Revision: 1.2 $
+ */
+public class PublishOnTopicConsumerMessageUsingActivemqXMLTest extends PublishOnTopicConsumedMessageTest {
+    protected static final String JOURNAL_ROOT = "../data/";
+    BrokerService broker;
+
+
+
+     /**
+     * Use the transportConnector uri configured on the activemq.xml
+     *
+     * @return ActiveMQConnectionFactory
+     * @throws Exception
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("tcp://localhost:61616");
+    }
+
+
+    /**
+     * Sets up a test where the producer and consumer have their own connection.
+     *
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        ;
+        File journalFile = new File(JOURNAL_ROOT);
+        recursiveDelete(journalFile);
+        // Create broker from resource
+        System.out.print("Creating broker... ");
+        broker = createBroker("org/apache/activemq/usecases/activemq.xml");
+        System.out.println("Success");
+        super.setUp();
+
+    }
+
+
+
+     /*
+     * Stops the Broker
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+         System.out.println("Closing Broker");
+            if (broker != null) {
+               broker.stop();
+            }
+         System.out.println("Broker closed...");
+
+
+    }
+
+
+
+
+
+    /*
+     * clean up the journal
+     */
+
+    protected static void recursiveDelete(File file) {
+        if( file.isDirectory() ) {
+            File[] files = file.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                recursiveDelete(files[i]);
+            }
+        }
+        file.delete();
+    }
+
+    protected BrokerService createBroker(String resource) throws Exception {
+        return createBroker(new ClassPathResource(resource));
+    }
+
+    protected BrokerService createBroker(Resource resource) throws Exception {
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+
+        BrokerService broker = factory.getBroker();
+
+        //assertTrue("Should have a broker!", broker != null);
+
+
+        return broker;
+    }
+}

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/PublishOnTopicConsumerMessageUsingActivemqXMLTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/QueueConsumerCloseAndReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/QueueDuplicatesTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/QueueRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/StartAndStopBrokerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/SubscribeClosePublishThenConsumeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkUsingTcpTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkUsingTcpTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TopicRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TransactionRollbackOrderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TransactionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TransientQueueRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMessageNotSentToRemoteWhenNoConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMulticastQueueTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMulticastQueueTest.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMulticastQueueTest.java (original)
+++ incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMulticastQueueTest.java Tue Feb 21 15:12:56 2006
@@ -1,269 +1,269 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.CombinationTestSupport;
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.xbean.XBeanBrokerFactory;
-
-import java.net.URI;
-import java.util.Arrays;
-
-import junit.framework.Test;
-
-import javax.jms.Destination;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-public class TwoBrokerMulticastQueueTest extends CombinationTestSupport {
-
-    public static Test suite() {
-        return suite(TwoBrokerMulticastQueueTest.class);
-    }
-
-    public static void main(String[] args) {
-        junit.textui.TestRunner.run(suite());
-    }
-
-    public static final int MESSAGE_COUNT  = 100;
-    public static final int BROKER_COUNT   = 2;
-    public static final int CONSUMER_COUNT = 20;
-
-    private BrokerService[] brokers;
-    public String sendUri, recvUri;
-
-    public void setUp() throws Exception {
-        super.setAutoFail(true);
-        super.setUp();
-    }
-
-    public void tearDown() throws Exception {
-        for (int i=0; i<BROKER_COUNT; i++) {
-            brokers[i].stop();
-        }
-        super.tearDown();
-    }
-
-    private void doSendReceiveTest() throws Exception {
-        Destination dest = new ActiveMQQueue("TEST.FOO");
-
-        ConnectionFactory sendFactory = createConnectionFactory(sendUri);
-
-        Connection conn = createConnection(sendFactory);
-        sendMessages(conn, dest, MESSAGE_COUNT);
-
-        Thread.sleep(500);
-
-        ConnectionFactory recvFactory = createConnectionFactory(recvUri);
-        assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0));
-    }
-
-    private void doMultipleConsumersConnectTest() throws Exception {
-        Destination dest = new ActiveMQQueue("TEST.FOO");
-
-        ConnectionFactory sendFactory = createConnectionFactory(sendUri);
-
-        Connection conn = createConnection(sendFactory);
-        sendMessages(conn, dest, MESSAGE_COUNT);
-
-        Thread.sleep(500);
-
-        ConnectionFactory recvFactory = createConnectionFactory(recvUri);
-        assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0));
-
-        for (int i=0; i<(CONSUMER_COUNT-1); i++) {
-            assertEquals(0, receiveMessages(createConnection(recvFactory), dest, 200));
-        }
-    }
-
-    public void initCombosForTestSendReceive() {
-        addCombinationValues("sendUri", new Object[] {
-            "tcp://localhost:61616", "tcp://localhost:61617"
-        });
-        addCombinationValues("recvUri", new Object[] {
-            "tcp://localhost:61616", "tcp://localhost:61617"
-        });
-    }
-
-    public void testSendReceive() throws Throwable {
-        createMulticastBrokerNetwork();
-        doSendReceiveTest();
-    }
-
-    public void initCombosForTestMultipleConsumersConnect() {
-        addCombinationValues("sendUri", new Object[] {
-            "tcp://localhost:61616", "tcp://localhost:61617",
-        });
-        addCombinationValues("recvUri", new Object[] {
-            "tcp://localhost:61616", "tcp://localhost:61617"
-        });
-    }
-
-    public void testMultipleConsumersConnect() throws Throwable {
-        createMulticastBrokerNetwork();
-        doMultipleConsumersConnectTest();
-    }
-
-    public void testSendReceiveUsingFailover() throws Throwable {
-        sendUri = "failover:tcp://localhost:61616,tcp://localhost:61617";
-        recvUri = "failover:tcp://localhost:61616,tcp://localhost:61617";
-        createMulticastBrokerNetwork();
-        doSendReceiveTest();
-    }
-
-    public void testMultipleConsumersConnectUsingFailover() throws Throwable {
-        sendUri = "failover:tcp://localhost:61616,tcp://localhost:61617";
-        recvUri = "failover:tcp://localhost:61616,tcp://localhost:61617";
-        createMulticastBrokerNetwork();
-        doMultipleConsumersConnectTest();
-    }
-
-    public void testSendReceiveUsingDiscovery() throws Throwable {
-        sendUri = "discovery:multicast://default";
-        recvUri = "discovery:multicast://default";
-        createMulticastBrokerNetwork();
-        doSendReceiveTest();
-    }
-
-    public void testMultipleConsumersConnectUsingDiscovery() throws Throwable {
-        sendUri = "discovery:multicast://default";
-        recvUri = "discovery:multicast://default";
-        createMulticastBrokerNetwork();
-        doMultipleConsumersConnectTest();
-    }
-
-    public void testSendReceiveUsingAutoAssignFailover() throws Throwable {
-        sendUri = "failover:multicast://default";
-        recvUri = "failover:multicast://default";
-        createAutoAssignMulticastBrokerNetwork();
-        doSendReceiveTest();
-    }
-
-    public void testMultipleConsumersConnectUsingAutoAssignFailover() throws Throwable {
-        sendUri = "failover:multicast://default";
-        recvUri = "failover:multicast://default";
-        createAutoAssignMulticastBrokerNetwork();
-        doMultipleConsumersConnectTest();
-    }
-
-    public void testSendReceiveUsingAutoAssignDiscovery() throws Throwable {
-        sendUri = "discovery:multicast://default";
-        recvUri = "discovery:multicast://default";
-        createAutoAssignMulticastBrokerNetwork();
-        doSendReceiveTest();
-    }
-
-    public void testMultipleConsumersConnectUsingAutoAssignDiscovery() throws Throwable {
-        sendUri = "discovery:multicast://default";
-        recvUri = "discovery:multicast://default";
-        createAutoAssignMulticastBrokerNetwork();
-        doMultipleConsumersConnectTest();
-    }
-
-    protected void createMulticastBrokerNetwork() throws Exception {
-
-        brokers = new BrokerService[BROKER_COUNT];
-        for (int i=0; i<BROKER_COUNT; i++) {
-            brokers[i] = createBroker("xbean:multicast-broker-" + (i+1)+ ".xml");
-            brokers[i].start();
-        }
-
-        // Let the brokers discover each other first
-        Thread.sleep(1000);
-    }
-
-    protected void createAutoAssignMulticastBrokerNetwork() throws Exception {
-        brokers = new BrokerService[BROKER_COUNT];
-        for (int i=0; i<BROKER_COUNT; i++) {
-            brokers[i] = createBroker("xbean:multicast-broker-auto.xml");
-            brokers[i].start();
-        }
-
-        // Let the brokers discover each other first
-        Thread.sleep(1000);
-    }
-
-    protected BrokerService createBroker(String uri) throws Exception {
-        return (new XBeanBrokerFactory()).createBroker(new URI(uri));
-    }
-
-    protected ConnectionFactory createConnectionFactory(String uri) {
-        return new ActiveMQConnectionFactory(uri);
-    }
-
-    protected Connection createConnection(ConnectionFactory factory) throws JMSException {
-        Connection conn = factory.createConnection();
-        return conn;
-    }
-
-    protected int receiveMessages(Connection conn, Destination dest, int waitTime) throws JMSException, InterruptedException {
-        conn.start();
-        MessageIdList list = new MessageIdList();
-        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = sess.createConsumer(dest);
-        consumer.setMessageListener(list);
-
-        if (waitTime > 0) {
-            Thread.sleep(waitTime);
-        } else {
-            list.waitForMessagesToArrive(MESSAGE_COUNT);
-        }
-
-        conn.close();
-
-        return list.getMessageCount();
-    }
-
-    protected void sendMessages(Connection conn, Destination dest, int count) throws JMSException {
-        conn.start();
-        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageProducer prod = sess.createProducer(dest);
-
-        for (int i=0; i<count; i++) {
-            prod.send(createTextMessage(sess, "Message " + i, 1024));
-        }
-
-        conn.close();
-    }
-
-    protected TextMessage createTextMessage(Session session, String initText, int messageSize) throws JMSException {
-        TextMessage msg = session.createTextMessage();
-
-        // Pad message text
-        if (initText.length() < messageSize) {
-            char[] data = new char[messageSize - initText.length()];
-            Arrays.fill(data, '*');
-            String str = new String(data);
-            msg.setText(initText + str);
-
-        // Do not pad message text
-        } else {
-            msg.setText(initText);
-        }
-
-        return msg;
-    }
-
-}
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+
+import java.net.URI;
+import java.util.Arrays;
+
+import junit.framework.Test;
+
+import javax.jms.Destination;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+/**
+ * Starts two brokers from xbean configuration files, sends
+ * {@link #MESSAGE_COUNT} text messages to the queue <code>TEST.FOO</code>
+ * through {@link #sendUri} and verifies that they can all be consumed
+ * through {@link #recvUri}.
+ */
+public class TwoBrokerMulticastQueueTest extends CombinationTestSupport {
+
+    public static Test suite() {
+        return suite(TwoBrokerMulticastQueueTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public static final int MESSAGE_COUNT  = 100;
+    public static final int BROKER_COUNT   = 2;
+    public static final int CONSUMER_COUNT = 20;
+
+    private static final String QUEUE_NAME            = "TEST.FOO";
+    private static final String STATIC_FAILOVER_URI   = "failover:tcp://localhost:61616,tcp://localhost:61617";
+    private static final String MULTICAST_FAILOVER_URI = "failover:multicast://default";
+    private static final String DISCOVERY_URI         = "discovery:multicast://default";
+
+    private BrokerService[] brokers;
+
+    // NOTE(review): these are filled in by name through addCombinationValues(...);
+    // presumably CombinationTestSupport needs them to stay public - confirm before narrowing.
+    public String sendUri, recvUri;
+
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+    }
+
+    /**
+     * Stops every broker that was actually created, then delegates to the
+     * superclass. Slots that were never filled (for example because broker
+     * creation failed half-way) are skipped, every broker gets a stop attempt
+     * even if an earlier one fails, and <code>super.tearDown()</code> always runs.
+     *
+     * @throws Exception the first failure raised while stopping a broker, or
+     *                   whatever <code>super.tearDown()</code> throws
+     */
+    public void tearDown() throws Exception {
+        Exception firstFailure = null;
+        try {
+            if (brokers != null) {
+                for (int i = 0; i < brokers.length; i++) {
+                    if (brokers[i] == null) {
+                        continue;
+                    }
+                    try {
+                        brokers[i].stop();
+                    } catch (Exception e) {
+                        // Keep going so the remaining brokers are still stopped.
+                        if (firstFailure == null) {
+                            firstFailure = e;
+                        }
+                    }
+                }
+            }
+        } finally {
+            brokers = null;
+            super.tearDown();
+        }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages via {@link #sendUri} and asserts
+     * that a single consumer connected via {@link #recvUri} receives all of them.
+     */
+    private void doSendReceiveTest() throws Exception {
+        sendThenReceiveAll(new ActiveMQQueue(QUEUE_NAME));
+    }
+
+    /**
+     * Same as {@link #doSendReceiveTest()}, then connects
+     * <code>CONSUMER_COUNT - 1</code> further consumers one after another and
+     * asserts that each of them receives nothing within 200 ms.
+     */
+    private void doMultipleConsumersConnectTest() throws Exception {
+        Destination dest = new ActiveMQQueue(QUEUE_NAME);
+        ConnectionFactory recvFactory = sendThenReceiveAll(dest);
+
+        for (int i = 0; i < (CONSUMER_COUNT - 1); i++) {
+            assertEquals(0, receiveMessages(createConnection(recvFactory), dest, 200));
+        }
+    }
+
+    /**
+     * Shared body of both scenarios: produce through the send URI, pause, then
+     * drain the queue through the receive URI.
+     *
+     * @return the factory used for receiving, so callers can open more consumers
+     */
+    private ConnectionFactory sendThenReceiveAll(Destination dest) throws Exception {
+        ConnectionFactory sendFactory = createConnectionFactory(sendUri);
+        sendMessages(createConnection(sendFactory), dest, MESSAGE_COUNT);
+
+        // NOTE(review): presumably gives the brokers time to settle before consuming - confirm.
+        Thread.sleep(500);
+
+        ConnectionFactory recvFactory = createConnectionFactory(recvUri);
+        assertEquals(MESSAGE_COUNT, receiveMessages(createConnection(recvFactory), dest, 0));
+        return recvFactory;
+    }
+
+    /** Returns a fresh array holding the two direct TCP broker URIs. */
+    private static Object[] tcpBrokerUris() {
+        return new Object[] {
+            "tcp://localhost:61616", "tcp://localhost:61617"
+        };
+    }
+
+    /** Registers both TCP URIs as candidate values for sendUri and recvUri. */
+    private void addTcpUriCombinations() {
+        addCombinationValues("sendUri", tcpBrokerUris());
+        addCombinationValues("recvUri", tcpBrokerUris());
+    }
+
+    public void initCombosForTestSendReceive() {
+        addTcpUriCombinations();
+    }
+
+    public void testSendReceive() throws Throwable {
+        createMulticastBrokerNetwork();
+        doSendReceiveTest();
+    }
+
+    public void initCombosForTestMultipleConsumersConnect() {
+        addTcpUriCombinations();
+    }
+
+    public void testMultipleConsumersConnect() throws Throwable {
+        createMulticastBrokerNetwork();
+        doMultipleConsumersConnectTest();
+    }
+
+    public void testSendReceiveUsingFailover() throws Throwable {
+        sendUri = STATIC_FAILOVER_URI;
+        recvUri = STATIC_FAILOVER_URI;
+        createMulticastBrokerNetwork();
+        doSendReceiveTest();
+    }
+
+    public void testMultipleConsumersConnectUsingFailover() throws Throwable {
+        sendUri = STATIC_FAILOVER_URI;
+        recvUri = STATIC_FAILOVER_URI;
+        createMulticastBrokerNetwork();
+        doMultipleConsumersConnectTest();
+    }
+
+    public void testSendReceiveUsingDiscovery() throws Throwable {
+        sendUri = DISCOVERY_URI;
+        recvUri = DISCOVERY_URI;
+        createMulticastBrokerNetwork();
+        doSendReceiveTest();
+    }
+
+    public void testMultipleConsumersConnectUsingDiscovery() throws Throwable {
+        sendUri = DISCOVERY_URI;
+        recvUri = DISCOVERY_URI;
+        createMulticastBrokerNetwork();
+        doMultipleConsumersConnectTest();
+    }
+
+    public void testSendReceiveUsingAutoAssignFailover() throws Throwable {
+        sendUri = MULTICAST_FAILOVER_URI;
+        recvUri = MULTICAST_FAILOVER_URI;
+        createAutoAssignMulticastBrokerNetwork();
+        doSendReceiveTest();
+    }
+
+    public void testMultipleConsumersConnectUsingAutoAssignFailover() throws Throwable {
+        sendUri = MULTICAST_FAILOVER_URI;
+        recvUri = MULTICAST_FAILOVER_URI;
+        createAutoAssignMulticastBrokerNetwork();
+        doMultipleConsumersConnectTest();
+    }
+
+    public void testSendReceiveUsingAutoAssignDiscovery() throws Throwable {
+        sendUri = DISCOVERY_URI;
+        recvUri = DISCOVERY_URI;
+        createAutoAssignMulticastBrokerNetwork();
+        doSendReceiveTest();
+    }
+
+    public void testMultipleConsumersConnectUsingAutoAssignDiscovery() throws Throwable {
+        sendUri = DISCOVERY_URI;
+        recvUri = DISCOVERY_URI;
+        createAutoAssignMulticastBrokerNetwork();
+        doMultipleConsumersConnectTest();
+    }
+
+    /**
+     * Starts {@link #BROKER_COUNT} brokers, broker <code>n</code> (1-based)
+     * being loaded from <code>xbean:multicast-broker-n.xml</code>.
+     */
+    protected void createMulticastBrokerNetwork() throws Exception {
+        String[] configs = new String[BROKER_COUNT];
+        for (int i = 0; i < BROKER_COUNT; i++) {
+            configs[i] = "xbean:multicast-broker-" + (i + 1) + ".xml";
+        }
+        startBrokers(configs);
+    }
+
+    /**
+     * Starts {@link #BROKER_COUNT} brokers that all load the same
+     * <code>xbean:multicast-broker-auto.xml</code> configuration.
+     */
+    protected void createAutoAssignMulticastBrokerNetwork() throws Exception {
+        String[] configs = new String[BROKER_COUNT];
+        Arrays.fill(configs, "xbean:multicast-broker-auto.xml");
+        startBrokers(configs);
+    }
+
+    /**
+     * Creates and starts one broker per configuration URI, recording each in
+     * {@link #brokers} as soon as it exists so tearDown can stop it, then
+     * waits one second.
+     */
+    private void startBrokers(String[] configUris) throws Exception {
+        brokers = new BrokerService[configUris.length];
+        for (int i = 0; i < configUris.length; i++) {
+            brokers[i] = createBroker(configUris[i]);
+            brokers[i].start();
+        }
+
+        // Let the brokers discover each other first
+        Thread.sleep(1000);
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        return (new XBeanBrokerFactory()).createBroker(new URI(uri));
+    }
+
+    protected ConnectionFactory createConnectionFactory(String uri) {
+        return new ActiveMQConnectionFactory(uri);
+    }
+
+    protected Connection createConnection(ConnectionFactory factory) throws JMSException {
+        return factory.createConnection();
+    }
+
+    /**
+     * Consumes from <code>dest</code> on the given connection and reports how
+     * many messages arrived. The connection is always closed before returning,
+     * including when an exception is thrown.
+     *
+     * @param conn     connection to consume on; started and closed by this method
+     * @param dest     destination to consume from
+     * @param waitTime if positive, listen for exactly this many milliseconds;
+     *                 otherwise wait via
+     *                 <code>MessageIdList.waitForMessagesToArrive(MESSAGE_COUNT)</code>
+     * @return the number of messages counted by the listener
+     */
+    protected int receiveMessages(Connection conn, Destination dest, int waitTime) throws JMSException, InterruptedException {
+        MessageIdList list = new MessageIdList();
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = sess.createConsumer(dest);
+            consumer.setMessageListener(list);
+
+            if (waitTime > 0) {
+                Thread.sleep(waitTime);
+            } else {
+                list.waitForMessagesToArrive(MESSAGE_COUNT);
+            }
+        } finally {
+            conn.close();
+        }
+
+        return list.getMessageCount();
+    }
+
+    /**
+     * Sends <code>count</code> padded text messages ("Message 0", "Message 1", ...)
+     * to <code>dest</code>. The connection is always closed before returning,
+     * including when an exception is thrown.
+     *
+     * @param conn  connection to send on; started and closed by this method
+     * @param dest  destination to send to
+     * @param count number of messages to send
+     */
+    protected void sendMessages(Connection conn, Destination dest, int count) throws JMSException {
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer prod = sess.createProducer(dest);
+
+            for (int i = 0; i < count; i++) {
+                prod.send(createTextMessage(sess, "Message " + i, 1024));
+            }
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Creates a text message whose body is <code>initText</code> right-padded
+     * with <code>'*'</code> up to <code>messageSize</code> characters. Text
+     * that is already that long or longer is used unchanged.
+     *
+     * @param session     session used to create the message
+     * @param initText    leading text of the message body
+     * @param messageSize minimum body length in characters
+     * @return the populated message
+     */
+    protected TextMessage createTextMessage(Session session, String initText, int messageSize) throws JMSException {
+        TextMessage msg = session.createTextMessage();
+
+        if (initText.length() < messageSize) {
+            // Pad message text
+            char[] data = new char[messageSize - initText.length()];
+            Arrays.fill(data, '*');
+            msg.setText(initText + new String(data));
+        } else {
+            // Do not pad message text
+            msg.setText(initText);
+        }
+
+        return msg;
+    }
+
+}

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerMulticastQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueSendReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerTopicSendReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerTopicSendReceiveUsingHttpTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerTopicSendReceiveUsingJavaConfigurationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoBrokerTopicSendReceiveUsingTcpTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/assembly/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/jmeter/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/Consumer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/ConsumerSysTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/ConsumerSysTest.java?rev=379619&r1=379618&r2=379619&view=diff
==============================================================================
--- incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/ConsumerSysTest.java (original)
+++ incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/ConsumerSysTest.java Tue Feb 21 15:12:56 2006
@@ -1,338 +1,338 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.activemq.sampler;
-
-import org.apache.log.Logger;
-import org.apache.jorphan.logging.LoggingManager;
-import org.apache.jmeter.util.JMeterUtils;
-import org.apache.jmeter.samplers.SampleResult;
-import org.apache.jmeter.samplers.Entry;
-import org.activemq.util.connection.ServerConnectionFactory;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.command.ActiveMQMessage;
-
-import javax.jms.*;
-import java.util.*;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-
-
-
-public class ConsumerSysTest extends Sampler implements MessageListener {
-
-    private static final Logger log = LoggingManager.getLoggerForClass();
-
-    // Otherwise, the response is scanned for these strings
-    private static final String STATUS_PREFIX = JMeterUtils.getPropDefault("tcp.status.prefix", "");
-    private static final String STATUS_SUFFIX = JMeterUtils.getPropDefault("tcp.status.suffix", "");
-
-    private static final String STATUS_PROPERTIES = JMeterUtils.getPropDefault("tcp.status.properties", "");
-    private static final Properties statusProps = new Properties();
-    private static int msgCounter;
-
-    public static int noOfMessages;
-    public static int ConsumerCount;
-    public static Map ProducerMap = Collections.synchronizedMap(new ConcurrentHashMap());
-    public static Map CopyProducerMap = Collections.synchronizedMap(new ConcurrentHashMap());
-    public static boolean destination;
-    public static boolean resetMap = false;
-
-    private MessageConsumer consumer = null;
-
-    static {
-        log.info("Protocol Handler name=" + getClassname());
-        log.info("Status prefix=" + STATUS_PREFIX);
-        log.info("Status suffix=" + STATUS_SUFFIX);
-        log.info("Status properties=" + STATUS_PROPERTIES);
-
-        if (STATUS_PROPERTIES.length() > 0) {
-            File f = new File(STATUS_PROPERTIES);
-
-            try {
-                statusProps.load(new FileInputStream(f));
-                log.info("Successfully loaded properties");
-            } catch (FileNotFoundException e) {
-                log.error("Property file not found");
-            } catch (IOException e) {
-                log.error("Property file error " + e.toString());
-            }
-        }
-    }
-
-    /**
-     *  Constructor for ConsumerSampler object.
-     */
-    public ConsumerSysTest() {
-        log.debug("Created " + this);
-        protocolHandler = getProtocol();
-        log.debug("Using Protocol Handler: " + protocolHandler.getClass().getName());
-    }
-
-    /**
-     *  Subscribe to the config message.
-     *
-     * @throws JMSException
-     */
-    protected void suscribeConfigMessage() throws JMSException {
-        boolean topic = false;
-
-        Connection connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
-                                                                                ACTIVEMQ_SERVER,
-                                                                                topic,
-                                                                                this.getEmbeddedBroker());
-
-        // Start connection before receiving messages.
-        connection.start();
-
-        Session session = ServerConnectionFactory.createSession(connection,
-                                                                TRANSACTED_FALSE,
-                                                                ACTIVEMQ_SERVER,
-                                                                topic);
-
-        Destination destination = ServerConnectionFactory.createDestination(session,
-                                                                            CONFIG_SUBJECT,
-                                                                            this.getURL(),
-                                                                            ACTIVEMQ_SERVER,
-                                                                            topic);
-
-        MessageConsumer consumer = null;
-        consumer = session.createConsumer(destination);
-        Message message = consumer.receive();
-
-        TextMessage txtMsg = (TextMessage) message;
-        String configMsg = txtMsg.getText();
-
-        noOfMessages = Integer.parseInt(configMsg.substring(configMsg.indexOf("#")+1, configMsg.lastIndexOf("#")));
-
-        ServerConnectionFactory.close(connection, session);
-    }
-
-    /**
-     *  Create the subscriber/s then subscribe.
-     *
-     * @throws JMSException
-     */
-    protected void subscribe() throws JMSException {
-        String subjects[] = getSubjects();
-
-        for (int i = 0; i < this.getNoConsumer(); i++) {
-            String subject = subjects[i % getNoSubject()];
-            subscribe(subject);
-        }
-        ConsumerCount = getNoConsumer();
-    }
-
-    /**
-     *  Subscribe to the subject.
-     *
-     * @param subject
-     * @throws JMSException
-     */
-    protected void subscribe(String subject) throws JMSException {
-        destination(this.getTopic());
-        Connection connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
-                                                                                ACTIVEMQ_SERVER,
-                                                                                this.getTopic(),
-                                                                                this.getEmbeddedBroker());
-
-        if (this.getDurable()) {
-            IdGenerator idGenerator = new IdGenerator();
-            connection.setClientID(idGenerator.generateId());
-        }
-
-        // Start connection before receiving messages.
-        connection.start();
-
-        Session session = ServerConnectionFactory.createSession(connection,
-                                                                TRANSACTED_FALSE,
-                                                                ACTIVEMQ_SERVER,
-                                                                this.getTopic());
-
-        Destination destination = ServerConnectionFactory.createDestination(session,
-                                                                            subject,
-                                                                            this.getURL(),
-                                                                            ACTIVEMQ_SERVER,
-                                                                            this.getTopic());
-
-
-        if (this.getDurable() && this.getTopic()) {
-            consumer = session.createDurableSubscriber((Topic) destination, getClass().getName());
-        } else {
-            consumer = session.createConsumer(destination);
-        }
-
-        consumer.setMessageListener(this);
-        addResource(consumer);
-    }
-
-    /**
-     *  Create the publisher then send the confirmation message.
-     *
-     * @throws JMSException
-     */
-    protected void publishConfirmMessage() throws JMSException {
-        MessageProducer publisher = null;
-        String text = PUBLISH_MSG;
-        Connection connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
-                                                                                ACTIVEMQ_SERVER,
-                                                                                this.getTopic(),
-                                                                                this.getEmbeddedBroker());
-        if (this.getDurable()) {
-            IdGenerator idGenerator = new IdGenerator();
-            connection.setClientID(idGenerator.generateId());
-        }
-
-        Session session = ServerConnectionFactory.createSession(connection,
-                                                                this.getTransacted(),
-                                                                ACTIVEMQ_SERVER,
-                                                                this.getTopic());
-
-         Destination destination = ServerConnectionFactory.createDestination(session,
-                                                                            CONFIRM_SUBJECT,
-                                                                            this.getURL(),
-                                                                            ACTIVEMQ_SERVER,
-                                                                            this.getTopic());
-
-        publisher = session.createProducer(destination);
-
-         if (getDurable()) {
-             publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
-         } else {
-             publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         }
-
-        publishConfirmMessage(connection, session, publisher, text);
-    }
-
-    /**
-     * publish the confirmation message.
-     *
-     * @param connection
-     * @param session
-     * @param publisher
-     * @param text
-     * @throws JMSException
-     */
-    protected void publishConfirmMessage(Connection connection, Session session, MessageProducer publisher, String text)
-        throws JMSException {
-
-        Message message = session.createTextMessage(text);
-        publisher.send(message);
-
-        // Close the connection and session after sending the config message
-        ServerConnectionFactory.close(connection, session);
-    }
-
-    /**
-     * Runs and subscribes to messages.
-     *
-     * @throws JMSException
-     */
-    public void run() throws JMSException {
-
-        // Receives the config message
-        suscribeConfigMessage();
-
-        // Create subscriber
-        subscribe();
-
-        // Publish confirm messages
-        publishConfirmMessage();
-    }
-
-    /**
-     * Retrieves the sample as SampleResult object. There are times that this
-     * is ignored.
-     *
-     * @param e - Entry object.
-     * @return Returns the sample result.
-     */
-    public SampleResult sample(Entry e) {// Entry tends to be ignored ...
-        SampleResult res = new SampleResult();
-        res.setSampleLabel(getName());
-        res.setSamplerData(getURL());
-        res.sampleStart();
-
-        try {
-            this.run();
-        } catch (JMSException ex) {
-            log.error("Error running consumer ", ex);
-            res.setResponseCode("500");
-            res.setResponseMessage(ex.toString());
-        }
-
-        // Calculate response time
-        res.sampleEnd();
-
-        // Set if we were successful or not
-        res.setSuccessful(true);
-
-        return res;
-    }
-
-    public void onMessage(Message message) {
-        try {
-            ActiveMQMessage amsg = (ActiveMQMessage) message;
-            TextMessage textMessage = (TextMessage) message;
-
-            StringBuffer sb = new StringBuffer();
-            sb.append(textMessage.getText());
-            sb.append("#");
-            sb.append(amsg.getJMSMessageID());
-
-            addToMap(sb.toString());
-
-        } catch (JMSException e) {
-            log.error("Unable to force deserialize the content", e);
-        }
-    }
-
-    /**
-     *
-     * @param text Add the message to a Producer hash map.
-     */
-    private synchronized void addToMap(String text) {
-       msgCounter++;
-       String strMsgCounter = String.valueOf(msgCounter);
-       ProducerMap.put(strMsgCounter, text);
-    }
-
-    /**
-     *
-     * @return Resets the Producer map.
-     */
-    public synchronized  Map resetProducerMap() {
-        Map copy = Collections.synchronizedMap(new ConcurrentHashMap(ProducerMap));
-        ProducerMap.clear();
-        msgCounter = 0;
-        return copy;
-    }
-
-    /**
-     *
-     * @param dest
-     */
-    private void destination(boolean dest) {
-        destination = dest;
-    }
-
-
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.activemq.sampler;
+
+import org.apache.log.Logger;
+import org.apache.jorphan.logging.LoggingManager;
+import org.apache.jmeter.util.JMeterUtils;
+import org.apache.jmeter.samplers.SampleResult;
+import org.apache.jmeter.samplers.Entry;
+import org.activemq.util.connection.ServerConnectionFactory;
+import org.apache.activemq.util.IdGenerator;
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.*;
+import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
+
+
+
+public class ConsumerSysTest extends Sampler implements MessageListener {
+
+    private static final Logger log = LoggingManager.getLoggerForClass();
+
+    // Otherwise, the response is scanned for these strings
+    private static final String STATUS_PREFIX = JMeterUtils.getPropDefault("tcp.status.prefix", "");
+    private static final String STATUS_SUFFIX = JMeterUtils.getPropDefault("tcp.status.suffix", "");
+
+    private static final String STATUS_PROPERTIES = JMeterUtils.getPropDefault("tcp.status.properties", "");
+    private static final Properties statusProps = new Properties();
+    private static int msgCounter;
+
+    public static int noOfMessages;
+    public static int ConsumerCount;
+    public static Map ProducerMap = Collections.synchronizedMap(new ConcurrentHashMap());
+    public static Map CopyProducerMap = Collections.synchronizedMap(new ConcurrentHashMap());
+    public static boolean destination;
+    public static boolean resetMap = false;
+
+    private MessageConsumer consumer = null;
+
+    // Logs the sampler configuration once per class load and, when a status
+    // properties file is configured, loads it into statusProps.
+    static {
+        log.info("Protocol Handler name=" + getClassname());
+        log.info("Status prefix=" + STATUS_PREFIX);
+        log.info("Status suffix=" + STATUS_SUFFIX);
+        log.info("Status properties=" + STATUS_PROPERTIES);
+
+        if (STATUS_PROPERTIES.length() > 0) {
+            File f = new File(STATUS_PROPERTIES);
+            FileInputStream in = null;
+
+            try {
+                in = new FileInputStream(f);
+                statusProps.load(in);
+                log.info("Successfully loaded properties");
+            } catch (FileNotFoundException e) {
+                log.error("Property file not found");
+            } catch (IOException e) {
+                log.error("Property file error " + e.toString());
+            } finally {
+                // Properties.load does not close the stream; release the file handle here.
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException ignored) {
+                        // Nothing useful can be done if closing a read-only stream fails.
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     *  Constructor for ConsumerSampler object.
+     */
+    public ConsumerSysTest() {
+        log.debug("Created " + this);
+        protocolHandler = getProtocol();
+        log.debug("Using Protocol Handler: " + protocolHandler.getClass().getName());
+    }
+
+    /**
+     *  Blocks until the config message arrives on the CONFIG_SUBJECT queue and
+     *  stores the number found between its first and last '#' character in
+     *  {@link #noOfMessages}. The connection and session are closed once the
+     *  session exists, whether or not receiving or parsing succeeds.
+     *
+     * @throws JMSException if the JMS calls fail, or if the received message is
+     *                      missing, is not a text message, or does not carry a
+     *                      number between two '#' markers
+     */
+    protected void suscribeConfigMessage() throws JMSException {
+        // The config message is always exchanged over a queue.
+        boolean topic = false;
+
+        Connection connection = ServerConnectionFactory.createConnectionFactory(this.getURL(),
+                                                                                ACTIVEMQ_SERVER,
+                                                                                topic,
+                                                                                this.getEmbeddedBroker());
+
+        // Start connection before receiving messages.
+        connection.start();
+
+        Session session = ServerConnectionFactory.createSession(connection,
+                                                                TRANSACTED_FALSE,
+                                                                ACTIVEMQ_SERVER,
+                                                                topic);
+
+        try {
+            Destination destination = ServerConnectionFactory.createDestination(session,
+                                                                                CONFIG_SUBJECT,
+                                                                                this.getURL(),
+                                                                                ACTIVEMQ_SERVER,
+                                                                                topic);
+
+            // Local name differs from the 'consumer' field on purpose: this
+            // short-lived consumer must not be confused with the subscriber.
+            MessageConsumer configConsumer = session.createConsumer(destination);
+            Message message = configConsumer.receive();
+
+            if (!(message instanceof TextMessage)) {
+                throw new JMSException("Expected a text config message but received: " + message);
+            }
+
+            String configMsg = ((TextMessage) message).getText();
+            int first = (configMsg == null) ? -1 : configMsg.indexOf("#");
+            int last = (configMsg == null) ? -1 : configMsg.lastIndexOf("#");
+
+            // Two distinct markers are required to delimit the count.
+            if (first < 0 || first == last) {
+                throw new JMSException("Config message has no '#'-delimited message count: " + configMsg);
+            }
+
+            try {
+                noOfMessages = Integer.parseInt(configMsg.substring(first + 1, last));
+            } catch (NumberFormatException e) {
+                throw new JMSException("Config message count is not a number: " + configMsg);
+            }
+        } finally {
+            ServerConnectionFactory.close(connection, session);
+        }
+    }
+
+    /**
+     *  Create the subscriber/s then subscribe.
+     *
+     * @throws JMSException
+     */
+    protected void subscribe() throws JMSException {
+        String subjects[] = getSubjects();
+
+        for (int i = 0; i < this.getNoConsumer(); i++) {
+            String subject = subjects[i % getNoSubject()];
+            subscribe(subject);
+        }
+        ConsumerCount = getNoConsumer();
+    }
+
+    /**
+     *  Opens a connection, session and consumer for one subject, installs this
+     *  sampler as the message listener and registers the consumer as a resource.
+     *
+     * @param subject name of the destination to consume from
+     * @throws JMSException
+     */
+    protected void subscribe(String subject) throws JMSException {
+        destination(this.getTopic());
+
+        Connection conn = ServerConnectionFactory.createConnectionFactory(
+                this.getURL(), ACTIVEMQ_SERVER, this.getTopic(), this.getEmbeddedBroker());
+
+        // Durable mode gets a freshly generated client id on the connection.
+        if (this.getDurable()) {
+            conn.setClientID(new IdGenerator().generateId());
+        }
+
+        // The connection must be running before messages can be delivered.
+        conn.start();
+
+        Session sess = ServerConnectionFactory.createSession(
+                conn, TRANSACTED_FALSE, ACTIVEMQ_SERVER, this.getTopic());
+
+        Destination target = ServerConnectionFactory.createDestination(
+                sess, subject, this.getURL(), ACTIVEMQ_SERVER, this.getTopic());
+
+        // Only a durable topic gets a durable subscriber; everything else is a plain consumer.
+        if (!this.getDurable() || !this.getTopic()) {
+            consumer = sess.createConsumer(target);
+        } else {
+            consumer = sess.createDurableSubscriber((Topic) target, getClass().getName());
+        }
+
+        consumer.setMessageListener(this);
+        addResource(consumer);
+    }
+
+    /**
+     *  Builds a producer for CONFIRM_SUBJECT and hands it, together with its
+     *  connection and session, to the four-argument overload to send PUBLISH_MSG.
+     *
+     * @throws JMSException
+     */
+    protected void publishConfirmMessage() throws JMSException {
+        Connection conn = ServerConnectionFactory.createConnectionFactory(
+                this.getURL(), ACTIVEMQ_SERVER, this.getTopic(), this.getEmbeddedBroker());
+
+        // Durable mode gets a freshly generated client id on the connection.
+        if (this.getDurable()) {
+            conn.setClientID(new IdGenerator().generateId());
+        }
+
+        Session sess = ServerConnectionFactory.createSession(
+                conn, this.getTransacted(), ACTIVEMQ_SERVER, this.getTopic());
+
+        Destination confirmDest = ServerConnectionFactory.createDestination(
+                sess, CONFIRM_SUBJECT, this.getURL(), ACTIVEMQ_SERVER, this.getTopic());
+
+        MessageProducer producer = sess.createProducer(confirmDest);
+
+        // Delivery mode follows the durable setting.
+        producer.setDeliveryMode(getDurable() ? DeliveryMode.PERSISTENT
+                                              : DeliveryMode.NON_PERSISTENT);
+
+        publishConfirmMessage(conn, sess, producer, PUBLISH_MSG);
+    }
+
+    /**
+     * Sends the confirmation message as a text message and then closes the
+     * supplied connection and session.
+     * <p>
+     * The connection and session are closed even when creating or sending the
+     * message fails, so a failed publish does not leave them open.
+     *
+     * @param connection connection to close once the message has been sent.
+     * @param session    session used to create the text message; closed afterwards.
+     * @param publisher  producer used to send the message.
+     * @param text       body of the confirmation message.
+     * @throws JMSException if creating, sending or closing fails.
+     */
+    protected void publishConfirmMessage(Connection connection, Session session, MessageProducer publisher, String text)
+        throws JMSException {
+
+        try {
+            Message message = session.createTextMessage(text);
+            publisher.send(message);
+        } finally {
+            // Close the connection and session after sending the confirmation message,
+            // including the case where the send itself threw.
+            ServerConnectionFactory.close(connection, session);
+        }
+    }
+
+    /**
+     * Executes the consumer sequence: {@code suscribeConfigMessage()},
+     * then {@code subscribe()}, then {@code publishConfirmMessage()}.
+     *
+     * @throws JMSException if any of the three steps fails.
+     */
+    public void run() throws JMSException {
+        // Step 1: take in the config message.
+        suscribeConfigMessage();
+
+        // Step 2: set up the subscriber.
+        subscribe();
+
+        // Step 3: publish the confirmation message.
+        publishConfirmMessage();
+    }
+
+    /**
+     * Retrieves the sample as SampleResult object. There are times that this
+     * is ignored.
+     *
+     * @param e - Entry object.
+     * @return Returns the sample result.
+     */
+    public SampleResult sample(Entry e) {// Entry tends to be ignored ...
+        SampleResult res = new SampleResult();
+        res.setSampleLabel(getName());
+        res.setSamplerData(getURL());
+        res.sampleStart();
+
+        try {
+            this.run();
+        } catch (JMSException ex) {
+            log.error("Error running consumer ", ex);
+            res.setResponseCode("500");
+            res.setResponseMessage(ex.toString());
+        }
+
+        // Calculate response time
+        res.sampleEnd();
+
+        // Set if we were successful or not
+        res.setSuccessful(true);
+
+        return res;
+    }
+
+    /**
+     * Message listener callback. Records the received message as
+     * {@code <text>#<JMS message id>} through {@code addToMap}.
+     * A {@code JMSException} while reading the message is logged and the
+     * message is not recorded.
+     *
+     * @param message the delivered message; it is cast to both
+     *                {@code ActiveMQMessage} and {@code TextMessage}.
+     */
+    public void onMessage(Message message) {
+        try {
+            // Same cast order as before: ActiveMQMessage first, then TextMessage.
+            ActiveMQMessage activeMqMessage = (ActiveMQMessage) message;
+            TextMessage body = (TextMessage) message;
+
+            String entry = body.getText() + "#" + activeMqMessage.getJMSMessageID();
+            addToMap(entry);
+
+        } catch (JMSException e) {
+            log.error("Unable to force deserialize the content", e);
+        }
+    }
+
+    /**
+     * Increments the message counter and stores the given text in the
+     * Producer map under the new counter value, rendered as a string.
+     *
+     * @param text the message entry to store.
+     */
+    private synchronized void addToMap(String text) {
+        // Pre-increment so the key is the counter value after this message is counted.
+        ProducerMap.put(String.valueOf(++msgCounter), text);
+    }
+
+    /**
+     * Copies the current contents of the Producer map, then clears the map
+     * and sets the message counter back to zero.
+     *
+     * @return a synchronized map holding the entries that were present
+     *         before the reset.
+     */
+    public synchronized Map resetProducerMap() {
+        // Take the snapshot before clearing so the caller receives the old entries.
+        Map snapshot = Collections.synchronizedMap(new ConcurrentHashMap(ProducerMap));
+
+        ProducerMap.clear();
+        msgCounter = 0;
+
+        return snapshot;
+    }
+
+    /**
+     * Stores the given flag in the {@code destination} field.
+     *
+     * @param dest the new value of the flag.
+     */
+    private void destination(boolean dest) {
+        this.destination = dest;
+    }
+
+
 }

Propchange: incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/ConsumerSysTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/jmeter/src/java/org/activemq/sampler/Producer.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message