Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 54536 invoked from network); 14 Nov 2006 21:17:05 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 14 Nov 2006 21:17:05 -0000 Received: (qmail 90020 invoked by uid 500); 14 Nov 2006 21:17:10 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 89992 invoked by uid 500); 14 Nov 2006 21:17:10 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 89889 invoked by uid 99); 14 Nov 2006 21:17:10 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Nov 2006 13:17:10 -0800 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Nov 2006 13:16:53 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 699821A984D; Tue, 14 Nov 2006 13:16:21 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r474985 [2/5] - in /incubator/activemq/trunk: activemq-console/src/main/java/org/apache/activemq/console/ activemq-console/src/test/resources/ activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/... Date: Tue, 14 Nov 2006 21:16:17 -0000 To: activemq-commits@geronimo.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061114211621.699821A984D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java?view=diff&rev=474985&r1=474984&r2=474985 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java Tue Nov 14 13:16:11 2006 @@ -1,238 +1,238 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq; - -import java.util.Date; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - -/** - * - */ -public class JmsSendReceiveWithMessageExpirationTest extends TestSupport { - - private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory - .getLog(JmsSendReceiveWithMessageExpirationTest.class); - - protected int messageCount = 100; - protected String[] data; - protected Session session; - protected Destination consumerDestination; - protected Destination producerDestination; - protected boolean durable = false; - protected int deliveryMode = DeliveryMode.PERSISTENT; - protected long timeToLive = 5000; - protected boolean verbose = false; - - protected Connection connection; - - protected void setUp() throws Exception { - - super.setUp(); - - data = new String[messageCount]; - - for (int i = 0; i < messageCount; i++) { - data[i] = "Text for message: " + i + " at " + new Date(); - } - - connectionFactory = createConnectionFactory(); - connection = createConnection(); - - if (durable) { - connection.setClientID(getClass().getName()); - } - - session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - /** - * Test consuming an expired queue. - * - * @throws Exception - */ - public void testConsumeExpiredQueue() throws Exception { - - MessageProducer producer = createProducer(timeToLive); - - consumerDestination = session.createQueue(getConsumerSubject()); - producerDestination = session.createQueue(getProducerSubject()); - - MessageConsumer consumer = createConsumer(); - connection.start(); - - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(data[i]); - message.setStringProperty("stringProperty",data[i]); - message.setIntProperty("intProperty",i); - - if (verbose) { - if (log.isDebugEnabled()) { - log.debug("About to send a queue message: " + message + " with text: " + data[i]); - } - } - - producer.send(producerDestination, message); - } - - // sleeps a second longer than the expiration time. - // Basically waits till queue expires. - Thread.sleep(timeToLive + 1000); - - // message should have expired. - assertNull(consumer.receive(1000)); - } - - /** - * Sends and consumes the messages to a queue destination. - * - * @throws Exception - */ - public void testConsumeQueue() throws Exception { - - MessageProducer producer = createProducer(0); - - consumerDestination = session.createQueue(getConsumerSubject()); - producerDestination = session.createQueue(getProducerSubject()); - - MessageConsumer consumer = createConsumer(); - connection.start(); - - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(data[i]); - message.setStringProperty("stringProperty",data[i]); - message.setIntProperty("intProperty",i); - - if (verbose) { - if (log.isDebugEnabled()) { - log.debug("About to send a queue message: " + message + " with text: " + data[i]); - } - } - - producer.send(producerDestination, message); - } - - // should receive a queue since there is no expiration. - assertNotNull(consumer.receive(1000)); - } - - /** - * Test consuming an expired topic. - * - * @throws Exception - */ - public void testConsumeExpiredTopic() throws Exception { - - MessageProducer producer = createProducer(timeToLive); - - consumerDestination = session.createTopic(getConsumerSubject()); - producerDestination = session.createTopic(getProducerSubject()); - - MessageConsumer consumer = createConsumer(); - connection.start(); - - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(data[i]); - message.setStringProperty("stringProperty",data[i]); - message.setIntProperty("intProperty",i); - - if (verbose) { - if (log.isDebugEnabled()) { - log.debug("About to send a topic message: " + message + " with text: " + data[i]); - } - } - - producer.send(producerDestination, message); - } - - // sleeps a second longer than the expiration time. - // Basically waits till topic expires. - Thread.sleep(timeToLive + 1000); - - // message should have expired. - assertNull(consumer.receive(1000)); - } - - /** - * Sends and consumes the messages to a topic destination. - * - * @throws Exception - */ - public void testConsumeTopic() throws Exception { - - MessageProducer producer = createProducer(0); - - consumerDestination = session.createTopic(getConsumerSubject()); - producerDestination = session.createTopic(getProducerSubject()); - - MessageConsumer consumer = createConsumer(); - connection.start(); - - for (int i = 0; i < data.length; i++) { - Message message = session.createTextMessage(data[i]); - message.setStringProperty("stringProperty",data[i]); - message.setIntProperty("intProperty",i); - - if (verbose) { - if (log.isDebugEnabled()) { - log.debug("About to send a topic message: " + message + " with text: " + data[i]); - } - } - - producer.send(producerDestination, message); - } - - // should receive a topic since there is no expiration. - assertNotNull(consumer.receive(1000)); - } - - - - protected MessageProducer createProducer(long timeToLive) throws JMSException { - MessageProducer producer = session.createProducer(null); - producer.setDeliveryMode(deliveryMode); - producer.setTimeToLive(timeToLive); - - return producer; - } - - protected MessageConsumer createConsumer() throws JMSException { - if (durable) { - log.info("Creating durable consumer"); - return session.createDurableSubscriber((Topic) consumerDestination, getName()); - } - return session.createConsumer(consumerDestination); - } - - protected void tearDown() throws Exception { - log.info("Dumping stats..."); - log.info("Closing down connection"); - - session.close(); - connection.close(); - } - -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Date; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +/** + * + */ +public class JmsSendReceiveWithMessageExpirationTest extends TestSupport { + + private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(JmsSendReceiveWithMessageExpirationTest.class); + + protected int messageCount = 100; + protected String[] data; + protected Session session; + protected Destination consumerDestination; + protected Destination producerDestination; + protected boolean durable = false; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected long timeToLive = 5000; + protected boolean verbose = false; + + protected Connection connection; + + protected void setUp() throws Exception { + + super.setUp(); + + data = new String[messageCount]; + + for (int i = 0; i < messageCount; i++) { + data[i] = "Text for message: " + i + " at " + new Date(); + } + + connectionFactory = createConnectionFactory(); + connection = createConnection(); + + if (durable) { + connection.setClientID(getClass().getName()); + } + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + /** + * Test consuming an expired queue. + * + * @throws Exception + */ + public void testConsumeExpiredQueue() throws Exception { + + MessageProducer producer = createProducer(timeToLive); + + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty",data[i]); + message.setIntProperty("intProperty",i); + + if (verbose) { + if (log.isDebugEnabled()) { + log.debug("About to send a queue message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // sleeps a second longer than the expiration time. + // Basically waits till queue expires. + Thread.sleep(timeToLive + 1000); + + // message should have expired. + assertNull(consumer.receive(1000)); + } + + /** + * Sends and consumes the messages to a queue destination. + * + * @throws Exception + */ + public void testConsumeQueue() throws Exception { + + MessageProducer producer = createProducer(0); + + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty",data[i]); + message.setIntProperty("intProperty",i); + + if (verbose) { + if (log.isDebugEnabled()) { + log.debug("About to send a queue message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // should receive a queue since there is no expiration. + assertNotNull(consumer.receive(1000)); + } + + /** + * Test consuming an expired topic. + * + * @throws Exception + */ + public void testConsumeExpiredTopic() throws Exception { + + MessageProducer producer = createProducer(timeToLive); + + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty",data[i]); + message.setIntProperty("intProperty",i); + + if (verbose) { + if (log.isDebugEnabled()) { + log.debug("About to send a topic message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // sleeps a second longer than the expiration time. + // Basically waits till topic expires. + Thread.sleep(timeToLive + 1000); + + // message should have expired. + assertNull(consumer.receive(1000)); + } + + /** + * Sends and consumes the messages to a topic destination. + * + * @throws Exception + */ + public void testConsumeTopic() throws Exception { + + MessageProducer producer = createProducer(0); + + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty",data[i]); + message.setIntProperty("intProperty",i); + + if (verbose) { + if (log.isDebugEnabled()) { + log.debug("About to send a topic message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // should receive a topic since there is no expiration. + assertNotNull(consumer.receive(1000)); + } + + + + protected MessageProducer createProducer(long timeToLive) throws JMSException { + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(deliveryMode); + producer.setTimeToLive(timeToLive); + + return producer; + } + + protected MessageConsumer createConsumer() throws JMSException { + if (durable) { + log.info("Creating durable consumer"); + return session.createDurableSubscriber((Topic) consumerDestination, getName()); + } + return session.createConsumer(consumerDestination); + } + + protected void tearDown() throws Exception { + log.info("Dumping stats..."); + log.info("Closing down connection"); + + session.close(); + connection.close(); + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/VolumeTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java?view=diff&rev=474985&r1=474984&r2=474985 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java Tue Nov 14 13:16:11 2006 @@ -15,310 +15,310 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.network; - -import java.net.URI; -import java.util.ArrayList; -import java.util.Iterator; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.advisory.ConsumerEvent; -import org.apache.activemq.advisory.ConsumerEventSource; -import org.apache.activemq.advisory.ConsumerListener; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; - -/** - * These test cases are used to verifiy that network connections get re established in all broker - * restart scenarios. - * - * @author chirino - */ -public class NetworkReconnectTest extends TestCase { - - protected static final Log log = LogFactory.getLog(NetworkReconnectTest.class); - - private BrokerService producerBroker; - private BrokerService consumerBroker; - private ActiveMQConnectionFactory producerConnectionFactory; - private ActiveMQConnectionFactory consumerConnectionFactory; - private Destination destination; - private ArrayList connections = new ArrayList(); - - public void testMultipleProducerBrokerRestarts() throws Exception { - for (int i = 0; i < 10; i++) { - testWithProducerBrokerRestart(); - disposeConsumerConnections(); - } - } - - public void testWithoutRestarts() throws Exception { - startProducerBroker(); - startConsumerBroker(); - - MessageConsumer consumer = createConsumer(); - AtomicInteger counter = createConsumerCounter(producerConnectionFactory); - waitForConsumerToArrive(counter); - - String messageId = sendMessage(); - Message message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - - assertNull( consumer.receiveNoWait() ); - - } - - public void testWithProducerBrokerRestart() throws Exception { - startProducerBroker(); - startConsumerBroker(); - - MessageConsumer consumer = createConsumer(); - AtomicInteger counter = createConsumerCounter(producerConnectionFactory); - waitForConsumerToArrive(counter); - - String messageId = sendMessage(); - Message message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - assertNull( consumer.receiveNoWait() ); - - // Restart the first broker... - stopProducerBroker(); - startProducerBroker(); - - counter = createConsumerCounter(producerConnectionFactory); - waitForConsumerToArrive(counter); - - messageId = sendMessage(); - message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - assertNull( consumer.receiveNoWait() ); - - } - - public void testWithConsumerBrokerRestart() throws Exception { - - startProducerBroker(); - startConsumerBroker(); - - MessageConsumer consumer = createConsumer(); - AtomicInteger counter = createConsumerCounter(producerConnectionFactory); - waitForConsumerToArrive(counter); - - String messageId = sendMessage(); - Message message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - assertNull( consumer.receiveNoWait() ); - - // Restart the first broker... - stopConsumerBroker(); - waitForConsumerToLeave(counter); - startConsumerBroker(); - - consumer = createConsumer(); - waitForConsumerToArrive(counter); - - messageId = sendMessage(); - message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - assertNull( consumer.receiveNoWait() ); - - } - - public void testWithConsumerBrokerStartDelay() throws Exception { - - startConsumerBroker(); - MessageConsumer consumer = createConsumer(); - - Thread.sleep(1000*5); - - startProducerBroker(); - AtomicInteger counter = createConsumerCounter(producerConnectionFactory); - waitForConsumerToArrive(counter); - - String messageId = sendMessage(); - Message message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - - assertNull( consumer.receiveNoWait() ); - - } - - - public void testWithProducerBrokerStartDelay() throws Exception { - - startProducerBroker(); - AtomicInteger counter = createConsumerCounter(producerConnectionFactory); - - Thread.sleep(1000*5); - - startConsumerBroker(); - MessageConsumer consumer = createConsumer(); - - waitForConsumerToArrive(counter); - - String messageId = sendMessage(); - Message message = consumer.receive(1000); - - assertEquals(messageId, message.getJMSMessageID()); - - assertNull( consumer.receiveNoWait() ); - - } - - protected void setUp() throws Exception { - - log.info("==============================================================================="); - log.info("Running Test Case: "+getName()); - log.info("==============================================================================="); - - producerConnectionFactory = createProducerConnectionFactory(); - consumerConnectionFactory = createConsumerConnectionFactory(); - destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE"); - - } - - protected void tearDown() throws Exception { - disposeConsumerConnections(); - try { - stopProducerBroker(); - } catch (Throwable e) { - } - try { - stopConsumerBroker(); - } catch (Throwable e) { - } - } - - protected void disposeConsumerConnections() { - for (Iterator iter = connections.iterator(); iter.hasNext();) { - Connection connection = (Connection) iter.next(); - try { connection.close(); } catch (Throwable ignore) {} - } - } - - protected void startProducerBroker() throws Exception { - if( producerBroker==null ) { - producerBroker = createFirstBroker(); - producerBroker.start(); - } - } - - protected void stopProducerBroker() throws Exception { - if( producerBroker!=null ) { - producerBroker.stop(); - producerBroker=null; - } - } - - protected void startConsumerBroker() throws Exception { - if( consumerBroker==null ) { - consumerBroker = createSecondBroker(); - consumerBroker.start(); - } - } - - protected void stopConsumerBroker() throws Exception { - if( consumerBroker!=null ) { - consumerBroker.stop(); - consumerBroker=null; - } - } - - protected BrokerService createFirstBroker() throws Exception { - return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml")); - } - - protected BrokerService createSecondBroker() throws Exception { - return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml")); - } - - protected ActiveMQConnectionFactory createProducerConnectionFactory() { - return new ActiveMQConnectionFactory("vm://broker1"); - } - - protected ActiveMQConnectionFactory createConsumerConnectionFactory() { - return new ActiveMQConnectionFactory("vm://broker2"); - } - - protected String sendMessage() throws JMSException { - Connection connection = null; - try { - connection = producerConnectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(destination); - Message message = session.createMessage(); - producer.send(message); - return message.getJMSMessageID(); - } finally { - try { connection.close(); } catch (Throwable ignore) {} - } - } - - protected MessageConsumer createConsumer() throws JMSException { - Connection connection = consumerConnectionFactory.createConnection(); - connections.add(connection); - connection.start(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - return session.createConsumer(destination); - } - - protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception { - final AtomicInteger rc = new AtomicInteger(0); - Connection connection = cf.createConnection(); - connections.add(connection); - connection.start(); - - ConsumerEventSource source = new ConsumerEventSource(connection, destination); - source.setConsumerListener(new ConsumerListener(){ - public void onConsumerEvent(ConsumerEvent event) { - rc.set(event.getConsumerCount()); - } - }); - source.start(); - - return rc; - } - - protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException { - for( int i=0; i < 100; i++ ) { - if( consumerCounter.get() > 0 ) { - return; - } - Thread.sleep(100); - } - fail("The consumer did not arrive."); - } - - protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException { - for( int i=0; i < 100; i++ ) { - if( consumerCounter.get() == 0 ) { - return; - } - Thread.sleep(100); - } - fail("The consumer did not leave."); - } - -} +package org.apache.activemq.network; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +/** + * These test cases are used to verifiy that network connections get re established in all broker + * restart scenarios. + * + * @author chirino + */ +public class NetworkReconnectTest extends TestCase { + + protected static final Log log = LogFactory.getLog(NetworkReconnectTest.class); + + private BrokerService producerBroker; + private BrokerService consumerBroker; + private ActiveMQConnectionFactory producerConnectionFactory; + private ActiveMQConnectionFactory consumerConnectionFactory; + private Destination destination; + private ArrayList connections = new ArrayList(); + + public void testMultipleProducerBrokerRestarts() throws Exception { + for (int i = 0; i < 10; i++) { + testWithProducerBrokerRestart(); + disposeConsumerConnections(); + } + } + + public void testWithoutRestarts() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithProducerBrokerRestart() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + // Restart the first broker... + stopProducerBroker(); + startProducerBroker(); + + counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + messageId = sendMessage(); + message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithConsumerBrokerRestart() throws Exception { + + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + // Restart the first broker... + stopConsumerBroker(); + waitForConsumerToLeave(counter); + startConsumerBroker(); + + consumer = createConsumer(); + waitForConsumerToArrive(counter); + + messageId = sendMessage(); + message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithConsumerBrokerStartDelay() throws Exception { + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + Thread.sleep(1000*5); + + startProducerBroker(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + + public void testWithProducerBrokerStartDelay() throws Exception { + + startProducerBroker(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + + Thread.sleep(1000*5); + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + protected void setUp() throws Exception { + + log.info("==============================================================================="); + log.info("Running Test Case: "+getName()); + log.info("==============================================================================="); + + producerConnectionFactory = createProducerConnectionFactory(); + consumerConnectionFactory = createConsumerConnectionFactory(); + destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE"); + + } + + protected void tearDown() throws Exception { + disposeConsumerConnections(); + try { + stopProducerBroker(); + } catch (Throwable e) { + } + try { + stopConsumerBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = (Connection) iter.next(); + try { connection.close(); } catch (Throwable ignore) {} + } + } + + protected void startProducerBroker() throws Exception { + if( producerBroker==null ) { + producerBroker = createFirstBroker(); + producerBroker.start(); + } + } + + protected void stopProducerBroker() throws Exception { + if( producerBroker!=null ) { + producerBroker.stop(); + producerBroker=null; + } + } + + protected void startConsumerBroker() throws Exception { + if( consumerBroker==null ) { + consumerBroker = createSecondBroker(); + consumerBroker.start(); + } + } + + protected void stopConsumerBroker() throws Exception { + if( consumerBroker!=null ) { + consumerBroker.stop(); + consumerBroker=null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml")); + } + + protected BrokerService createSecondBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml")); + } + + protected ActiveMQConnectionFactory createProducerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker1"); + } + + protected ActiveMQConnectionFactory createConsumerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker2"); + } + + protected String sendMessage() throws JMSException { + Connection connection = null; + try { + connection = producerConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + producer.send(message); + return message.getJMSMessageID(); + } finally { + try { connection.close(); } catch (Throwable ignore) {} + } + } + + protected MessageConsumer createConsumer() throws JMSException { + Connection connection = consumerConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(destination); + } + + protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception { + final AtomicInteger rc = new AtomicInteger(0); + Connection connection = cf.createConnection(); + connections.add(connection); + connection.start(); + + ConsumerEventSource source = new ConsumerEventSource(connection, destination); + source.setConsumerListener(new ConsumerListener(){ + public void onConsumerEvent(ConsumerEvent event) { + rc.set(event.getConsumerCount()); + } + }); + source.start(); + + return rc; + } + + protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException { + for( int i=0; i < 100; i++ ) { + if( consumerCounter.get() > 0 ) { + return; + } + Thread.sleep(100); + } + fail("The consumer did not arrive."); + } + + protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException { + for( int i=0; i < 100; i++ ) { + if( consumerCounter.get() == 0 ) { + return; + } + Thread.sleep(100); + } + fail("The consumer did not leave."); + } + +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java?view=diff&rev=474985&r1=474984&r2=474985 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java Tue Nov 14 13:16:11 2006 @@ -15,77 +15,77 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.network; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.util.ArrayList; -import java.util.Iterator; - -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; - - -/** - * Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels - * fool the TCP transport into thinking that they are initially connected. - * - * @author chirino - */ -public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest { - - ArrayList processes = new ArrayList(); - - - protected BrokerService createFirstBroker() throws Exception { - return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml")); - } - - protected BrokerService createSecondBroker() throws Exception { - return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml")); - } - - protected void setUp() throws Exception { - startProcess("ssh -Nn -L60006:localhost:61616 localhost"); - startProcess("ssh -Nn -L60007:localhost:61617 localhost"); - super.setUp(); - } - - protected void tearDown() throws Exception { - super.tearDown(); - for (Iterator iter = processes.iterator(); iter.hasNext();) { - Process p = (Process) iter.next(); - p.destroy(); - } - } - - private void startProcess(String command) throws IOException { - final Process process = Runtime.getRuntime().exec(command); - processes.add(process); - new Thread("stdout: "+command){ - public void run() { - try { - InputStream is = process.getInputStream(); - int c; - while((c=is.read())>=0) { - System.out.write(c); - } - } catch (IOException e) { - } - } - }.start(); - new Thread("stderr: "+command){ - public void run() { - try { - InputStream is = process.getErrorStream(); - int c; - while((c=is.read())>=0) { - System.err.write(c); - } - } catch (IOException e) { - } - } - }.start(); - } -} +package org.apache.activemq.network; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; + + +/** + * Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels + * fool the TCP transport into thinking that they are initially connected. + * + * @author chirino + */ +public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest { + + ArrayList processes = new ArrayList(); + + + protected BrokerService createFirstBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml")); + } + + protected BrokerService createSecondBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml")); + } + + protected void setUp() throws Exception { + startProcess("ssh -Nn -L60006:localhost:61616 localhost"); + startProcess("ssh -Nn -L60007:localhost:61617 localhost"); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + for (Iterator iter = processes.iterator(); iter.hasNext();) { + Process p = (Process) iter.next(); + p.destroy(); + } + } + + private void startProcess(String command) throws IOException { + final Process process = Runtime.getRuntime().exec(command); + processes.add(process); + new Thread("stdout: "+command){ + public void run() { + try { + InputStream is = process.getInputStream(); + int c; + while((c=is.read())>=0) { + System.out.write(c); + } + } catch (IOException e) { + } + } + }.start(); + new Thread("stderr: "+command){ + public void run() { + try { + InputStream is = process.getErrorStream(); + int c; + while((c=is.read())>=0) { + System.err.write(c); + } + } catch (IOException e) { + } + } + }.start(); + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveQueueTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/JaasCertificateAuthenticationBrokerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/StubDoNothingCallbackHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/StubJaasConfiguration.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/StubLoginModule.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/security/StubSecurityContext.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportFactoryTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportServerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SslTransportTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/StubSSLServerSocket.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/StubSSLSession.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/StubSSLSocket.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/StubSSLSocketFactory.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/StubSslTransport.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/tcp/StubX509Certificate.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/main/java/org/apache/activemq/jaas/CertificateCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/main/java/org/apache/activemq/jaas/CertificateLoginModule.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/main/java/org/apache/activemq/jaas/JaasCertificateCallbackHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/main/java/org/apache/activemq/jaas/JassCredentialCallbackHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/test/java/org/apache/activemq/jaas/CertificateLoginModuleTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-jaas/src/test/java/org/apache/activemq/jaas/StubCertificateLoginModule.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CGeneratorTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CHeadersGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CSharpClassesGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CSharpGeneratorTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CSharpMarshallingGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CSourcesGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/CppGeneratorTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/JavaGeneratorTask.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/JavaMarshallingGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/JavaTestsGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/MultiSourceGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/SingleSourceGenerator.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java?view=diff&rev=474985&r1=474984&r2=474985 ============================================================================== --- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java (original) +++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java Tue Nov 14 13:16:11 2006 @@ -1,78 +1,78 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.filter; - -import java.io.StringReader; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.TextMessage; -import javax.xml.xpath.XPath; -import javax.xml.xpath.XPathConstants; -import javax.xml.xpath.XPathExpressionException; -import javax.xml.xpath.XPathFactory; - -import org.apache.activemq.command.Message; -import org.apache.activemq.util.ByteArrayInputStream; -import org.xml.sax.InputSource; - -public class JAXPXPathEvaluator implements XPathExpression.XPathEvaluator { - - private static final XPathFactory factory = XPathFactory.newInstance(); - private javax.xml.xpath.XPathExpression expression; - - public JAXPXPathEvaluator(String xpathExpression) { - try { - XPath xpath = factory.newXPath(); - expression = xpath.compile(xpathExpression); - } catch (XPathExpressionException e) { - throw new RuntimeException("Invalid XPath expression: "+xpathExpression); - } - } - - public boolean evaluate(Message message) throws JMSException { - if( message instanceof TextMessage ) { - String text = ((TextMessage)message).getText(); - return evaluate(text); - } else if ( message instanceof BytesMessage ) { - BytesMessage bm = (BytesMessage) message; - byte data[] = new byte[(int) bm.getBodyLength()]; - bm.readBytes(data); - return evaluate(data); - } - return false; - } - - private boolean evaluate(byte[] data) { - try { - InputSource inputSource = new InputSource(new ByteArrayInputStream(data)); - return ((Boolean)expression.evaluate(inputSource, XPathConstants.BOOLEAN)).booleanValue(); - } catch (XPathExpressionException e) { - return false; - } - } - - private boolean evaluate(String text) { - try { - InputSource inputSource = new InputSource(new StringReader(text)); - return ((Boolean)expression.evaluate(inputSource, XPathConstants.BOOLEAN)).booleanValue(); - } catch (XPathExpressionException e) { - return false; - } - } -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.filter; + +import java.io.StringReader; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.TextMessage; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + +import org.apache.activemq.command.Message; +import org.apache.activemq.util.ByteArrayInputStream; +import org.xml.sax.InputSource; + +public class JAXPXPathEvaluator implements XPathExpression.XPathEvaluator { + + private static final XPathFactory factory = XPathFactory.newInstance(); + private javax.xml.xpath.XPathExpression expression; + + public JAXPXPathEvaluator(String xpathExpression) { + try { + XPath xpath = factory.newXPath(); + expression = xpath.compile(xpathExpression); + } catch (XPathExpressionException e) { + throw new RuntimeException("Invalid XPath expression: "+xpathExpression); + } + } + + public boolean evaluate(Message message) throws JMSException { + if( message instanceof TextMessage ) { + String text = ((TextMessage)message).getText(); + return evaluate(text); + } else if ( message instanceof BytesMessage ) { + BytesMessage bm = (BytesMessage) message; + byte data[] = new byte[(int) bm.getBodyLength()]; + bm.readBytes(data); + return evaluate(data); + } + return false; + } + + private boolean evaluate(byte[] data) { + try { + InputSource inputSource = new InputSource(new ByteArrayInputStream(data)); + return ((Boolean)expression.evaluate(inputSource, XPathConstants.BOOLEAN)).booleanValue(); + } catch (XPathExpressionException e) { + return false; + } + } + + private boolean evaluate(String text) { + try { + InputSource inputSource = new InputSource(new StringReader(text)); + return ((Boolean)expression.evaluate(inputSource, XPathConstants.BOOLEAN)).booleanValue(); + } catch (XPathExpressionException e) { + return false; + } + } +} Propchange: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/JAXPXPathEvaluator.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java?view=diff&rev=474985&r1=474984&r2=474985 ============================================================================== --- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java (original) +++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java Tue Nov 14 13:16:11 2006 @@ -1,62 +1,62 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.filter; - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.TextMessage; - -import org.apache.activemq.command.Message; -import org.apache.activemq.util.ByteArrayInputStream; -import org.apache.xmlbeans.XmlObject; - -public class XMLBeansXPathEvaluator implements XPathExpression.XPathEvaluator { - - private final String xpath; - - public XMLBeansXPathEvaluator(String xpath) { - this.xpath = xpath; - } - - public boolean evaluate(Message message) throws JMSException { - if( message instanceof TextMessage ) { - String text = ((TextMessage)message).getText(); - try { - XmlObject object = XmlObject.Factory.parse(text); - XmlObject[] objects = object.selectPath(xpath); - return object!=null && objects.length>0; - } catch (Throwable e) { - return false; - } - - } else if ( message instanceof BytesMessage ) { - BytesMessage bm = (BytesMessage) message; - byte data[] = new byte[(int) bm.getBodyLength()]; - bm.readBytes(data); - try { - XmlObject object = XmlObject.Factory.parse(new ByteArrayInputStream(data)); - XmlObject[] objects = object.selectPath(xpath); - return object!=null && objects.length>0; - } catch (Throwable e) { - return false; - } - } - return false; - } -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.filter; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.apache.activemq.command.Message; +import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.xmlbeans.XmlObject; + +public class XMLBeansXPathEvaluator implements XPathExpression.XPathEvaluator { + + private final String xpath; + + public XMLBeansXPathEvaluator(String xpath) { + this.xpath = xpath; + } + + public boolean evaluate(Message message) throws JMSException { + if( message instanceof TextMessage ) { + String text = ((TextMessage)message).getText(); + try { + XmlObject object = XmlObject.Factory.parse(text); + XmlObject[] objects = object.selectPath(xpath); + return object!=null && objects.length>0; + } catch (Throwable e) { + return false; + } + + } else if ( message instanceof BytesMessage ) { + BytesMessage bm = (BytesMessage) message; + byte data[] = new byte[(int) bm.getBodyLength()]; + bm.readBytes(data); + try { + XmlObject object = XmlObject.Factory.parse(new ByteArrayInputStream(data)); + XmlObject[] objects = object.selectPath(xpath); + return object!=null && objects.length>0; + } catch (Throwable e) { + return false; + } + } + return false; + } +} Propchange: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/filter/XMLBeansXPathEvaluator.java ------------------------------------------------------------------------------ svn:eol-style = native