activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r786040 [4/6] - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/test/java/org/apache/activemq/legacy/broker/ activemq-all/src/test/java/org/apache/activemq/legacy/broker/advisory/ ...
Date Thu, 18 Jun 2009 12:42:29 GMT
Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,146 @@
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQSession;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JMSIndividualAckTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testAckedMessageAreConsumed() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testLastMessageAcked() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage msg1 = session.createTextMessage("msg1");
+        TextMessage msg2 = session.createTextMessage("msg2");
+        TextMessage msg3 = session.createTextMessage("msg3");
+        producer.send(msg1);
+        producer.send(msg2);
+        producer.send(msg3);
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        assertEquals(msg1,msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        assertEquals(msg2,msg);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+        session.close();
+    }
+    
+    /**
+     * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
+     * 
+     * @throws JMSException
+     */
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        // Don't ack the message.
+        
+        // Reset the session.  This should cause the unacknowledged message to be re-delivered.
+        session.close();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+                
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);        
+        msg.acknowledge();
+        
+        session.close();
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSIndividualAckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JMSQueueRedeliverTest extends JmsTopicRedeliverTest {
+    protected void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JMSQueueRedeliverTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsAutoAckListenerTest extends TestSupport implements MessageListener {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowleged messages are being consumed.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(this);
+
+        Thread.sleep(10000);
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        // Attempt to Consume the message...check if message was acknowledge
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    public void onMessage(Message message) {
+        assertNotNull(message);
+
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckListenerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsAutoAckTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+    
+    /**
+     * Tests if acknowleged messages are being consumed.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testAckedMessageAreConsumed() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);        
+
+        session.close();
+    }
+    
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsAutoAckTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class JmsClientAckListenerTest extends TestSupport implements MessageListener {
+
+    private Connection connection;
+    private boolean dontAck;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowleged messages are being consumed.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(this);
+
+        Thread.sleep(10000);
+
+        // Reset the session.
+        session.close();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    /**
+     * Tests if unacknowleged messages are being redelivered when the consumer
+     * connects again.
+     * 
+     * @throws javax.jms.JMSException
+     */
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection.start();
+        // don't aknowledge message on onMessage() call
+        dontAck = true;
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("test");
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(this);
+        // Don't ack the message.
+
+        // Reset the session. This should cause the Unacked message to be
+        // redelivered.
+        session.close();
+
+        Thread.sleep(10000);
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        session.close();
+    }
+
+    public void onMessage(Message message) {
+
+        assertNotNull(message);
+        if (!dontAck) {
+            try {
+                message.acknowledge();
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckListenerTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsClientAckTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testAckedMessageAreConsumed() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testLastMessageAcked() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+        producer.send(session.createTextMessage("Hello2"));
+        producer.send(session.createTextMessage("Hello3"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+    
+    /**
+     * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
+     * 
+     * @throws JMSException
+     */
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        // Don't ack the message.
+        
+        // Reset the session.  This should cause the unacknowledged message to be re-delivered.
+        session.close();
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);        
+        msg.acknowledge();
+        
+        session.close();
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsClientAckTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+/**
+ * @version $Revision: 1.3 $
+ */
+public class JmsConnectionStartStopTest extends TestSupport {
+
+    private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
+        .getLog(JmsConnectionStartStopTest.class);
+
+    private Connection startedConnection;
+    private Connection stoppedConnection;
+
+    /**
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+
+        LOG.info(getClass().getClassLoader().getResource("log4j.properties"));
+
+        ActiveMQConnectionFactory factory = createConnectionFactory();
+        startedConnection = factory.createConnection();
+        startedConnection.start();
+        stoppedConnection = factory.createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        stoppedConnection.close();
+        startedConnection.close();
+    }
+
+    /**
+     * Tests if the consumer receives the messages that were sent before the
+     * connection was started.
+     * 
+     * @throws JMSException
+     */
+    public void testStoppedConsumerHoldsMessagesTillStarted() throws JMSException {
+        Session startedSession = startedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session stoppedSession = stoppedConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        // Setup the consumers.
+        Topic topic = startedSession.createTopic("test");
+        MessageConsumer startedConsumer = startedSession.createConsumer(topic);
+        MessageConsumer stoppedConsumer = stoppedSession.createConsumer(topic);
+
+        // Send the message.
+        MessageProducer producer = startedSession.createProducer(topic);
+        TextMessage message = startedSession.createTextMessage("Hello");
+        producer.send(message);
+
+        // Test the assertions.
+        Message m = startedConsumer.receive(1000);
+        assertNotNull(m);
+
+        m = stoppedConsumer.receive(1000);
+        assertNull(m);
+
+        stoppedConnection.start();
+        m = stoppedConsumer.receive(5000);
+        assertNotNull(m);
+
+        startedSession.close();
+        stoppedSession.close();
+    }
+
+    /**
+     * Tests if the consumer is able to receive messages eveb when the
+     * connecction restarts multiple times.
+     * 
+     * @throws Exception
+     */
+    public void testMultipleConnectionStops() throws Exception {
+        testStoppedConsumerHoldsMessagesTillStarted();
+        stoppedConnection.stop();
+        testStoppedConsumerHoldsMessagesTillStarted();
+        stoppedConnection.stop();
+        testStoppedConsumerHoldsMessagesTillStarted();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsConnectionStartStopTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
+
+    private Connection connection;
+    private Session publisherSession;
+    private Session consumerSession;
+    private MessageConsumer consumer;
+    private MessageConsumer testConsumer;
+    private MessageProducer producer;
+    private Topic topic;
+    private Object lock = new Object();
+
+    /*
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        super.topic = true;
+        connection = createConnection();
+        connection.setClientID("connection:" + getSubject());
+        publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        topic = (Topic)super.createDestination("Test.Topic");
+        consumer = consumerSession.createConsumer(topic);
+        consumer.setMessageListener(this);
+        producer = publisherSession.createProducer(topic);
+        connection.start();
+    }
+
+    /*
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        connection.close();
+    }
+
+    /**
+     * Tests if a consumer can be created asynchronusly
+     * 
+     * @throws Exception
+     */
+    public void testCreateConsumer() throws Exception {
+        Message msg = super.createMessage();
+        producer.send(msg);
+        if (testConsumer == null) {
+            synchronized (lock) {
+                lock.wait(3000);
+            }
+        }
+        assertTrue(testConsumer != null);
+    }
+
+    /**
+     * Use the asynchronous subscription mechanism
+     * 
+     * @param message
+     */
+    public void onMessage(Message message) {
+        try {
+            testConsumer = consumerSession.createConsumer(topic);
+            consumerSession.createProducer(topic);
+            synchronized (lock) {
+                lock.notify();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertTrue(false);
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsCreateConsumerInOnMessageTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.DeliveryMode;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsDurableQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    /**
+     * Set up the test with a queue and persistent delivery mode.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        topic = false;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * Returns the consumer subject.
+     */
+    protected String getConsumerSubject() {
+        return "FOO.>";
+    }
+
+    /**
+     * Returns the producer subject.
+     */
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableQueueWildcardSendReceiveTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsDurableTopicSelectorTest extends JmsTopicSelectorTest {
+    public void setUp() throws Exception {
+        durable = true;
+        super.setUp();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSelectorTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.5 $
+ */
+public class JmsDurableTopicSendReceiveTest extends JmsTopicSendReceiveTest {
+    private static final Log LOG = LogFactory.getLog(JmsDurableTopicSendReceiveTest.class);
+
+    protected Connection connection2;
+    protected Session session2;
+    protected Session consumeSession2;
+    protected MessageConsumer consumer2;
+    protected MessageProducer producer2;
+    protected Destination consumerDestination2;
+    protected Destination producerDestination2;
+
+    /**
+     * Set up a durable suscriber test.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        this.durable = true;
+        super.setUp();
+    }
+
+    /**
+     * Test if all the messages sent are being received.
+     * 
+     * @throws Exception
+     */
+    public void testSendWhileClosed() throws Exception {
+        connection2 = createConnection();
+        connection2.setClientID("test");
+        connection2.start();
+        session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        producer2 = session2.createProducer(null);
+        producer2.setDeliveryMode(deliveryMode);
+        producerDestination2 = session2.createTopic(getProducerSubject() + "2");
+        Thread.sleep(1000);
+
+        consumeSession2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerDestination2 = session2.createTopic(getConsumerSubject() + "2");
+        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+        Thread.sleep(1000);
+        consumer2.close();
+        TextMessage message = session2.createTextMessage("test");
+        message.setStringProperty("test", "test");
+        message.setJMSType("test");
+        producer2.send(producerDestination2, message);
+        LOG.info("Creating durable consumer");
+        consumer2 = consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+        Message msg = consumer2.receive(1000);
+        assertNotNull(msg);
+        assertEquals(((TextMessage)msg).getText(), "test");
+        assertEquals(msg.getJMSType(), "test");
+        assertEquals(msg.getStringProperty("test"), "test");
+        connection2.stop();
+        connection2.close();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicSendReceiveTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.DeliveryMode;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsDurableTopicWildcardSendReceiveTest extends JmsTopicSendReceiveTest {
+
+    /**
+     * Sets up a test with a topic destination, durable suscriber and persistent
+     * delivery mode.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        topic = true;
+        durable = true;
+        deliveryMode = DeliveryMode.PERSISTENT;
+        super.setUp();
+    }
+
+    /**
+     * Returns the consumer subject.
+     */
+    protected String getConsumerSubject() {
+        return "FOO.>";
+    }
+
+    /**
+     * Returns the producer subject.
+     */
+    protected String getProducerSubject() {
+        return "FOO.BAR.HUMBUG";
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsDurableTopicWildcardSendReceiveTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.legacy.test2;
+
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsQueueSelectorTest extends JmsTopicSelectorTest {
+    public void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsQueueSelectorTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.7 $
+ */
+public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener {
+    private static final Log LOG = LogFactory.getLog(JmsSendReceiveTestSupport.class);
+
+    protected int messageCount = 100;
+    protected String[] data;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected List<Message> messages = createConcurrentList();
+    protected boolean topic = true;
+    protected boolean durable;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected final Object lock = new Object();
+    protected boolean verbose;
+
+    /*
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        String temp = System.getProperty("messageCount");
+
+        if (temp != null) {
+            int i = Integer.parseInt(temp);
+            if (i > 0) {
+                messageCount = i;
+            }
+        }
+
+        LOG.info("Message count for test case is: " + messageCount);
+        data = new String[messageCount];
+
+        for (int i = 0; i < messageCount; i++) {
+            data[i] = "Text for message: " + i + " at " + new Date();
+        }
+    }
+
+    /**
+     * Sends and consumes the messages.
+     * 
+     * @throws Exception
+     */
+    public void testSendReceive() throws Exception {
+        messages.clear();
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            sendToProducer(producer, producerDestination, message);
+            messageSent();
+        }
+
+        assertMessagesAreReceived();
+        LOG.info("" + data.length + " messages(s) received, closing down connections");
+    }
+
+    /**
+     * Sends a message to a destination using the supplied producer
+     * @param producer
+     * @param producerDestination
+     * @param message
+     * @throws JMSException
+     */
+    protected void sendToProducer(MessageProducer producer,
+            Destination producerDestination, Message message) throws JMSException {
+        producer.send(producerDestination, message);   
+    }
+
+    /**
+     * Asserts messages are received.
+     * 
+     * @throws JMSException
+     */
+    protected void assertMessagesAreReceived() throws JMSException {
+        waitForMessagesToBeDelivered();
+        assertMessagesReceivedAreValid(messages);
+    }
+
+    /**
+     * Tests if the messages received are valid.
+     * 
+     * @param receivedMessages - list of received messages.
+     * @throws JMSException
+     */
+    protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException {
+        List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray());
+        int counter = 0;
+
+        if (data.length != copyOfMessages.size()) {
+            for (Iterator<Object> iter = copyOfMessages.iterator(); iter.hasNext();) {
+                TextMessage message = (TextMessage)iter.next();
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("<== " + counter++ + " = " + message.getText());
+                }
+            }
+        }
+
+        assertEquals("Not enough messages received", data.length, receivedMessages.size());
+
+        for (int i = 0; i < data.length; i++) {
+            TextMessage received = (TextMessage)receivedMessages.get(i);
+            String text = received.getText();
+            String stringProperty = received.getStringProperty("stringProperty");
+            int intProperty = received.getIntProperty("intProperty");
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.info("Received Text: " + text);
+                }
+            }
+
+            assertEquals("Message: " + i, data[i], text);
+            assertEquals(data[i], stringProperty);
+            assertEquals(i, intProperty);
+        }
+    }
+
+    /**
+     * Waits for messages to be delivered.
+     */
+    protected void waitForMessagesToBeDelivered() {
+        long maxWaitTime = 30000;
+        long waitTime = maxWaitTime;
+        long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis();
+
+        synchronized (lock) {
+            while (messages.size() < data.length && waitTime >= 0) {
+                try {
+                    lock.wait(200);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+
+                waitTime = maxWaitTime - (System.currentTimeMillis() - start);
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+     */
+    public synchronized void onMessage(Message message) {
+        consumeMessage(message, messages);
+    }
+
+    /**
+     * Consumes messages.
+     * 
+     * @param message - message to be consumed.
+     * @param messageList -list of consumed messages.
+     */
+    protected void consumeMessage(Message message, List<Message> messageList) {
+        if (verbose) {
+            if (LOG.isDebugEnabled()) {
+                LOG.info("Received message: " + message);
+            }
+        }
+
+        messageList.add(message);
+
+        if (messageList.size() >= data.length) {
+            synchronized (lock) {
+                lock.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Returns the ArrayList as a synchronized list.
+     * 
+     * @return List
+     */
+    protected List<Message> createConcurrentList() {
+        return Collections.synchronizedList(new ArrayList<Message>());
+    }
+
+    /**
+     * Just a hook so can insert failure tests
+     * 
+     * @throws Exception
+     */
+    protected void messageSent() throws Exception {
+
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveTestSupport.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ *
+ */
+public class JmsSendReceiveWithMessageExpirationTest extends TestSupport {
+
+    private static final Log LOG = LogFactory.getLog(JmsSendReceiveWithMessageExpirationTest.class);
+
+    protected int messageCount = 100;
+    protected String[] data;
+    protected Session session;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected boolean durable;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+    protected long timeToLive = 5000;
+    protected boolean verbose;
+
+    protected Connection connection;
+
+    protected void setUp() throws Exception {
+
+        super.setUp();
+
+        data = new String[messageCount];
+
+        for (int i = 0; i < messageCount; i++) {
+            data[i] = "Text for message: " + i + " at " + new Date();
+        }
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+
+        if (durable) {
+            connection.setClientID(getClass().getName());
+        }
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    /**
+     * Test consuming an expired queue.
+     * 
+     * @throws Exception
+     */
+    public void testConsumeExpiredQueue() throws Exception {
+
+        MessageProducer producer = createProducer(timeToLive);
+
+        consumerDestination = session.createQueue(getConsumerSubject());
+        producerDestination = session.createQueue(getProducerSubject());
+
+        MessageConsumer consumer = createConsumer();
+        connection.start();
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        // sleeps a second longer than the expiration time.
+        // Basically waits till queue expires.
+        Thread.sleep(timeToLive + 1000);
+
+        // message should have expired.
+        assertNull(consumer.receive(1000));
+    }
+
+    /**
+     * Sends and consumes the messages to a queue destination.
+     * 
+     * @throws Exception
+     */
+    public void testConsumeQueue() throws Exception {
+
+        MessageProducer producer = createProducer(0);
+
+        consumerDestination = session.createQueue(getConsumerSubject());
+        producerDestination = session.createQueue(getProducerSubject());
+
+        MessageConsumer consumer = createConsumer();
+        connection.start();
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a queue message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        // should receive a queue since there is no expiration.
+        assertNotNull(consumer.receive(1000));
+    }
+
+    /**
+     * Test consuming an expired topic.
+     * 
+     * @throws Exception
+     */
+    public void testConsumeExpiredTopic() throws Exception {
+
+        MessageProducer producer = createProducer(timeToLive);
+
+        consumerDestination = session.createTopic(getConsumerSubject());
+        producerDestination = session.createTopic(getProducerSubject());
+
+        MessageConsumer consumer = createConsumer();
+        connection.start();
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        // sleeps a second longer than the expiration time.
+        // Basically waits till topic expires.
+        Thread.sleep(timeToLive + 1000);
+
+        // message should have expired.
+        assertNull(consumer.receive(1000));
+    }
+
+    /**
+     * Sends and consumes the messages to a topic destination.
+     * 
+     * @throws Exception
+     */
+    public void testConsumeTopic() throws Exception {
+
+        MessageProducer producer = createProducer(0);
+
+        consumerDestination = session.createTopic(getConsumerSubject());
+        producerDestination = session.createTopic(getProducerSubject());
+
+        MessageConsumer consumer = createConsumer();
+        connection.start();
+
+        for (int i = 0; i < data.length; i++) {
+            Message message = session.createTextMessage(data[i]);
+            message.setStringProperty("stringProperty", data[i]);
+            message.setIntProperty("intProperty", i);
+
+            if (verbose) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("About to send a topic message: " + message + " with text: " + data[i]);
+                }
+            }
+
+            producer.send(producerDestination, message);
+        }
+
+        // should receive a topic since there is no expiration.
+        assertNotNull(consumer.receive(1000));
+    }
+
+    protected MessageProducer createProducer(long timeToLive) throws JMSException {
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(deliveryMode);
+        producer.setTimeToLive(timeToLive);
+
+        return producer;
+    }
+
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            return session.createDurableSubscriber((Topic)consumerDestination, getName());
+        }
+        return session.createConsumer(consumerDestination);
+    }
+
+    protected void tearDown() throws Exception {
+        LOG.info("Dumping stats...");
+        LOG.info("Closing down connection");
+
+        session.close();
+        connection.close();
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsSendReceiveWithMessageExpirationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsTopicRedeliverTest extends TestSupport {
+
+    private static final Log LOG = LogFactory.getLog(JmsTopicRedeliverTest.class);
+
+    protected Connection connection;
+    protected Session session;
+    protected Session consumeSession;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected boolean topic = true;
+    protected boolean durable;
+    protected boolean verbose;
+    protected long initRedeliveryDelay;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+        initRedeliveryDelay = ((ActiveMQConnection)connection).getRedeliveryPolicy().getInitialRedeliveryDelay();
+
+        if (durable) {
+            connection.setClientID(getClass().getName());
+        }
+
+        LOG.info("Created connection: " + connection);
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+        LOG.info("Created session: " + session);
+        LOG.info("Created consumeSession: " + consumeSession);
+        producer = session.createProducer(null);
+        // producer.setDeliveryMode(deliveryMode);
+
+        LOG.info("Created producer: " + producer);
+
+        if (topic) {
+            consumerDestination = session.createTopic(getConsumerSubject());
+            producerDestination = session.createTopic(getProducerSubject());
+        } else {
+            consumerDestination = session.createQueue(getConsumerSubject());
+            producerDestination = session.createQueue(getProducerSubject());
+        }
+
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+        LOG.info("Created  producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+        consumer = createConsumer();
+        connection.start();
+
+        LOG.info("Created connection: " + connection);
+    }
+
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Returns the consumer subject.
+     * 
+     * @return String - consumer subject
+     * @see org.apache.activemq.legacy.testsupport.test.TestSupport#getConsumerSubject()
+     */
+    protected String getConsumerSubject() {
+        return "TEST";
+    }
+
+    /**
+     * Returns the producer subject.
+     * 
+     * @return String - producer subject
+     * @see org.apache.activemq.legacy.testsupport.test.TestSupport#getProducerSubject()
+     */
+    protected String getProducerSubject() {
+        return "TEST";
+    }
+
+    /**
+     * Sends and consumes the messages.
+     * 
+     * @throws Exception
+     */
+    public void testRecover() throws Exception {
+        String text = "TEST";
+        Message sendMessage = session.createTextMessage(text);
+
+        if (verbose) {
+            LOG.info("About to send a message: " + sendMessage + " with text: " + text);
+        }
+        producer.send(producerDestination, sendMessage);
+
+        // receive but don't acknowledge
+        Message unackMessage = consumer.receive(initRedeliveryDelay + 1000);
+        assertNotNull(unackMessage);
+        String unackId = unackMessage.getJMSMessageID();
+        assertEquals(((TextMessage)unackMessage).getText(), text);
+        assertFalse(unackMessage.getJMSRedelivered());
+        // assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"),1);
+
+        // receive then acknowledge
+        consumeSession.recover();
+        Message ackMessage = consumer.receive(initRedeliveryDelay + 1000);
+        assertNotNull(ackMessage);
+        ackMessage.acknowledge();
+        String ackId = ackMessage.getJMSMessageID();
+        assertEquals(((TextMessage)ackMessage).getText(), text);
+        assertTrue(ackMessage.getJMSRedelivered());
+        // assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"),2);
+        assertEquals(unackId, ackId);
+        consumeSession.recover();
+        assertNull(consumer.receiveNoWait());
+    }
+
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            return consumeSession.createDurableSubscriber((Topic)consumerDestination, getName());
+        }
+        return consumeSession.createConsumer(consumerDestination);
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicRedeliverTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision: 1.2 $
+ */
+public class JmsTopicSelectorTest extends TestSupport {
+    private static final Log LOG = LogFactory.getLog(JmsTopicSelectorTest.class);
+
+    protected Connection connection;
+    protected Session session;
+    protected MessageConsumer consumer;
+    protected MessageProducer producer;
+    protected Destination consumerDestination;
+    protected Destination producerDestination;
+    protected boolean topic = true;
+    protected boolean durable;
+    protected int deliveryMode = DeliveryMode.PERSISTENT;
+
+    public JmsTopicSelectorTest() {
+        super();
+    }
+
+    public JmsTopicSelectorTest(String name) {
+        super(name);
+    }
+
+    public void setUp() throws Exception {
+        super.setUp();
+
+        connectionFactory = createConnectionFactory();
+        connection = createConnection();
+        if (durable) {
+            connection.setClientID(getClass().getName());
+        }
+
+        LOG.info("Created connection: " + connection);
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        LOG.info("Created session: " + session);
+
+        if (topic) {
+            consumerDestination = session.createTopic(getConsumerSubject());
+            producerDestination = session.createTopic(getProducerSubject());
+        } else {
+            consumerDestination = session.createQueue(getConsumerSubject());
+            producerDestination = session.createQueue(getProducerSubject());
+        }
+
+        LOG.info("Created  consumer destination: " + consumerDestination + " of type: " + consumerDestination.getClass());
+        LOG.info("Created  producer destination: " + producerDestination + " of type: " + producerDestination.getClass());
+        producer = session.createProducer(producerDestination);
+        producer.setDeliveryMode(deliveryMode);
+
+        LOG.info("Created producer: " + producer + " delivery mode = " + (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT"));
+        connection.start();
+    }
+
+    public void tearDown() throws Exception {
+        session.close();
+        connection.close();
+    }
+
+    protected MessageConsumer createConsumer(String selector) throws JMSException {
+        if (durable) {
+            LOG.info("Creating durable consumer");
+            return session.createDurableSubscriber((Topic)consumerDestination, getName(), selector, false);
+        }
+        return session.createConsumer(consumerDestination, selector);
+    }
+
+    public void sendMessages() throws Exception {
+        TextMessage message = session.createTextMessage("1");
+        message.setIntProperty("id", 1);
+        message.setJMSType("a");
+        message.setStringProperty("stringProperty", "a");
+        message.setLongProperty("longProperty", 1);
+        message.setBooleanProperty("booleanProperty", true);
+        producer.send(message);
+
+        message = session.createTextMessage("2");
+        message.setIntProperty("id", 2);
+        message.setJMSType("a");
+        message.setStringProperty("stringProperty", "a");
+        message.setLongProperty("longProperty", 1);
+        message.setBooleanProperty("booleanProperty", false);
+        producer.send(message);
+
+        message = session.createTextMessage("3");
+        message.setIntProperty("id", 3);
+        message.setJMSType("a");
+        message.setStringProperty("stringProperty", "a");
+        message.setLongProperty("longProperty", 1);
+        message.setBooleanProperty("booleanProperty", true);
+        producer.send(message);
+
+        message = session.createTextMessage("4");
+        message.setIntProperty("id", 4);
+        message.setJMSType("b");
+        message.setStringProperty("stringProperty", "b");
+        message.setLongProperty("longProperty", 2);
+        message.setBooleanProperty("booleanProperty", false);
+        producer.send(message);
+
+        message = session.createTextMessage("5");
+        message.setIntProperty("id", 5);
+        message.setJMSType("c");
+        message.setStringProperty("stringProperty", "c");
+        message.setLongProperty("longProperty", 3);
+        message.setBooleanProperty("booleanProperty", true);
+        producer.send(message);
+    }
+
+    public void consumeMessages(int remaining) throws Exception {
+        consumer = createConsumer(null);
+        for (int i = 0; i < remaining; i++) {
+            consumer.receive(1000);
+        }
+        consumer.close();
+
+    }
+
+    public void testPropertySelector() throws Exception {
+        int remaining = 5;
+        Message message = null;
+        consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true");
+        sendMessages();
+        while (true) {
+            message = consumer.receive(1000);
+            if (message == null) {
+                break;
+            }
+            String text = ((TextMessage)message).getText();
+            if (!text.equals("1") && !text.equals("3")) {
+                fail("unexpected message: " + text);
+            }
+            remaining--;
+        }
+        assertEquals(remaining, 3);
+        consumer.close();
+        consumeMessages(remaining);
+
+    }
+
+    public void testJMSPropertySelector() throws Exception {
+        int remaining = 5;
+        Message message = null;
+        consumer = createConsumer("JMSType = 'a' and stringProperty = 'a'");
+        sendMessages();
+        while (true) {
+            message = consumer.receive(1000);
+            if (message == null) {
+                break;
+            }
+            String text = ((TextMessage)message).getText();
+            if (!text.equals("1") && !text.equals("2") && !text.equals("3")) {
+                fail("unexpected message: " + text);
+            }
+            remaining--;
+        }
+        assertEquals(remaining, 2);
+        consumer.close();
+        consumeMessages(remaining);
+
+    }
+
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSelectorTest.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java?rev=786040&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java Thu Jun 18 12:42:23 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.test2;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JmsTopicSendReceiveSubscriberTest extends JmsTopicSendReceiveTest {
+    protected MessageConsumer createConsumer() throws JMSException {
+        if (durable) {
+            return super.createConsumer();
+        } else {
+            TopicSession topicSession = (TopicSession)session;
+            return topicSession.createSubscriber((Topic)consumerDestination, null, false);
+        }
+    }
+}

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/sandbox/activemq-flow/activemq-client/src/test/java/org/apache/activemq/legacy/test2/JmsTopicSendReceiveSubscriberTest.java
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message