From commits-return-16310-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Thu Jun 2 21:34:44 2011 Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 89FF94018 for ; Thu, 2 Jun 2011 21:34:44 +0000 (UTC) Received: (qmail 54987 invoked by uid 500); 2 Jun 2011 21:34:44 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 54962 invoked by uid 500); 2 Jun 2011 21:34:44 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 54955 invoked by uid 99); 2 Jun 2011 21:34:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jun 2011 21:34:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jun 2011 21:34:42 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 29E2A2388A36; Thu, 2 Jun 2011 21:34:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1130827 - in /activemq/activemq-apollo/trunk/apollo-openwire/src: main/scala/org/apache/activemq/apollo/openwire/ test/resources/ test/scala/org/apache/activemq/apollo/openwire/ Date: Thu, 02 Jun 2011 21:34:22 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110602213422.29E2A2388A36@eris.apache.org> Author: tabish Date: Thu Jun 2 21:34:21 2011 New Revision: 1130827 URL: 
http://svn.apache.org/viewvc?rev=1130827&view=rev Log: https://issues.apache.org/jira/browse/APLO-30 Enable exclusive consumers, adds new tests and updates the openwire test resources a bit. Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala Removed: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTest.scala Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1130827&r1=1130826&r2=1130827&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Thu Jun 2 21:34:21 2011 @@ -157,7 +157,7 @@ class 
OpenwireProtocolHandler extends Pr outbound_sessions = new SinkMux[Command](connection.transport_sink.map { x:Command => x.setCommandId(next_command_id) - info("sending frame: %s", x) + debug("sending frame: %s", x.toString) x }, dispatchQueue, OpenwireCodec) connection_session = new OverflowSink(outbound_sessions.open(dispatchQueue)); @@ -214,7 +214,7 @@ class OpenwireProtocolHandler extends Pr } try { current_command = command - println("received: %s", command) + trace("received: %s", command) if (wire_format == null) { command match { case codec: OpenwireCodec => @@ -444,6 +444,9 @@ class OpenwireProtocolHandler extends Pr var selector_expression:BooleanExpression = _ var destination:Array[DestinationDTO] = _ + override def exclusive = info.isExclusive + override def browser = info.isBrowser + def attach = { if( info.getDestination == null ) fail("destination was not set") Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml?rev=1130827&r1=1130826&r2=1130827&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/apollo-openwire.xml Thu Jun 2 21:34:21 2011 @@ -23,6 +23,6 @@ localhost - + \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties?rev=1130827&r1=1130826&r2=1130827&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties (original) +++ 
activemq/activemq-apollo/trunk/apollo-openwire/src/test/resources/log4j.properties Thu Jun 2 21:34:21 2011 @@ -16,20 +16,31 @@ ## --------------------------------------------------------------------------- # -# The logging properties used during tests.. +# Setup the default logging levels # -log4j.rootLogger=WARN, console, file -log4j.logger.org.apache.activemq=TRACE +log4j.rootLogger=WARN, console, logfile +log4j.logger.org.apache.activemq.apollo=INFO -# Console will only display warnnings +# +# Uncomment one of the following to enable debug logging +# +# log4j.logger.org.apache.activemq.apollo=DEBUG +# log4j.logger.org.apache.activemq.apollo.broker=DEBUG +# log4j.logger.org.apache.activemq.apollo.web=DEBUG +# log4j.logger.org.apache.activemq.apollo.cli=DEBUG +# log4j.logger.org.apache.activemq.apollo.broker.store.hawtdb=DEBUG + +# Console Settings log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%-5p | %t | %m%n -log4j.appender.console.threshold=TRACE +log4j.appender.console.layout.ConversionPattern=%-5p | %m%n +log4j.appender.console.threshold=INFO -# File appender will contain all info messages -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n -log4j.appender.file.file=target/test.log -log4j.appender.file.append=true +# File Settings +log4j.appender.logfile=org.apache.log4j.RollingFileAppender +log4j.appender.logfile.file=${apollo.base}/log/apollo.log +log4j.appender.logfile.maxFileSize=5MB +log4j.appender.logfile.maxBackupIndex=5 +log4j.appender.logfile.append=true +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n Added: 
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala?rev=1130827&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/DurableSubscriberTest.scala Thu Jun 2 21:34:21 2011 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.apollo.openwire + +import javax.jms.{TextMessage, Session} + +class DurableSubscriberTest extends OpenwireTestSupport { + + test("Topic /w Durable sub retains messages.") { + + def create_durable_sub() { + val connection = connect(false) + connection.setClientID("test") + connection.start() + val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val subscriber = session.createDurableSubscriber(topic("example"), "test") + session.close() + connection.close() + if (default_connection == connection) { + default_connection = null + } + } + + create_durable_sub() + + connect(false) + default_connection.setClientID("test") + default_connection.start() + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(topic("example")) + def put(id:Int) { + producer.send(session.createTextMessage("message:"+id)) + } + + List(1,2,3).foreach(put _) + + val subscriber = session.createDurableSubscriber(topic("example"), "test") + + def get(id:Int) { + val m = subscriber.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(topic("example")) + m.getText should equal ("message:"+id) + } + + List(1,2,3).foreach(get _) + } + +} \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala?rev=1130827&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/ExclusiveConsumerTest.scala Thu Jun 2 21:34:21 2011 @@ -0,0 +1,204 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire + +import javax.jms.{TextMessage, Session} +import org.apache.activemq.apollo.openwire.command.ActiveMQQueue + +class ExclusiveConsumerTest extends OpenwireTestSupport { + + test("Exclusive Consumer Selected when created first") { + + connect(); + + val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1")); + val producer = senderSession.createProducer(queue("TEST.QUEUE1")); + + val msg = senderSession.createTextMessage("test"); + producer.send(msg); + + Thread.sleep(100); + + // Verify exclusive consumer receives the message. 
+ exclusiveConsumer.receive(100) should not be(null) + fallbackConsumer.receive(100) should be(null) + } + + test("Exclusive Consumer Selected when Created After Non-Exclusive Consumer") { + + connect(); + + val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1")); + val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val producer = senderSession.createProducer(queue("TEST.QUEUE1")); + + val msg = senderSession.createTextMessage("test"); + producer.send(msg); + + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + exclusiveConsumer.receive(100) should not be(null) + fallbackConsumer.receive(100) should be(null) + } + + test("Failover To Another Exclusive Consumer Created First") { + connect(); + + val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1")); + val producer = senderSession.createProducer(queue("TEST.QUEUE1")); + + val msg = senderSession.createTextMessage("test"); + producer.send(msg); + + Thread.sleep(100); + + // Verify exclusive consumer receives the message. 
+ exclusiveConsumer1.receive(100) should not be(null) + exclusiveConsumer2.receive(100) should be(null) + fallbackConsumer.receive(100) should be(null) + + // Close the first exclusive consumer to verify the second exclusive consumer + // takes over (the non-exclusive consumer must still receive nothing) + exclusiveConsumer1.close() + + producer.send(msg); + producer.send(msg); + + exclusiveConsumer2.receive(100) should not be(null) + fallbackConsumer.receive(100) should be(null) + } + + test("Failover To Another Exclusive Consumer Created After a non-exclusive Consumer") { + + connect(); + + val exclusiveSession1 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val exclusiveSession2 = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + val exclusiveConsumer1 = exclusiveSession1.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1")); + val exclusiveConsumer2 = exclusiveSession2.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val producer = senderSession.createProducer(queue("TEST.QUEUE1")); + + val msg = senderSession.createTextMessage("test"); + producer.send(msg); + + Thread.sleep(100); + + // Verify exclusive consumer receives the message. 
+ exclusiveConsumer1.receive(100) should not be(null) + exclusiveConsumer2.receive(100) should be(null) + fallbackConsumer.receive(100) should be(null) + + // Close the first exclusive consumer to verify the second exclusive consumer + // takes over (the non-exclusive consumer must still receive nothing) + exclusiveConsumer1.close() + + producer.send(msg); + producer.send(msg); + + exclusiveConsumer2.receive(100) should not be(null) + fallbackConsumer.receive(100) should be(null) + } + + test("Failover To NonExclusive Consumer") { + + connect(); + + val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + val exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1")); + val producer = senderSession.createProducer(queue("TEST.QUEUE1")); + + val msg = senderSession.createTextMessage("test"); + producer.send(msg); + + Thread.sleep(100); + + // Verify exclusive consumer receives the message. 
+ exclusiveConsumer.receive(100) should not be(null) + fallbackConsumer.receive(100) should be(null) + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer.close() + + producer.send(msg); + fallbackConsumer.receive(100) should not be(null) + } + + test("Fallback To Exclusive Consumer") { + connect(); + + val exclusiveSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val fallbackSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val senderSession = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + var exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + val fallbackConsumer = fallbackSession.createConsumer(queue("TEST.QUEUE1")); + val producer = senderSession.createProducer(queue("TEST.QUEUE1")); + + val msg = senderSession.createTextMessage("test"); + producer.send(msg); + + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assert(exclusiveConsumer.receive(100) != null, "The exclusive consumer should have got a Message"); + assert(fallbackConsumer.receive(100) == null, "The non-exclusive consumer shouldn't have a message"); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer.close() + + producer.send(msg) + assert(fallbackConsumer.receive(100) != null, "The non-exclusive consumer should have a message"); + + // Create exclusive consumer to determine if it will start receiving + // the messages. + exclusiveConsumer = exclusiveSession.createConsumer(queue("TEST.QUEUE1?consumer.exclusive=true")) + + producer.send(msg) + + // Verify exclusive consumer receives the message. 
+ exclusiveConsumer.receive(100) should not be(null) +// fallbackConsumer.receive(100) should be(null) + } + +} \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala?rev=1130827&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/OpenwireTestSupport.scala Thu Jun 2 21:34:21 2011 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.apollo.openwire + +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterEach +import java.lang.String +import org.apache.activemq.apollo.broker.{KeyStorage, Broker, BrokerFactory} +import org.apache.activemq.apollo.util.{FileSupport, Logging, FunSuiteSupport, ServiceControl} +import FileSupport._ +import javax.jms.Connection +import org.apache.activemq.ActiveMQConnectionFactory +import org.apache.activemq.command.{ActiveMQTopic, ActiveMQQueue} + +class OpenwireTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging { + var broker: Broker = null + var port = 0 + + val broker_config_uri = "xml:classpath:apollo-openwire.xml" + + override protected def beforeAll() { + info("Loading broker configuration from the classpath with URI: " + broker_config_uri) + broker = BrokerFactory.createBroker(broker_config_uri) + ServiceControl.start(broker, "Starting broker") + port = broker.get_socket_address.getPort + } + + var default_connection:Connection = _ + var connections = List[Connection]() + + override protected def afterAll() { + broker.stop() + } + + override protected def afterEach() { + super.afterEach() + connections.foreach(_.close()) + connections = Nil + default_connection = null + } + + def create_connection_factory = new ActiveMQConnectionFactory("tcp://localhost:%d?wireFormat.maxInactivityDuration=1000000&wireFormat.maxInactivityDurationInitalDelay=1000000".format(port)) + def create_connection: Connection = create_connection_factory.createConnection + def queue(value:String) = new ActiveMQQueue(value); + def topic(value:String) = new ActiveMQTopic(value); + + def connect(start:Boolean=true) = { + val connection = create_connection + connections ::= connection + if(default_connection==null) { + default_connection = connection + } + if( start ) { + connection.start() + } + connection + } + +} \ No newline at end of file Added: 
activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala?rev=1130827&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/QueueTest.scala Thu Jun 2 21:34:21 2011 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.apollo.openwire + +import javax.jms.{TextMessage, Session} + +class QueueTest extends OpenwireTestSupport { + + test("Queue order preserved") { + + connect() + + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(queue("example")) + def put(id:Int) { + producer.send(session.createTextMessage("message:"+id)) + } + + List(1,2,3).foreach(put _) + + val consumer = session.createConsumer(queue("example")) + + def get(id:Int) { + val m = consumer.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(queue("example")) + m.getText should equal ("message:"+id) + } + + List(1,2,3).foreach(get _) + } + + test("Test that messages are consumed ") { + + connect() + var session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + + val queue = session.createQueue("test"); + val producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... 
+ var consumer = session.createConsumer(queue) + var msg = consumer.receive(1000) + assert(msg != null) + + Thread.sleep(1000) + + session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + // Attempt to consume the message again...check that the message was acknowledged + consumer = session.createConsumer(queue) + msg = consumer.receive(1000) + assert(msg == null) + + session.close() + } + + test("Queue and a selector") { + connect() + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(queue("example")) + + def put(id:Int, color:String) { + val message = session.createTextMessage("message:" + id) + message.setStringProperty("color", color) + producer.send(message) + } + + List((1, "red"), (2, "blue"), (3, "red")).foreach { + case (id, color) => put(id, color) + } + + val consumer = session.createConsumer(queue("example"), "color='red'") + + def get(id:Int) { + val m = consumer.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(queue("example")) + m.getText should equal ("message:"+id) + } + + get(1) + get(3) + } +} \ No newline at end of file Added: activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala?rev=1130827&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala (added) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/test/scala/org/apache/activemq/apollo/openwire/TopicTest.scala Thu Jun 2 21:34:21 2011 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.openwire + +import javax.jms.{TextMessage, Session} + +class TopicTest extends OpenwireTestSupport { + + test("Topic drops messages sent before before subscription is established") { + connect() + val session = default_connection.createSession(false, Session.AUTO_ACKNOWLEDGE) + val producer = session.createProducer(topic("example")) + def put(id:Int) { + producer.send(session.createTextMessage("message:"+id)) + } + + put(1) + + val consumer = session.createConsumer(topic("example")) + + put(2) + put(3) + + def get(id:Int) { + val m = consumer.receive().asInstanceOf[TextMessage] + m.getJMSDestination should equal(topic("example")) + m.getText should equal ("message:"+id) + } + + List(2,3).foreach(get _) + } + +} \ No newline at end of file