Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3DC3C17A92 for ; Tue, 11 Nov 2014 18:41:38 +0000 (UTC) Received: (qmail 31707 invoked by uid 500); 11 Nov 2014 18:41:32 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 31568 invoked by uid 500); 11 Nov 2014 18:41:32 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30328 invoked by uid 99); 11 Nov 2014 18:41:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Nov 2014 18:41:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id DD0A5A0D751; Tue, 11 Nov 2014 18:41:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andytaylor@apache.org To: commits@activemq.apache.org Date: Tue, 11 Nov 2014 18:41:48 -0000 Message-Id: <3c28e45758334f89925730e1ca00a0f0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [18/51] [abbrv] [partial] activemq-6 git commit: ACTIVEMQ6-4 - Rename packages to ActiveMQ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java new file mode 100644 index 0000000..4be89fe --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQSession.java @@ -0,0 +1,1276 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionInProgressException; +import javax.transaction.xa.XAResource; + +import org.apache.activemq6.selector.filter.FilterException; +import org.apache.activemq6.selector.SelectorParser; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.HornetQQueueExistsException; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientConsumer; +import org.apache.activemq6.api.core.client.ClientProducer; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.api.core.client.ClientSession.AddressQuery; +import org.apache.activemq6.api.core.client.ClientSession.QueueQuery; + +/** + * HornetQ implementation of a JMS Session. + *
+ * Note that we *do not* support JMS ASF (Application Server Facilities) optional + * constructs such as ConnectionConsumer + * + * @author Ovidiu Feodorov + * @author Tim Fox + * @author Andy Taylor + * + * + */ +public class HornetQSession implements QueueSession, TopicSession +{ + public static final int TYPE_GENERIC_SESSION = 0; + + public static final int TYPE_QUEUE_SESSION = 1; + + public static final int TYPE_TOPIC_SESSION = 2; + + private static SimpleString REJECTING_FILTER = new SimpleString("_HQX=-1"); + + private final HornetQConnection connection; + + private final ClientSession session; + + private final int sessionType; + + private final int ackMode; + + private final boolean transacted; + + private final boolean xa; + + private boolean recoverCalled; + + private final Set consumers = new HashSet(); + + // Constructors -------------------------------------------------- + + protected HornetQSession(final HornetQConnection connection, + final boolean transacted, + final boolean xa, + final int ackMode, + final ClientSession session, + final int sessionType) + { + this.connection = connection; + + this.ackMode = ackMode; + + this.session = session; + + this.sessionType = sessionType; + + this.transacted = transacted; + + this.xa = xa; + } + + // Session implementation ---------------------------------------- + + public BytesMessage createBytesMessage() throws JMSException + { + checkClosed(); + + return new HornetQBytesMessage(session); + } + + public MapMessage createMapMessage() throws JMSException + { + checkClosed(); + + return new HornetQMapMessage(session); + } + + public Message createMessage() throws JMSException + { + checkClosed(); + + return new HornetQMessage(session); + } + + public ObjectMessage createObjectMessage() throws JMSException + { + checkClosed(); + + return new HornetQObjectMessage(session); + } + + public ObjectMessage createObjectMessage(final Serializable object) throws JMSException + { + checkClosed(); + + HornetQObjectMessage msg = new HornetQObjectMessage(session); + + msg.setObject(object); + + return msg; + } + + public StreamMessage createStreamMessage() throws JMSException + { + checkClosed(); + + return new HornetQStreamMessage(session); + } + + public TextMessage createTextMessage() throws JMSException + { + checkClosed(); + + HornetQTextMessage msg = new HornetQTextMessage(session); + + msg.setText(null); + + return msg; + } + + public TextMessage createTextMessage(final String text) throws JMSException + { + checkClosed(); + + HornetQTextMessage msg = new HornetQTextMessage(session); + + msg.setText(text); + + return msg; + } + + public boolean getTransacted() throws JMSException + { + checkClosed(); + + return transacted; + } + + public int getAcknowledgeMode() throws JMSException + { + checkClosed(); + + return ackMode; + } + + public boolean isXA() + { + return xa; + } + + public void commit() throws JMSException + { + if (!transacted) + { + throw new IllegalStateException("Cannot commit a non-transacted session"); + } + if (xa) + { + throw new TransactionInProgressException("Cannot call commit on an XA session"); + } + try + { + session.commit(); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void rollback() throws JMSException + { + if (!transacted) + { + throw new IllegalStateException("Cannot rollback a non-transacted session"); + } + if (xa) + { + throw new TransactionInProgressException("Cannot call rollback on an XA session"); + } + + try + { + session.rollback(); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void close() throws JMSException + { + connection.getThreadAwareContext().assertNotCompletionListenerThread(); + connection.getThreadAwareContext().assertNotMessageListenerThread(); + synchronized (connection) + { + try + { + for (HornetQMessageConsumer cons : new HashSet(consumers)) + { + cons.close(); + } + + session.close(); + + connection.removeSession(this); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + } + + public void recover() throws JMSException + { + if (transacted) + { + throw new IllegalStateException("Cannot recover a transacted session"); + } + + try + { + session.rollback(true); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + + recoverCalled = true; + } + + public MessageListener getMessageListener() throws JMSException + { + checkClosed(); + + return null; + } + + public void setMessageListener(final MessageListener listener) throws JMSException + { + checkClosed(); + } + + public void run() + { + } + + public MessageProducer createProducer(final Destination destination) throws JMSException + { + if (destination != null && !(destination instanceof HornetQDestination)) + { + throw new InvalidDestinationException("Not a HornetQ Destination:" + destination); + } + + try + { + HornetQDestination jbd = (HornetQDestination)destination; + + if (jbd != null) + { + ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist"); + } + + connection.addKnownDestination(jbd.getSimpleAddress()); + } + + ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress()); + + return new HornetQMessageProducer(connection, producer, jbd, session); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public MessageConsumer createConsumer(final Destination destination) throws JMSException + { + return createConsumer(destination, null, false); + } + + public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException + { + return createConsumer(destination, messageSelector, false); + } + + public MessageConsumer createConsumer(final Destination destination, + final String messageSelector, + final boolean noLocal) throws JMSException + { + if (destination == null) + { + throw new InvalidDestinationException("Cannot create a consumer with a null destination"); + } + + if (!(destination instanceof HornetQDestination)) + { + throw new InvalidDestinationException("Not a HornetQDestination:" + destination); + } + + HornetQDestination jbdest = (HornetQDestination)destination; + + if (jbdest.isTemporary() && !connection.containsTemporaryQueue(jbdest.getSimpleAddress())) + { + throw new JMSException("Can not create consumer for temporary destination " + destination + + " from another JMS connection"); + } + + return createConsumer(jbdest, null, messageSelector, noLocal, ConsumerDurability.NON_DURABLE); + } + + public Queue createQueue(final String queueName) throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_TOPIC_SESSION) + { + throw new IllegalStateException("Cannot create a queue using a TopicSession"); + } + + try + { + HornetQQueue queue = lookupQueue(queueName, false); + + if (queue == null) + { + queue = lookupQueue(queueName, true); + } + + if (queue == null) + { + throw new JMSException("There is no queue with name " + queueName); + } + else + { + return queue; + } + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public Topic createTopic(final String topicName) throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a topic on a QueueSession"); + } + + try + { + HornetQTopic topic = lookupTopic(topicName, false); + + if (topic == null) + { + topic = lookupTopic(topicName, true); + } + + if (topic == null) + { + throw new JMSException("There is no topic with name " + topicName); + } + else + { + return topic; + } + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException + { + return createDurableSubscriber(topic, name, null, false); + } + + public TopicSubscriber createDurableSubscriber(final Topic topic, + final String name, + String messageSelector, + final boolean noLocal) throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession"); + } + checkTopic(topic); + if (!(topic instanceof HornetQDestination)) + { + throw new InvalidDestinationException("Not a HornetQTopic:" + topic); + } + if ("".equals(messageSelector)) + { + messageSelector = null; + } + + HornetQDestination jbdest = (HornetQDestination)topic; + + if (jbdest.isQueue()) + { + throw new InvalidDestinationException("Cannot create a subscriber on a queue"); + } + + return createConsumer(jbdest, name, messageSelector, noLocal, ConsumerDurability.DURABLE); + } + + private void checkTopic(Topic topic) throws InvalidDestinationException + { + if (topic == null) + { + throw HornetQJMSClientBundle.BUNDLE.nullTopic(); + } + } + + @Override + public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException + { + return createSharedConsumer(topic, sharedSubscriptionName, null); + } + + /** + * Note: Needs to throw an exception if a subscriptionName is already in use by another topic, or if the messageSelector is different + * + * validate multiple subscriptions on the same session. + * validate multiple subscriptions on different sessions + * validate failure in one connection while another connection stills fine. + * Validate different filters in different possible scenarios + * + * @param topic + * @param name + * @param messageSelector + * @return + * @throws JMSException + */ + @Override + public MessageConsumer createSharedConsumer(Topic topic, String name, String messageSelector) throws JMSException + { + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a shared consumer on a QueueSession"); + } + checkTopic(topic); + HornetQTopic localTopic; + if (topic instanceof HornetQTopic) + { + localTopic = (HornetQTopic)topic; + } + else + { + localTopic = new HornetQTopic(topic.getTopicName()); + } + return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.NON_DURABLE, true); + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException + { + return createDurableConsumer(topic, name, null, false); + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException + { + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a durable consumer on a QueueSession"); + } + checkTopic(topic); + HornetQTopic localTopic; + if (topic instanceof HornetQTopic) + { + localTopic = (HornetQTopic)topic; + } + else + { + localTopic = new HornetQTopic(topic.getTopicName()); + } + return createConsumer(localTopic, name, messageSelector, noLocal, ConsumerDurability.DURABLE); + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException + { + return createSharedDurableConsumer(topic, name, null); + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException + { + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a shared durable consumer on a QueueSession"); + } + + checkTopic(topic); + + HornetQTopic localTopic; + + if (topic instanceof HornetQTopic) + { + localTopic = (HornetQTopic)topic; + } + else + { + localTopic = new HornetQTopic(topic.getTopicName()); + } + return internalCreateSharedConsumer(localTopic, name, messageSelector, ConsumerDurability.DURABLE, true); + } + + enum ConsumerDurability + { + DURABLE, NON_DURABLE; + } + + + /** + * This is an internal method for shared consumers + */ + private HornetQMessageConsumer internalCreateSharedConsumer(final HornetQDestination dest, + final String subscriptionName, + String selectorString, + ConsumerDurability durability, + final boolean shared) throws JMSException + { + try + { + + if (dest.isQueue()) + { + // This is not really possible unless someone makes a mistake on code + // createSharedConsumer only accpets Topics by declaration + throw new RuntimeException("Internal error: createSharedConsumer is only meant for Topics"); + } + + if (subscriptionName == null) + { + throw HornetQJMSClientBundle.BUNDLE.invalidSubscriptionName(); + } + + selectorString = "".equals(selectorString) ? null : selectorString; + + SimpleString coreFilterString = null; + + if (selectorString != null) + { + coreFilterString = new SimpleString(SelectorTranslator.convertToHornetQFilterString(selectorString)); + } + + ClientConsumer consumer; + + SimpleString autoDeleteQueueName = null; + + AddressQuery response = session.addressQuery(dest.getSimpleAddress()); + + if (!response.isExists()) + { + throw HornetQJMSClientBundle.BUNDLE.destinationDoesNotExist(dest.getSimpleAddress()); + } + + SimpleString queueName; + + if (dest.isTemporary() && durability == ConsumerDurability.DURABLE) + { + throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); + } + + queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), + subscriptionName)); + + if (durability == ConsumerDurability.DURABLE) + { + try + { + session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + } + catch (HornetQQueueExistsException ignored) + { + // We ignore this because querying and then creating the queue wouldn't be idempotent + // we could also add a parameter to ignore existence what would require a bigger work around to avoid + // compatibility. + } + } + else + { + session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, false); + } + + consumer = session.createConsumer(queueName, null, false); + + HornetQMessageConsumer jbc = new HornetQMessageConsumer(connection, this, + consumer, + false, + dest, + selectorString, + autoDeleteQueueName); + + consumers.add(jbc); + + return jbc; + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + + + private HornetQMessageConsumer createConsumer(final HornetQDestination dest, + final String subscriptionName, + String selectorString, final boolean noLocal, + ConsumerDurability durability) throws JMSException + { + try + { + selectorString = "".equals(selectorString) ? null : selectorString; + + if (noLocal) + { + connection.setHasNoLocal(); + + String filter; + if (connection.getClientID() != null) + { + filter = + HornetQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getClientID() + + "'"; + } + else + { + filter = HornetQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + connection.getUID() + "'"; + } + + if (selectorString != null) + { + selectorString += " AND " + filter; + } + else + { + selectorString = filter; + } + } + + SimpleString coreFilterString = null; + + if (selectorString != null) + { + coreFilterString = new SimpleString(SelectorTranslator.convertToHornetQFilterString(selectorString)); + } + + ClientConsumer consumer; + + SimpleString autoDeleteQueueName = null; + + if (dest.isQueue()) + { + AddressQuery response = session.addressQuery(dest.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist"); + } + + connection.addKnownDestination(dest.getSimpleAddress()); + + consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, false); + } + else + { + AddressQuery response = session.addressQuery(dest.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Topic " + dest.getName() + " does not exist"); + } + + connection.addKnownDestination(dest.getSimpleAddress()); + + SimpleString queueName; + + if (subscriptionName == null) + { + if (durability != ConsumerDurability.NON_DURABLE) + throw new RuntimeException(); + // Non durable sub + + queueName = new SimpleString(UUID.randomUUID().toString()); + + session.createTemporaryQueue(dest.getSimpleAddress(), queueName, coreFilterString); + + consumer = session.createConsumer(queueName, null, false); + + autoDeleteQueueName = queueName; + } + else + { + // Durable sub + if (durability != ConsumerDurability.DURABLE) + throw new RuntimeException(); + if (connection.getClientID() == null) + { + throw new IllegalStateException("Cannot create durable subscription - client ID has not been set"); + } + + if (dest.isTemporary()) + { + throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic"); + } + + queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), + subscriptionName)); + + QueueQuery subResponse = session.queueQuery(queueName); + + if (!subResponse.isExists()) + { + session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + } + else + { + // Already exists + if (subResponse.getConsumerCount() > 0) + { + throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)"); + } + + // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1): + // A client can change an existing durable subscription by + // creating a durable + // TopicSubscriber with the same name and a new topic and/or + // message selector. + // Changing a durable subscriber is equivalent to + // unsubscribing (deleting) the old + // one and creating a new one. + + SimpleString oldFilterString = subResponse.getFilterString(); + + boolean selectorChanged = coreFilterString == null && oldFilterString != null || + oldFilterString == null && + coreFilterString != null || + oldFilterString != null && + coreFilterString != null && + !oldFilterString.equals(coreFilterString); + + SimpleString oldTopicName = subResponse.getAddress(); + + boolean topicChanged = !oldTopicName.equals(dest.getSimpleAddress()); + + if (selectorChanged || topicChanged) + { + // Delete the old durable sub + session.deleteQueue(queueName); + + // Create the new one + session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, true); + } + } + + consumer = session.createConsumer(queueName, null, false); + } + } + + HornetQMessageConsumer jbc = new HornetQMessageConsumer(connection, + this, + consumer, + noLocal, + dest, + selectorString, + autoDeleteQueueName); + + consumers.add(jbc); + + return jbc; + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void ackAllConsumers() throws JMSException + { + checkClosed(); + } + + public QueueBrowser createBrowser(final Queue queue) throws JMSException + { + return createBrowser(queue, null); + } + + public QueueBrowser createBrowser(final Queue queue, String filterString) throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_TOPIC_SESSION) + { + throw new IllegalStateException("Cannot create a browser on a TopicSession"); + } + if (queue == null) + { + throw new InvalidDestinationException("Cannot create a browser with a null queue"); + } + if (!(queue instanceof HornetQDestination)) + { + throw new InvalidDestinationException("Not a HornetQQueue:" + queue); + } + if ("".equals(filterString)) + { + filterString = null; + } + + // eager test of the filter syntax as required by JMS spec + try + { + if (filterString != null) + { + SelectorParser.parse(filterString.trim()); + } + } + catch (FilterException e) + { + throw JMSExceptionHelper.convertFromHornetQException(HornetQJMSClientBundle.BUNDLE.invalidFilter(e, new SimpleString(filterString))); + } + + HornetQDestination jbq = (HornetQDestination)queue; + + if (!jbq.isQueue()) + { + throw new InvalidDestinationException("Cannot create a browser on a topic"); + } + + try + { + AddressQuery message = session.addressQuery(new SimpleString(jbq.getAddress())); + if (!message.isExists()) + { + throw new InvalidDestinationException(jbq.getAddress() + " does not exist"); + } + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + + return new HornetQQueueBrowser((HornetQQueue)jbq, filterString, session); + + } + + public TemporaryQueue createTemporaryQueue() throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_TOPIC_SESSION) + { + throw new IllegalStateException("Cannot create a temporary queue using a TopicSession"); + } + + try + { + HornetQTemporaryQueue queue = HornetQDestination.createTemporaryQueue(this); + + SimpleString simpleAddress = queue.getSimpleAddress(); + + session.createTemporaryQueue(simpleAddress, simpleAddress); + + connection.addTemporaryQueue(simpleAddress); + + return queue; + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public TemporaryTopic createTemporaryTopic() throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot create a temporary topic on a QueueSession"); + } + + try + { + HornetQTemporaryTopic topic = HornetQDestination.createTemporaryTopic(this); + + SimpleString simpleAddress = topic.getSimpleAddress(); + + // We create a dummy subscription on the topic, that never receives messages - this is so we can perform JMS + // checks when routing messages to a topic that + // does not exist - otherwise we would not be able to distinguish from a non existent topic and one with no + // subscriptions - core has no notion of a topic + + session.createTemporaryQueue(simpleAddress, simpleAddress, HornetQSession.REJECTING_FILTER); + + connection.addTemporaryQueue(simpleAddress); + + return topic; + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void unsubscribe(final String name) throws JMSException + { + // As per spec. section 4.11 + if (sessionType == HornetQSession.TYPE_QUEUE_SESSION) + { + throw new IllegalStateException("Cannot unsubscribe using a QueueSession"); + } + + SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), + name)); + + try + { + QueueQuery response = session.queueQuery(queueName); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Cannot unsubscribe, subscription with name " + name + + " does not exist"); + } + + if (response.getConsumerCount() != 0) + { + throw new IllegalStateException("Cannot unsubscribe durable subscription " + name + + " since it has active subscribers"); + } + + session.deleteQueue(queueName); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + // XASession implementation + + public Session getSession() throws JMSException + { + if (!xa) + { + throw new IllegalStateException("Isn't an XASession"); + } + + return this; + } + + public XAResource getXAResource() + { + return session.getXAResource(); + } + + // QueueSession implementation + + public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException + { + return (QueueReceiver)createConsumer(queue, messageSelector); + } + + public QueueReceiver createReceiver(final Queue queue) throws JMSException + { + return (QueueReceiver)createConsumer(queue); + } + + public QueueSender createSender(final Queue queue) throws JMSException + { + return (QueueSender)createProducer(queue); + } + + // XAQueueSession implementation + + public QueueSession getQueueSession() throws JMSException + { + return (QueueSession)getSession(); + } + + // TopicSession implementation + + public TopicPublisher createPublisher(final Topic topic) throws JMSException + { + return (TopicPublisher)createProducer(topic); + } + + public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException + { + return (TopicSubscriber)createConsumer(topic, messageSelector, noLocal); + } + + public TopicSubscriber createSubscriber(final Topic topic) throws JMSException + { + return (TopicSubscriber)createConsumer(topic); + } + + // XATopicSession implementation + + public TopicSession getTopicSession() throws JMSException + { + return (TopicSession)getSession(); + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return "HornetQSession->" + session; + } + + public ClientSession getCoreSession() + { + return session; + } + + public boolean isRecoverCalled() + { + return recoverCalled; + } + + public void setRecoverCalled(final boolean recoverCalled) + { + this.recoverCalled = recoverCalled; + } + + public void deleteTemporaryTopic(final HornetQDestination tempTopic) throws JMSException + { + if (!tempTopic.isTemporary()) + { + throw new InvalidDestinationException("Not a temporary topic " + tempTopic); + } + + try + { + AddressQuery response = session.addressQuery(tempTopic.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Cannot delete temporary topic " + tempTopic.getName() + + " does not exist"); + } + + if (response.getQueueNames().size() > 1) + { + throw new IllegalStateException("Cannot delete temporary topic " + tempTopic.getName() + + " since it has subscribers"); + } + + SimpleString address = tempTopic.getSimpleAddress(); + + session.deleteQueue(address); + + connection.removeTemporaryQueue(address); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void deleteTemporaryQueue(final HornetQDestination tempQueue) throws JMSException + { + if (!tempQueue.isTemporary()) + { + throw new InvalidDestinationException("Not a temporary queue " + tempQueue); + } + try + { + QueueQuery response = session.queueQuery(tempQueue.getSimpleAddress()); + + if (!response.isExists()) + { + throw new InvalidDestinationException("Cannot delete temporary queue " + tempQueue.getName() + + " does not exist"); + } + + if (response.getConsumerCount() > 0) + { + throw new IllegalStateException("Cannot delete temporary queue " + tempQueue.getName() + + " since it has subscribers"); + } + + SimpleString address = tempQueue.getSimpleAddress(); + + session.deleteQueue(address); + + connection.removeTemporaryQueue(address); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void start() throws JMSException + { + try + { + session.start(); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void stop() throws JMSException + { + try + { + session.stop(); + } + catch (HornetQException e) + { + throw JMSExceptionHelper.convertFromHornetQException(e); + } + } + + public void removeConsumer(final HornetQMessageConsumer consumer) + { + consumers.remove(consumer); + } + + // Package protected --------------------------------------------- + + void deleteQueue(final SimpleString queueName) throws JMSException + { + if (!session.isClosed()) + { + try + { + session.deleteQueue(queueName); + } + catch (HornetQException ignore) + { + // Exception on deleting queue shouldn't prevent close from completing + } + } + } + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + private void checkClosed() throws JMSException + { + if (session.isClosed()) + { + throw new IllegalStateException("Session is closed"); + } + } + + private HornetQQueue lookupQueue(final String queueName, boolean isTemporary) throws HornetQException + { + HornetQQueue queue; + + if (isTemporary) + { + queue = HornetQDestination.createTemporaryQueue(queueName); + } + else + { + queue = HornetQDestination.createQueue(queueName); + } + + QueueQuery response = session.queueQuery(queue.getSimpleAddress()); + + if (response.isExists()) + { + return queue; + } + else + { + return null; + } + } + + private HornetQTopic lookupTopic(final String topicName, final boolean isTemporary) throws HornetQException + { + + HornetQTopic topic; + + if (isTemporary) + { + topic = HornetQDestination.createTemporaryTopic(topicName); + } + else + { + topic = HornetQDestination.createTopic(topicName); + } + + AddressQuery query = session.addressQuery(topic.getSimpleAddress()); + + if (!query.isExists()) + { + return null; + } + else + { + return topic; + } + } + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java new file mode 100644 index 0000000..86182a8 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQStreamMessage.java @@ -0,0 +1,466 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.StreamMessage; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientSession; +import org.apache.activemq6.core.client.impl.ClientMessageImpl; +import org.apache.activemq6.utils.DataConstants; + +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBoolean; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadByte; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBytes; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadChar; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadDouble; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadFloat; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadInteger; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadLong; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadObject; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadShort; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadString; + +/** + * HornetQ implementation of a JMS StreamMessage. + * + * @author Tim Fox + * + * Some parts based on JBM 1.x class by: + * + * @author Norbert Lataille (Norbert.Lataille@m4x.org) + * @author Adrian Brock + * @author Tim Fox + * @author Ovidiu Feodorov + * @author Andy Taylor + */ +public final class HornetQStreamMessage extends HornetQMessage implements StreamMessage +{ + public static final byte TYPE = Message.STREAM_TYPE; + + protected HornetQStreamMessage(final ClientSession session) + { + super(HornetQStreamMessage.TYPE, session); + } + + protected HornetQStreamMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + public HornetQStreamMessage(final StreamMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, HornetQStreamMessage.TYPE, session); + + foreign.reset(); + + try + { + while (true) + { + Object obj = foreign.readObject(); + writeObject(obj); + } + } + catch (MessageEOFException e) + { + // Ignore + } + } + + // For testing only + public HornetQStreamMessage() + { + message = new ClientMessageImpl((byte)0, false, 0, 0, (byte)4, 1500); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return HornetQStreamMessage.TYPE; + } + + // StreamMessage implementation ---------------------------------- + + public boolean readBoolean() throws JMSException + { + checkRead(); + try + { + return streamReadBoolean(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public byte readByte() throws JMSException + { + checkRead(); + + try + { + return streamReadByte(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public short readShort() throws JMSException + { + checkRead(); + try + { + return streamReadShort(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public char readChar() throws JMSException + { + checkRead(); + try + { + return streamReadChar(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readInt() throws JMSException + { + checkRead(); + try + { + return streamReadInteger(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public long readLong() throws JMSException + { + checkRead(); + try + { + return streamReadLong(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public float readFloat() throws JMSException + { + checkRead(); + try + { + return streamReadFloat(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public double readDouble() throws JMSException + { + checkRead(); + try + { + return streamReadDouble(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public String readString() throws JMSException + { + checkRead(); + try + { + return streamReadString(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + /** + * len here is used to control how many more bytes to read + */ + private int len = 0; + + public int readBytes(final byte[] value) throws JMSException + { + checkRead(); + try + { + Pair pairRead = streamReadBytes(message, len, value); + + len = pairRead.getA(); + return pairRead.getB(); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public Object readObject() throws JMSException + { + checkRead(); + try + { + return streamReadObject(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public void writeBoolean(final boolean value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BOOLEAN); + getBuffer().writeBoolean(value); + } + + public void writeByte(final byte value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BYTE); + getBuffer().writeByte(value); + } + + public void writeShort(final short value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.SHORT); + getBuffer().writeShort(value); + } + + public void writeChar(final char value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.CHAR); + getBuffer().writeShort((short)value); + } + + public void writeInt(final int value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.INT); + getBuffer().writeInt(value); + } + + public void writeLong(final long value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.LONG); + getBuffer().writeLong(value); + } + + public void writeFloat(final float value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.FLOAT); + getBuffer().writeInt(Float.floatToIntBits(value)); + } + + public void writeDouble(final double value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.DOUBLE); + getBuffer().writeLong(Double.doubleToLongBits(value)); + } + + public void writeString(final String value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.STRING); + getBuffer().writeNullableString(value); + } + + public void writeBytes(final byte[] value) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BYTES); + getBuffer().writeInt(value.length); + getBuffer().writeBytes(value); + } + + public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException + { + checkWrite(); + getBuffer().writeByte(DataConstants.BYTES); + getBuffer().writeInt(length); + getBuffer().writeBytes(value, offset, length); + } + + public void writeObject(final Object value) throws JMSException + { + if (value instanceof String) + { + writeString((String)value); + } + else if (value instanceof Boolean) + { + writeBoolean((Boolean)value); + } + else if (value instanceof Byte) + { + writeByte((Byte)value); + } + else if (value instanceof Short) + { + writeShort((Short)value); + } + else if (value instanceof Integer) + { + writeInt((Integer)value); + } + else if (value instanceof Long) + { + writeLong((Long)value); + } + else if (value instanceof Float) + { + writeFloat((Float)value); + } + else if (value instanceof Double) + { + writeDouble((Double)value); + } + else if (value instanceof byte[]) + { + writeBytes((byte[])value); + } + else if (value instanceof Character) + { + writeChar((Character)value); + } + else if (value == null) + { + writeString(null); + } + else + { + throw new MessageFormatException("Invalid object type: " + value.getClass()); + } + } + + public void reset() throws JMSException + { + if (!readOnly) + { + readOnly = true; + } + getBuffer().resetReaderIndex(); + } + + // HornetQRAMessage overrides ---------------------------------------- + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + getBuffer().clear(); + } + + @Override + public void doBeforeSend() throws Exception + { + reset(); + } + + private HornetQBuffer getBuffer() + { + return message.getBodyBuffer(); + } + + @SuppressWarnings("rawtypes") + @Override + public boolean isBodyAssignableTo(Class c) + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java new file mode 100644 index 0000000..e8ad86b --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryQueue.java @@ -0,0 +1,67 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.TemporaryQueue; + + +/** + * HornetQ implementation of a JMS TemporaryQueue. + *
+ * This class can be instantiated directly. + * + * @author Tim Fox + * @version $Revision: 3569 $ + * + */ +public class HornetQTemporaryQueue extends HornetQQueue implements TemporaryQueue +{ + // Constants ----------------------------------------------------- + + private static final long serialVersionUID = -4624930377557954624L; + + // Static -------------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Constructors -------------------------------------------------- + + + // TemporaryQueue implementation ------------------------------------------ + + // Public -------------------------------------------------------- + + /** + * @param address + * @param name + * @param session + */ + public HornetQTemporaryQueue(String address, String name, HornetQSession session) + { + super(address, name, true, session); + } + + @Override + public String toString() + { + return "HornetQTemporaryQueue[" + name + "]"; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java new file mode 100644 index 0000000..442f0fc --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTemporaryTopic.java @@ -0,0 +1,53 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.TemporaryTopic; + +/** + * A HornetQTemporaryTopic + * + * @author Clebert Suconic + * + * + */ +public class HornetQTemporaryTopic extends HornetQTopic implements TemporaryTopic +{ + + // Constants ----------------------------------------------------- + + private static final long serialVersionUID = 845450764835635266L; + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + protected HornetQTemporaryTopic(final String address, final String name, + final HornetQSession session) + { + super(address, name, true, session); + } + + // Public -------------------------------------------------------- + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java new file mode 100644 index 0000000..4684cb2 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTextMessage.java @@ -0,0 +1,146 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.ClientMessage; +import org.apache.activemq6.api.core.client.ClientSession; + +import static org.apache.activemq6.reader.TextMessageUtil.readBodyText; +import static org.apache.activemq6.reader.TextMessageUtil.writeBodyText; + + +/** + * HornetQ implementation of a JMS TextMessage. + *
+ * This class was ported from SpyTextMessage in JBossMQ. + * + * @author Norbert Lataille (Norbert.Lataille@m4x.org) + * @author Jason Dillon + * @author Adrian Brock + * @author Tim Fox + * @author Ovidiu Feodorov + * @author Andy Taylor + * @version $Revision: 3412 $ + */ +public class HornetQTextMessage extends HornetQMessage implements TextMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.TEXT_TYPE; + + // Attributes ---------------------------------------------------- + + // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write + // methods are more efficient for a SimpleString + private SimpleString text; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + public HornetQTextMessage(final ClientSession session) + { + super(HornetQTextMessage.TYPE, session); + } + + public HornetQTextMessage(final ClientMessage message, final ClientSession session) + { + super(message, session); + } + + /** + * A copy constructor for non-HornetQ JMS TextMessages. + */ + public HornetQTextMessage(final TextMessage foreign, final ClientSession session) throws JMSException + { + super(foreign, HornetQTextMessage.TYPE, session); + + setText(foreign.getText()); + } + + // Public -------------------------------------------------------- + + @Override + public byte getType() + { + return HornetQTextMessage.TYPE; + } + + // TextMessage implementation ------------------------------------ + + public void setText(final String text) throws JMSException + { + checkWrite(); + + if (text != null) + { + this.text = new SimpleString(text); + } + else + { + this.text = null; + } + + writeBodyText(message, this.text); + } + + public String getText() + { + if (text != null) + { + return text.toString(); + } + else + { + return null; + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + text = null; + } + + // HornetQRAMessage override ----------------------------------------- + + @Override + public void doBeforeReceive() throws HornetQException + { + super.doBeforeReceive(); + + text = readBodyText(message); + } + + @Override + protected T getBodyInternal(Class c) + { + return (T) getText(); + } + + @Override + public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class c) + { + if (text == null) + return true; + return c.isAssignableFrom(java.lang.String.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java new file mode 100644 index 0000000..9b1d835 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopic.java @@ -0,0 +1,85 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.Topic; + +import org.apache.activemq6.api.core.SimpleString; + +/** + * HornetQ implementation of a JMS Topic. + *
+ * This class can be instantiated directly. + * + * @author Ovidiu Feodorov + * @author Tim Fox + * @version $Revision: 8737 $ + * + */ +public class HornetQTopic extends HornetQDestination implements Topic +{ + // Constants ----------------------------------------------------- + + private static final long serialVersionUID = 7873614001276404156L; + // Static -------------------------------------------------------- + + public static SimpleString createAddressFromName(final String name) + { + return new SimpleString(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name); + } + + // Attributes ---------------------------------------------------- + + // Constructors -------------------------------------------------- + + public HornetQTopic(final String name) + { + super(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + name, name, false, false, null); + } + + + /** + * @param address + * @param name + * @param temporary + * @param session + */ + protected HornetQTopic(String address, String name, boolean temporary, HornetQSession session) + { + super(address, name, temporary, false, session); + } + + + // Topic implementation ------------------------------------------ + + public String getTopicName() + { + return name; + } + + // Public -------------------------------------------------------- + + @Override + public String toString() + { + return "HornetQTopic[" + name + "]"; + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java new file mode 100644 index 0000000..26b0e72 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQTopicConnectionFactory.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.TopicConnectionFactory; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.jms.JMSFactoryType; + +/** + * A class that represents a TopicConnectionFactory. + * + * @author Howard Gao + */ +public class HornetQTopicConnectionFactory extends HornetQConnectionFactory implements TopicConnectionFactory +{ + private static final long serialVersionUID = 7317051989866548455L; + + /** + * + */ + public HornetQTopicConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public HornetQTopicConnectionFactory(ServerLocator serverLocator) + { + super(serverLocator); + } + + + /** + * @param ha + * @param groupConfiguration + */ + public HornetQTopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public HornetQTopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) + { + super(ha, initialConnectors); + } + + public int getFactoryType() + { + return JMSFactoryType.TOPIC_CF.intValue(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java new file mode 100644 index 0000000..e6b0edf --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnection.java @@ -0,0 +1,70 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicConnection; +import javax.jms.XATopicSession; + +import org.apache.activemq6.api.core.client.ClientSessionFactory; + +/** + * HornetQ implementation of a JMS XAConnection. + *

+ * The flat implementation of {@link XATopicConnection} and {@link XAQueueConnection} is per design, + * following common practices of JMS 1.1. + * @author Howard Gao + */ +public final class HornetQXAConnection extends HornetQConnection implements XATopicConnection, XAQueueConnection +{ + + public HornetQXAConnection(final String username, final String password, final int connectionType, + final String clientID, final int dupsOKBatchSize, final int transactionBatchSize, + final ClientSessionFactory sessionFactory) + { + super(username, password, connectionType, clientID, dupsOKBatchSize, transactionBatchSize, sessionFactory); + } + + @Override + public XASession createXASession() throws JMSException + { + checkClosed(); + return (XASession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, HornetQSession.TYPE_GENERIC_SESSION); + } + + @Override + public XAQueueSession createXAQueueSession() throws JMSException + { + checkClosed(); + return (XAQueueSession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, HornetQSession.TYPE_QUEUE_SESSION); + + } + + @Override + public XATopicSession createXATopicSession() throws JMSException + { + checkClosed(); + return (XATopicSession)createSessionInternal(isXA(), true, Session.SESSION_TRANSACTED, HornetQSession.TYPE_TOPIC_SESSION); + } + + @Override + protected boolean isXA() + { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java new file mode 100644 index 0000000..b0ae3d7 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAConnectionFactory.java @@ -0,0 +1,76 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.XAQueueConnectionFactory; +import javax.jms.XATopicConnectionFactory; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.jms.JMSFactoryType; + +/** + * A class that represents a XAConnectionFactory. + *

+ * We consider the XAConnectionFactory to be the most complete possible option. It can be casted to any other connection factory since it is fully functional + * + * @author Howard Gao + */ +public class HornetQXAConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory, + XAQueueConnectionFactory +{ + private static final long serialVersionUID = 743611571839154115L; + + /** + * + */ + public HornetQXAConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public HornetQXAConnectionFactory(ServerLocator serverLocator) + { + super(serverLocator); + } + + /** + * @param ha + * @param groupConfiguration + */ + public HornetQXAConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public HornetQXAConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) + { + super(ha, initialConnectors); + } + + @Override + public int getFactoryType() + { + return JMSFactoryType.XA_CF.intValue(); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java new file mode 100644 index 0000000..040f7db --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAJMSContext.java @@ -0,0 +1,23 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.XAJMSContext; + +public class HornetQXAJMSContext extends HornetQJMSContext implements XAJMSContext +{ + public HornetQXAJMSContext(HornetQConnectionForContext connection, ThreadAwareContext threadAwareContext) + { + super(connection, threadAwareContext); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java new file mode 100644 index 0000000..c8875ec --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXAQueueConnectionFactory.java @@ -0,0 +1,71 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.XAQueueConnectionFactory; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.jms.JMSFactoryType; + +/** + * A class that represents a XAQueueConnectionFactory. + * + * @author Howard Gao + * + */ +public class HornetQXAQueueConnectionFactory extends HornetQConnectionFactory implements XAQueueConnectionFactory +{ + private static final long serialVersionUID = 8612457847251087454L; + + /** + * + */ + public HornetQXAQueueConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public HornetQXAQueueConnectionFactory(ServerLocator serverLocator) + { + super(serverLocator); + } + + /** + * @param ha + * @param groupConfiguration + */ + public HornetQXAQueueConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public HornetQXAQueueConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) + { + super(ha, initialConnectors); + } + + public int getFactoryType() + { + return JMSFactoryType.QUEUE_XA_CF.intValue(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java new file mode 100644 index 0000000..65aa067 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXASession.java @@ -0,0 +1,47 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.XAQueueSession; +import javax.jms.XATopicSession; + +import org.apache.activemq6.api.core.client.ClientSession; + +/** + * A HornetQXASession + * + * @author clebertsuconic + * + * + */ +public class HornetQXASession extends HornetQSession implements XAQueueSession, XATopicSession +{ + + /** + * @param connection + * @param transacted + * @param xa + * @param ackMode + * @param session + * @param sessionType + */ + protected HornetQXASession(HornetQConnection connection, + boolean transacted, + boolean xa, + int ackMode, + ClientSession session, + int sessionType) + { + super(connection, transacted, xa, ackMode, session, sessionType); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java new file mode 100644 index 0000000..d626c14 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/HornetQXATopicConnectionFactory.java @@ -0,0 +1,69 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.XATopicConnectionFactory; + +import org.apache.activemq6.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq6.api.core.TransportConfiguration; +import org.apache.activemq6.api.core.client.ServerLocator; +import org.apache.activemq6.api.jms.JMSFactoryType; + +/** + * A class that represents a XATopicConnectionFactory. + * + * @author Howard Gao + */ +public class HornetQXATopicConnectionFactory extends HornetQConnectionFactory implements XATopicConnectionFactory +{ + private static final long serialVersionUID = -7018290426884419693L; + + /** + * + */ + public HornetQXATopicConnectionFactory() + { + super(); + } + + /** + * @param serverLocator + */ + public HornetQXATopicConnectionFactory(final ServerLocator serverLocator) + { + super(serverLocator); + } + + /** + * @param ha + * @param groupConfiguration + */ + public HornetQXATopicConnectionFactory(final boolean ha, final DiscoveryGroupConfiguration groupConfiguration) + { + super(ha, groupConfiguration); + } + + /** + * @param ha + * @param initialConnectors + */ + public HornetQXATopicConnectionFactory(final boolean ha, final TransportConfiguration... initialConnectors) + { + super(ha, initialConnectors); + } + + public int getFactoryType() + { + return JMSFactoryType.TOPIC_XA_CF.intValue(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java ---------------------------------------------------------------------- diff --git a/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java new file mode 100644 index 0000000..ba5a224 --- /dev/null +++ b/activemq6-jms-client/src/main/java/org/apache/activemq6/jms/client/JMSExceptionHelper.java @@ -0,0 +1,91 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.jms.client; + +import javax.jms.InvalidDestinationException; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; + +import org.apache.activemq6.api.core.HornetQException; + +/** + * + * A JMSExceptionHelper + * + * @author Tim Fox + * + */ +public final class JMSExceptionHelper +{ + + public static JMSException convertFromHornetQException(final HornetQException me) + { + JMSException je; + switch (me.getType()) + { + case CONNECTION_TIMEDOUT: + je = new JMSException(me.getMessage()); + break; + + case ILLEGAL_STATE: + je = new javax.jms.IllegalStateException(me.getMessage()); + break; + + case INTERNAL_ERROR: + je = new JMSException(me.getMessage()); + break; + + case INVALID_FILTER_EXPRESSION: + je = new InvalidSelectorException(me.getMessage()); + break; + + case NOT_CONNECTED: + je = new JMSException(me.getMessage()); + break; + + case OBJECT_CLOSED: + je = new javax.jms.IllegalStateException(me.getMessage()); + break; + + case QUEUE_DOES_NOT_EXIST: + je = new InvalidDestinationException(me.getMessage()); + break; + + case QUEUE_EXISTS: + je = new InvalidDestinationException(me.getMessage()); + break; + + case SECURITY_EXCEPTION: + je = new JMSSecurityException(me.getMessage()); + break; + + case UNSUPPORTED_PACKET: + je = new javax.jms.IllegalStateException(me.getMessage()); + break; + + case TRANSACTION_ROLLED_BACK: + je = new javax.jms.TransactionRolledBackException(me.getMessage()); + break; + + default: + je = new JMSException(me.getMessage()); + } + + je.setStackTrace(me.getStackTrace()); + + je.initCause(me); + + return je; + } +}