Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 54586 invoked from network); 1 Feb 2009 23:36:28 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 1 Feb 2009 23:36:28 -0000 Received: (qmail 71687 invoked by uid 500); 1 Feb 2009 23:36:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 71630 invoked by uid 500); 1 Feb 2009 23:36:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 71621 invoked by uid 99); 1 Feb 2009 23:36:27 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 01 Feb 2009 15:36:27 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 01 Feb 2009 23:36:21 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id E15802388AE9; Sun, 1 Feb 2009 23:35:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r739885 [2/5] - in /activemq/activemq-blaze/trunk/src: main/java/org/apache/activeblaze/ main/java/org/apache/activeblaze/group/ main/java/org/apache/activeblaze/impl/transport/ main/java/org/apache/activeblaze/jms/ main/java/org/apache/act... Date: Sun, 01 Feb 2009 23:35:56 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090201233557.E15802388AE9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import java.util.Enumeration; +import java.util.Vector; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.jms.ConnectionMetaData; + +/** + * A ConnectionMetaData object provides information describing + * the Connection object. + */ + +public final class BlazeJmsConnectionMetaData implements ConnectionMetaData { + + public static final String PROVIDER_VERSION; + public static final int PROVIDER_MAJOR_VERSION; + public static final int PROVIDER_MINOR_VERSION; + + public static final BlazeJmsConnectionMetaData INSTANCE = new BlazeJmsConnectionMetaData(); + + static { + String version = null; + int major = 0; + int minor = 0; + try { + Package p = Package.getPackage("org.apache.activeblaze"); + if (p != null) { + version = p.getImplementationVersion(); + Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*"); + Matcher m = pattern.matcher(version); + if (m.matches()) { + major = Integer.parseInt(m.group(1)); + minor = Integer.parseInt(m.group(2)); + } + } + } catch (Throwable e) { + } + PROVIDER_VERSION = version; + PROVIDER_MAJOR_VERSION = major; + PROVIDER_MINOR_VERSION = minor; + } + + private BlazeJmsConnectionMetaData() { + } + + /** + * Gets the JMS API version. + * + * @return the JMS API version + */ + + public String getJMSVersion() { + return "1.1"; + } + + /** + * Gets the JMS major version number. + * + * @return the JMS API major version number + */ + + public int getJMSMajorVersion() { + return 1; + } + + /** + * Gets the JMS minor version number. + * + * @return the JMS API minor version number + */ + + public int getJMSMinorVersion() { + return 1; + } + + /** + * Gets the JMS provider name. + * + * @return the JMS provider name + */ + + public String getJMSProviderName() { + return "ActiveBlaze"; + } + + /** + * Gets the JMS provider version. + * + * @return the JMS provider version + */ + + public String getProviderVersion() { + return PROVIDER_VERSION; + } + + /** + * Gets the JMS provider major version number. + * + * @return the JMS provider major version number + */ + + public int getProviderMajorVersion() { + return PROVIDER_MAJOR_VERSION; + } + + /** + * Gets the JMS provider minor version number. + * + * @return the JMS provider minor version number + */ + + public int getProviderMinorVersion() { + return PROVIDER_MINOR_VERSION; + } + + /** + * Gets an enumeration of the JMSX property names. + * + * @return an Enumeration of JMSX property names + */ + + public Enumeration getJMSXPropertyNames() { + Vector jmxProperties = new Vector(); + return jmxProperties.elements(); + } + + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsConnectionMetaData.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import java.io.Externalizable; +import java.util.Map; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; +import org.apache.activeblaze.Destination; +import org.apache.activeblaze.jndi.JNDIStorable; +import org.apache.activemq.protobuf.Buffer; + +/** + * Jms Destination + * + */ +public class BlazeJmsDestination extends JNDIStorable implements Externalizable, javax.jms.Destination, + Comparable { + protected transient Destination destination; + + /** + * Constructor + */ + public BlazeJmsDestination() { + this.destination = new Destination(); + } + + /** + * Constructor + * + * @param dest + */ + public BlazeJmsDestination(Destination dest) { + this.destination = dest; + } + + /** + * Constructor + * + * @param name + */ + public BlazeJmsDestination(String name) { + this.destination.setName(new Buffer(name)); + } + + /** + * @return the destination + */ + public Destination getDestination() { + return this.destination; + } + + /** + * @return name of destination + */ + public String getName() { + return this.destination.getName().toStringUtf8(); + } + + /** + * @return the topic + */ + public boolean isTopic() { + return this.destination.isTopic(); + } + + /** + * @return the temporary + */ + public boolean isTemporary() { + return this.destination.isTemporary(); + } + + /** + * @return true if a Topic + */ + public boolean isQueue() { + return this.destination.isQueue(); + } + + /** + * @param props + * @see org.apache.activeblaze.jndi.JNDIStorable#buildFromProperties(java.util.Properties) + */ + @Override + protected void buildFromProperties(Map props) { + + this.destination.setName(new Buffer(getProperty(props, "name", ""))); + Boolean bool = Boolean.valueOf(getProperty(props,"topic", Boolean.TRUE.toString())); + this.destination.setTopic(bool.booleanValue()); + bool = Boolean.valueOf(getProperty(props,"temporary", Boolean.FALSE.toString())); + this.destination.setTemporary(bool.booleanValue()); + } + + /** + * @param props + * @see org.apache.activeblaze.jndi.JNDIStorable#populateProperties(java.util.Properties) + */ + @Override + protected void populateProperties(Map props) { + props.put("name", this.destination.getName().toStringUtf8()); + props.put("topic", Boolean.toString(this.destination.isTopic())); + props.put("temporary", Boolean.toString(this.destination.isTemporary())); + } + + /** + * + * @param other + * the Object to be compared. + * @return a negative integer, zero, or a positive integer as this object is less than, equal to, or greater than + * the specified object. + * @see java.lang.Comparable#compareTo(java.lang.Object) + */ + public int compareTo(BlazeJmsDestination other) { + if (other != null) { + if (this.destination.isTemporary() == other.destination.isTemporary()) { + return this.destination.getName().toStringUtf8().compareTo(other.destination.getName().toStringUtf8()); + } + return -1; + } + return -1; + } + + /** + * Transform a javax.jms.Destination to a BlazeJmsDestination + * + * @param dest + * @return a BlazeJmsDestination + * @throws JMSException + */ + public static BlazeJmsDestination transform(javax.jms.Destination dest) throws JMSException { + if (dest == null) { + return null; + } + if (dest instanceof BlazeJmsDestination) { + return (BlazeJmsDestination) dest; + } + if (dest instanceof TemporaryQueue) { + return new BlazeJmsTempQueue(((TemporaryQueue) dest).getQueueName()); + } + if (dest instanceof TemporaryTopic) { + return new BlazeJmsTempTopic(((TemporaryTopic) dest).getTopicName()); + } + if (dest instanceof Queue) { + return new BlazeJmsQueue(((Queue) dest).getQueueName()); + } + if (dest instanceof Topic) { + return new BlazeJmsTopic(((Topic) dest).getTopicName()); + } + throw new JMSException("Could not transform the destination into a ActiveMQ destination: " + dest); + } + + /** + * @param dest + * @return a JMS destination + * + */ + public static BlazeJmsDestination createJmsDestination(Destination dest) { + if (dest.isTopic()) { + if (!dest.isTemporary()) { + return new BlazeJmsTopic(dest); + } + return new BlazeJmsTempTopic(dest); + } + if (!dest.isTemporary()) { + return new BlazeJmsQueue(dest); + } + return new BlazeJmsTempQueue(dest); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsDestination.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; + +/** + * Create those nice, old fashioned JMS Exceptions + * + */ +public final class BlazeJmsExceptionSupport { + + private BlazeJmsExceptionSupport() { + } + + public static JMSException create(String msg, Throwable cause) { + JMSException exception = new JMSException(msg); + exception.initCause(cause); + return exception; + } + + public static JMSException create(String msg, Exception cause) { + JMSException exception = new JMSException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } + + public static JMSException create(Throwable cause) { + if (cause instanceof JMSException) { + return (JMSException)cause; + } + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + JMSException exception = new JMSException(msg); + exception.initCause(cause); + return exception; + } + + public static JMSException create(Exception cause) { + if (cause instanceof JMSException) { + return (JMSException)cause; + } + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + JMSException exception = new JMSException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } + + public static MessageEOFException createMessageEOFException(Exception cause) { + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + MessageEOFException exception = new MessageEOFException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } + + public static MessageFormatException createMessageFormatException(Exception cause) { + String msg = cause.getMessage(); + if (msg == null || msg.length() == 0) { + msg = cause.toString(); + } + MessageFormatException exception = new MessageFormatException(msg); + exception.setLinkedException(cause); + exception.initCause(cause); + return exception; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsExceptionSupport.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; + +/** + * implementation of a Jms Message Consumer + * + */ +public class BlazeJmsMessageConsumer implements MessageConsumer { + protected final BlazeJmsSession session; + protected final BlazeJmsDestination destination; + private boolean closed; + private MessageListener messageListener; + private String messageSelector = ""; + + protected BlazeJmsMessageConsumer(BlazeJmsSession s,BlazeJmsDestination destination) { + this.session=s; + this.destination=destination; + } + /** + * @see javax.jms.MessageConsumer#close() + */ + public void close() { + this.closed=true; + this.session.remove(this); + + } + + /** + * @return the MessageListener + * @throws JMSException + * @see javax.jms.MessageConsumer#getMessageListener() + */ + public MessageListener getMessageListener() throws JMSException { + checkClosed(); + return this.messageListener; + } + + /** + * @return the Message Selector + * @throws JMSException + * @see javax.jms.MessageConsumer#getMessageSelector() + */ + public String getMessageSelector() throws JMSException { + checkClosed(); + return this.messageSelector; + } + + /** + * @return a Message or null if closed during the operation + * @throws JMSException + * @see javax.jms.MessageConsumer#receive() + */ + public Message receive() throws JMSException { + checkClosed(); + return null; + } + + /** + * @param timeout + * @return a MEssage or null + * @throws JMSException + * @see javax.jms.MessageConsumer#receive(long) + */ + public Message receive(long timeout) throws JMSException { + checkClosed(); + return null; + } + + /** + * @return a Message or null + * @throws JMSException + * @see javax.jms.MessageConsumer#receiveNoWait() + */ + public Message receiveNoWait() throws JMSException { + checkClosed(); + return null; + } + + /** + * @param listener + * @throws JMSException + * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) + */ + public void setMessageListener(MessageListener listener) throws JMSException { + checkClosed(); + this.messageListener=listener; + + } + + /** + * @param messageSelector the messageSelector to set + * @throws IllegalStateException + */ + public void setMessageSelector(String messageSelector) throws IllegalStateException { + checkClosed(); + this.messageSelector = messageSelector; + } + + protected void checkClosed() throws IllegalStateException { + if (this.closed) { + throw new IllegalStateException("The MessageProducer is closed"); + } + } + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageConsumer.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; + +/** + * Implementation of a Jms MessageProducer + * + */ +public class BlazeJmsMessageProducer implements MessageProducer { + protected final BlazeJmsSession session; + protected BlazeJmsDestination destination; + protected final boolean flexibleDestination; + protected int deliveryMode = DeliveryMode.NON_PERSISTENT; + protected int priority = Message.DEFAULT_PRIORITY; + protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE; + protected boolean closed; + protected boolean disableMessageId; + protected boolean disableTimestamp; + + protected BlazeJmsMessageProducer(BlazeJmsSession s, BlazeJmsDestination dest) { + this.session = s; + this.destination = dest; + this.flexibleDestination = dest == null; + } + + /** + * Close the producer + * + * @see javax.jms.MessageProducer#close() + */ + public void close() { + this.closed = true; + this.session.remove(this); + } + + /** + * @return the delivery mode + * @throws JMSException + * @see javax.jms.MessageProducer#getDeliveryMode() + */ + public int getDeliveryMode() throws JMSException { + checkClosed(); + return this.deliveryMode; + } + + /** + * @return the destination + * @throws JMSException + * @see javax.jms.MessageProducer#getDestination() + */ + public Destination getDestination() throws JMSException { + checkClosed(); + return this.destination; + } + + /** + * @return true if disableIds is set + * @throws JMSException + * @see javax.jms.MessageProducer#getDisableMessageID() + */ + public boolean getDisableMessageID() throws JMSException { + checkClosed(); + return this.disableMessageId; + } + + /** + * @return true if disable timestamp is set + * @throws JMSException + * @see javax.jms.MessageProducer#getDisableMessageTimestamp() + */ + public boolean getDisableMessageTimestamp() throws JMSException { + checkClosed(); + return this.disableTimestamp; + } + + /** + * @return the priority + * @throws JMSException + * @see javax.jms.MessageProducer#getPriority() + */ + public int getPriority() throws JMSException { + checkClosed(); + return this.priority; + } + + /** + * @return timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#getTimeToLive() + */ + public long getTimeToLive() throws JMSException { + checkClosed(); + return this.timeToLive; + } + + /** + * @param message + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Message) + */ + public void send(Message message) throws JMSException { + send(this.destination, message, this.deliveryMode, this.priority, this.timeToLive); + } + + /** + * @param destination + * @param message + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Destination, javax.jms.Message) + */ + public void send(Destination destination, Message message) throws JMSException { + send(destination, message, this.deliveryMode, this.priority, this.timeToLive); + } + + /** + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Message, int, int, long) + */ + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + send(this.destination, message, deliveryMode, priority, timeToLive); + } + + /** + * @param destination + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#send(javax.jms.Destination, javax.jms.Message, int, int, long) + */ + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException { + setDestination(destination); + this.session.send(destination, message, deliveryMode, priority, timeToLive); + } + + /** + * @param deliveryMode + * @throws JMSException + * @see javax.jms.MessageProducer#setDeliveryMode(int) + */ + public void setDeliveryMode(int deliveryMode) throws JMSException { + checkClosed(); + this.deliveryMode = deliveryMode; + } + + /** + * @param value + * @throws JMSException + * @see javax.jms.MessageProducer#setDisableMessageID(boolean) + */ + public void setDisableMessageID(boolean value) throws JMSException { + checkClosed(); + this.disableMessageId = value; + } + + /** + * @param value + * @throws JMSException + * @see javax.jms.MessageProducer#setDisableMessageTimestamp(boolean) + */ + public void setDisableMessageTimestamp(boolean value) throws JMSException { + checkClosed(); + this.disableTimestamp = value; + } + + /** + * @param defaultPriority + * @throws JMSException + * @see javax.jms.MessageProducer#setPriority(int) + */ + public void setPriority(int defaultPriority) throws JMSException { + checkClosed(); + this.priority = defaultPriority; + } + + /** + * @param timeToLive + * @throws JMSException + * @see javax.jms.MessageProducer#setTimeToLive(long) + */ + public void setTimeToLive(long timeToLive) throws JMSException { + checkClosed(); + this.timeToLive = timeToLive; + } + + /** + * @param dest + * the destination to set + * @throws JMSException + * @throws InvalidDestinationException + */ + public void setDestination(Destination dest) throws JMSException { + BlazeJmsDestination destination = BlazeJmsDestination.transform(dest); + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + if (!this.flexibleDestination && !destination.equals(this.destination)) { + throw new UnsupportedOperationException("This producer can only send messages to: " + + this.destination.getName()); + } + this.destination = destination; + } + + protected void checkClosed() throws IllegalStateException { + if (this.closed) { + throw new IllegalStateException("The MessageProducer is closed"); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageProducer.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import java.util.Enumeration; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMapMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage; +import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage; +import org.apache.activeblaze.jms.message.BlazeJmsTextMessage; + +/** + * A helper class for converting normal JMS interfaces into ActiveMQ specific ones. + * + * @version $Revision: 1.1 $ + */ +public final class BlazeJmsMessageTransformation { + private BlazeJmsMessageTransformation() { + } + + /** + * @param dest + * @return a BlazeJmsDestination + * @throws JMSException + */ + private static BlazeJmsDestination transformDestination(Destination dest) throws JMSException { + return BlazeJmsDestination.transform(dest); + } + + /** + * @param message + * @return a BlazeJmsDestination + * @throws JMSException + */ + public static BlazeJmsMessage transformMessage(Message message) throws JMSException { + if (message instanceof BlazeJmsMessage) { + return (BlazeJmsMessage) message; + } + BlazeJmsMessage transformedMessage = null; + if (message instanceof BytesMessage) { + BytesMessage bytesMsg = (BytesMessage) message; + bytesMsg.reset(); + BlazeJmsBytesMessage msg = new BlazeJmsBytesMessage(); + try { + for (;;) { + // Reads a byte from the message stream until the stream + // is empty + msg.writeByte(bytesMsg.readByte()); + } + } catch (MessageEOFException e) { + // if an end of message stream as expected + } catch (JMSException e) { + } + transformedMessage = msg; + } else if (message instanceof MapMessage) { + MapMessage mapMsg = (MapMessage) message; + BlazeJmsMapMessage msg = new BlazeJmsMapMessage(); + Enumeration iter = mapMsg.getMapNames(); + while (iter.hasMoreElements()) { + String name = iter.nextElement().toString(); + msg.setObject(name, mapMsg.getObject(name)); + } + transformedMessage = msg; + } else if (message instanceof ObjectMessage) { + ObjectMessage objMsg = (ObjectMessage) message; + BlazeJmsObjectMessage msg = new BlazeJmsObjectMessage(); + msg.setObject(objMsg.getObject()); + msg.storeContent(); + transformedMessage = msg; + } else if (message instanceof StreamMessage) { + StreamMessage streamMessage = (StreamMessage) message; + streamMessage.reset(); + BlazeJmsStreamMessage msg = new BlazeJmsStreamMessage(); + Object obj = null; + try { + while ((obj = streamMessage.readObject()) != null) { + msg.writeObject(obj); + } + } catch (MessageEOFException e) { + // if an end of message stream as expected + } catch (JMSException e) { + } + transformedMessage = msg; + } else if (message instanceof TextMessage) { + TextMessage textMsg = (TextMessage) message; + BlazeJmsTextMessage msg = new BlazeJmsTextMessage(); + msg.setText(textMsg.getText()); + transformedMessage = msg; + } else { + transformedMessage = new BlazeJmsMessage(); + } + copyProperties(message, transformedMessage); + return transformedMessage; + } + + /** + * Copies the standard JMS and user defined properties from the givem message to the specified message + * + * @param fromMessage + * the message to take the properties from + * @param toMessage + * the message to add the properties to + * @throws JMSException + */ + public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException { + toMessage.setJMSMessageID(fromMessage.getJMSMessageID()); + toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID()); + toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo())); + toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination())); + toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode()); + toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered()); + toMessage.setJMSType(fromMessage.getJMSType()); + toMessage.setJMSExpiration(fromMessage.getJMSExpiration()); + toMessage.setJMSPriority(fromMessage.getJMSPriority()); + toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp()); + Enumeration propertyNames = fromMessage.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String name = propertyNames.nextElement().toString(); + Object obj = fromMessage.getObjectProperty(name); + toMessage.setObjectProperty(name, obj); + } + } + + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsMessageTransformation.java ------------------------------------------------------------------------------ svn:executable = * Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.Queue; +import org.apache.activeblaze.Destination; + +/** + * Queue implementation + * + */ +public class BlazeJmsQueue extends BlazeJmsDestination implements Queue { + /** + * Constructor + */ + public BlazeJmsQueue(){ + this(""); + } + + /** + * Constructor + * @param dest + */ + public BlazeJmsQueue(Destination dest) { + super(dest); + } + + /** + * Constructor + * @param name + */ + public BlazeJmsQueue(String name){ + super(name); + this.destination.setTopic(false); + } + + /** + * @return name + * @see javax.jms.Queue#getQueueName() + */ + public String getQueueName() { + return getName(); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.IllegalStateException; +import javax.jms.Queue; +import javax.jms.QueueReceiver; + +/** + * Implementation of a Jms QueueReceiver + * + */ +public class BlazeJmsQueueReceiver extends BlazeJmsMessageConsumer implements QueueReceiver { + + /** + * Constructor + * @param s + */ + protected BlazeJmsQueueReceiver(BlazeJmsSession s,BlazeJmsDestination d) { + super(s,d); + } + + /** + * @return the Queue + * @throws IllegalStateException + * @see javax.jms.QueueReceiver#getQueue() + */ + public Queue getQueue() throws IllegalStateException { + checkClosed(); + return (Queue) this.destination; + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueReceiver.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueSender; + +/** + * Implementation of a Queue Sender + * + */ +public class BlazeJmsQueueSender extends BlazeJmsMessageProducer implements QueueSender { + /** + * Constructor + * + * @param s + * @param dest + */ + protected BlazeJmsQueueSender(BlazeJmsSession s, BlazeJmsDestination dest) { + super(s, dest); + } + + /** + * @return the Queue + * @throws IllegalStateException + * @see javax.jms.QueueSender#getQueue() + */ + public Queue getQueue() throws IllegalStateException { + checkClosed(); + return (Queue) this.destination; + } + + /** + * @param queue + * @param message + * @throws JMSException + * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message) + */ + public void send(Queue queue, Message message) throws JMSException { + super.send(queue, message); + } + + /** + * @param queue + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.QueueSender#send(javax.jms.Queue, javax.jms.Message, int, int, long) + */ + public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + super.send(message, deliveryMode, priority, timeToLive); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSender.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSubscriber; + +/** + * Jms QueueSession implementation + * + */ +public class BlazeJmsQueueSession extends BlazeJmsSession { + /** + * Constructor + * @param connection + * @param acknowledgementMode + */ + protected BlazeJmsQueueSession(BlazeJmsConnection connection, int acknowledgementMode) { + super(connection, acknowledgementMode); + } + + public MessageConsumer createConsumer(Destination destination) throws JMSException { + if (destination instanceof Topic) { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + return super.createConsumer(destination); + } + + /** + * @param destination + * @param messageSelector + * @return + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String) + */ + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + if (destination instanceof Topic) { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + return super.createConsumer(destination, messageSelector); + } + + /** + * @param destination + * @param messageSelector + * @param NoLocal + * @return + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean) + */ + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) + throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param topic + * @param name + * @return + * @throws JMSException + * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String) + */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param topic + * @param name + * @param messageSelector + * @param noLocal + * @return + * @throws IllegalStateException + * @throws JMSException + * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String, java.lang.String, boolean) + */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws IllegalStateException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param destination + * @return + * @throws JMSException + * @see javax.jms.Session#createProducer(javax.jms.Destination) + */ + public MessageProducer createProducer(Destination destination) throws JMSException { + if (destination instanceof Topic) { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + return super.createProducer(destination); + } + + /** + * @return + * @throws JMSException + * @see javax.jms.Session#createTemporaryTopic() + */ + public TemporaryTopic createTemporaryTopic() throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param topicName + * @return + * @throws JMSException + * @see javax.jms.Session#createTopic(java.lang.String) + */ + public Topic createTopic(String topicName) throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param name + * @throws JMSException + * @see javax.jms.Session#unsubscribe(java.lang.String) + */ + public void unsubscribe(String name) throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param topic + * @return + * @throws JMSException + * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic) + */ + public TopicPublisher createPublisher(Topic topic) throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param topic + * @return + * @throws JMSException + * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic) + */ + public TopicSubscriber createSubscriber(Topic topic) throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } + + /** + * @param topic + * @param messageSelector + * @param noLocal + * @return + * @throws JMSException + * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, java.lang.String, boolean) + */ + public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { + throw new IllegalStateException("Operation not supported by a QueueSession"); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsQueueSession.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import java.io.Serializable; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import org.apache.activeblaze.jms.message.BlazeJmsBytesMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMapMessage; +import org.apache.activeblaze.jms.message.BlazeJmsMessage; +import org.apache.activeblaze.jms.message.BlazeJmsObjectMessage; +import org.apache.activeblaze.jms.message.BlazeJmsStreamMessage; +import org.apache.activeblaze.jms.message.BlazeJmsTextMessage; + +/** + * JMS Session implementation + * + */ +public class BlazeJmsSession implements Session, QueueSession, TopicSession { + private final BlazeJmsConnection connection; + private final int acknowledgementMode; + private final List producers = new CopyOnWriteArrayList(); + private final List consumers = new CopyOnWriteArrayList(); + private MessageListener messageListener; + private boolean closed; + + /** + * Constructor + * + * @param connection + * @param acknowledgementMode + */ + protected BlazeJmsSession(BlazeJmsConnection connection, int acknowledgementMode) { + this.connection = connection; + this.acknowledgementMode = acknowledgementMode; + } + + /** + * @throws JMSException + * @see javax.jms.Session#close() + */ + public void close() throws JMSException { + this.closed = true; + this.connection.removeSession(this); + for (MessageConsumer c : this.consumers) { + c.close(); + } + this.consumers.clear(); + } + + /** + * @throws JMSException + * @see javax.jms.Session#commit() + */ + public void commit() throws JMSException { + checkClosed(); + } + + /** + * @param queue + * @return QueueBrowser + * @throws JMSException + * @see javax.jms.Session#createBrowser(javax.jms.Queue) + */ + public QueueBrowser createBrowser(Queue queue) throws JMSException { + checkClosed(); + return null; + } + + /** + * @param queue + * @param messageSelector + * @return QueueBrowser + * @throws JMSException + * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String) + */ + public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { + checkClosed(); + return null; + } + + /** + * @return BytesMessage + * @throws IllegalStateException + * @see javax.jms.Session#createBytesMessage() + */ + public BytesMessage createBytesMessage() throws IllegalStateException { + checkClosed(); + return new BlazeJmsBytesMessage(); + } + + /** + * @param destination + * @return a MessageConsumer + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination) + */ + public MessageConsumer createConsumer(Destination destination) throws JMSException { + checkClosed(); + return new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination)); + } + + /** + * @param destination + * @param messageSelector + * @return MessageConsumer + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String) + */ + public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { + checkClosed(); + BlazeJmsMessageConsumer result = new BlazeJmsMessageConsumer(this, BlazeJmsDestination.transform(destination)); + result.setMessageSelector(messageSelector); + return result; + } + + /** + * @param destination + * @param messageSelector + * @param NoLocal + * @return the MessageConsumer + * @throws JMSException + * @see javax.jms.Session#createConsumer(javax.jms.Destination, java.lang.String, boolean) + */ + public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) + throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(destination); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, NoLocal); + result.setMessageSelector(messageSelector); + return result; + } + + /** + * @param topic + * @param name + * @return a TopicSubscriber + * @throws JMSException + * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String) + */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, false); + return result; + } + + /** + * @param topic + * @param name + * @param messageSelector + * @param noLocal + * @return TopicSubscriber + * @throws JMSException + * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic, java.lang.String, java.lang.String, boolean) + */ + public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) + throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", true, noLocal); + result.setMessageSelector(messageSelector); + return result; + } + + /** + * @return MapMessage + * @throws IllegalStateException + * @see javax.jms.Session#createMapMessage() + */ + public MapMessage createMapMessage() throws IllegalStateException { + checkClosed(); + return new BlazeJmsMapMessage(); + } + + /** + * @return Message + * @throws IllegalStateException + * @see javax.jms.Session#createMessage() + */ + public Message createMessage() throws IllegalStateException { + checkClosed(); + return new BlazeJmsMessage(); + } + + /** + * @return ObjectMessage + * @throws IllegalStateException + * @see javax.jms.Session#createObjectMessage() + */ + public ObjectMessage createObjectMessage() throws IllegalStateException { + checkClosed(); + return new BlazeJmsObjectMessage(); + } + + /** + * @param object + * @return ObjectMessage + * @throws JMSException + * @see javax.jms.Session#createObjectMessage(java.io.Serializable) + */ + public ObjectMessage createObjectMessage(Serializable object) throws JMSException { + checkClosed(); + ObjectMessage result = createObjectMessage(); + result.setObject(object); + return result; + } + + /** + * @param destination + * @return MessageProducer + * @throws JMSException + * @see javax.jms.Session#createProducer(javax.jms.Destination) + */ + public MessageProducer createProducer(Destination destination) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(destination); + BlazeJmsMessageProducer result = new BlazeJmsMessageProducer(this, dest); + return result; + } + + /** + * @param queueName + * @return Queue + * @throws JMSException + * @see javax.jms.Session#createQueue(java.lang.String) + */ + public Queue createQueue(String queueName) throws JMSException { + checkClosed(); + return new BlazeJmsQueue(queueName); + } + + /** + * @return StreamMessage + * @throws JMSException + * @see javax.jms.Session#createStreamMessage() + */ + public StreamMessage createStreamMessage() throws JMSException { + checkClosed(); + return new BlazeJmsStreamMessage(); + } + + /** + * @return TemporaryQueue + * @throws JMSException + * @see javax.jms.Session#createTemporaryQueue() + */ + public TemporaryQueue createTemporaryQueue() throws JMSException { + checkClosed(); + return new BlazeJmsTempQueue(this.connection.tempDestinationGenerator.generateId()); + } + + /** + * @return TemporaryTopic + * @throws JMSException + * @see javax.jms.Session#createTemporaryTopic() + */ + public TemporaryTopic createTemporaryTopic() throws JMSException { + checkClosed(); + return new BlazeJmsTempTopic(this.connection.tempDestinationGenerator.generateId()); + } + + /** + * @return TextMessage + * @throws JMSException + * @see javax.jms.Session#createTextMessage() + */ + public TextMessage createTextMessage() throws JMSException { + checkClosed(); + return new BlazeJmsTextMessage(); + } + + /** + * @param text + * @return TextMessage + * @throws JMSException + * @see javax.jms.Session#createTextMessage(java.lang.String) + */ + public TextMessage createTextMessage(String text) throws JMSException { + checkClosed(); + BlazeJmsTextMessage result = new BlazeJmsTextMessage(); + result.setText(text); + return result; + } + + /** + * @param topicName + * @return Topic + * @throws JMSException + * @see javax.jms.Session#createTopic(java.lang.String) + */ + public Topic createTopic(String topicName) throws JMSException { + checkClosed(); + return new BlazeJmsTopic(topicName); + } + + /** + * @return acknowledgeMode + * @throws JMSException + * @see javax.jms.Session#getAcknowledgeMode() + */ + public int getAcknowledgeMode() throws JMSException { + checkClosed(); + return this.acknowledgementMode; + } + + /** + * @return + * @throws JMSException + * @see javax.jms.Session#getMessageListener() + */ + public MessageListener getMessageListener() throws JMSException { + checkClosed(); + return this.messageListener; + } + + /** + * @return + * @throws JMSException + * @see javax.jms.Session#getTransacted() + */ + public boolean getTransacted() throws JMSException { + checkClosed(); + return this.acknowledgementMode == Session.SESSION_TRANSACTED; + } + + /** + * @throws JMSException + * @see javax.jms.Session#recover() + */ + public void recover() throws JMSException { + checkClosed(); + } + + /** + * @throws JMSException + * @see javax.jms.Session#rollback() + */ + public void rollback() throws JMSException { + checkClosed(); + } + + /** + * + * @see javax.jms.Session#run() + */ + public void run() { + // TODO Auto-generated method stub + } + + /** + * @param listener + * @throws JMSException + * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) + */ + public void setMessageListener(MessageListener listener) throws JMSException { + checkClosed(); + this.messageListener = listener; + } + + /** + * @param name + * @throws JMSException + * @see javax.jms.Session#unsubscribe(java.lang.String) + */ + public void unsubscribe(String name) throws JMSException { + checkClosed(); + } + + /** + * @param queue + * @return + * @throws JMSException + * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue) + */ + public QueueReceiver createReceiver(Queue queue) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(queue); + BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest); + return result; + } + + /** + * @param queue + * @param messageSelector + * @return QueueReceiver + * @throws JMSException + * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue, java.lang.String) + */ + public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(queue); + BlazeJmsQueueReceiver result = new BlazeJmsQueueReceiver(this, dest); + result.setMessageSelector(messageSelector); + return result; + } + + /** + * @param queue + * @return QueueSender + * @throws JMSException + * @see javax.jms.QueueSession#createSender(javax.jms.Queue) + */ + public QueueSender createSender(Queue queue) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(queue); + BlazeJmsQueueSender result = new BlazeJmsQueueSender(this, dest); + return result; + } + + /** + * @param topic + * @return TopicPublisher + * @throws JMSException + * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic) + */ + public TopicPublisher createPublisher(Topic topic) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); + BlazeJmsTopicPublisher result = new BlazeJmsTopicPublisher(this, dest); + return result; + } + + /** + * @param topic + * @return TopicSubscriber + * @throws JMSException + * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic) + */ + public TopicSubscriber createSubscriber(Topic topic) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, false); + return result; + } + + /** + * @param topic + * @param messageSelector + * @param noLocal + * @return TopicSubscriber + * @throws JMSException + * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic, java.lang.String, boolean) + */ + public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { + checkClosed(); + BlazeJmsDestination dest = BlazeJmsDestination.transform(topic); + BlazeJmsTopicSubscriber result = new BlazeJmsTopicSubscriber(this, dest, "", false, noLocal); + result.setMessageSelector(messageSelector); + return result; + } + + protected void remove(MessageConsumer consumer) { + this.consumers.remove(consumer); + } + + protected void remove(MessageProducer producer) { + this.producers.remove(producer); + } + + protected void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) + throws JMSException { + BlazeJmsDestination destination = BlazeJmsDestination.transform(dest); + BlazeJmsMessage message = BlazeJmsMessageTransformation.transformMessage(msg); + send(destination, message, deliveryMode, priority, timeToLive); + } + + private void send(BlazeJmsDestination destination, BlazeJmsMessage message, int deliveryMode, int priority, + long timeToLive) throws JMSException { + message.setJMSDestination(destination); + message.setJMSDeliveryMode(deliveryMode); + message.setJMSPriority(priority); + if (timeToLive > 0) { + long timeStamp = System.currentTimeMillis(); + message.setTimeStamp(timeStamp); + message.setExpiration(System.currentTimeMillis() + timeToLive); + } + try { + if (destination.isTopic()) { + this.connection.channel.send(destination.getName(), message); + } else { + this.connection.channel.broadcast(destination.getName(), message); + } + } catch (Exception e) { + throw BlazeJmsExceptionSupport.create(e); + } + } + + protected void checkClosed() throws IllegalStateException { + if (this.closed) { + throw new IllegalStateException("The MessageProducer is closed"); + } + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsSession.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import org.apache.activeblaze.Destination; + +/** + * Temporary Destination + * + */ +public class BlazeJmsTempDestination extends BlazeJmsDestination{ + + /** + * Constructor + */ + public BlazeJmsTempDestination(){ + this(""); + } + + /** + * Constructor + * @param dest + */ + public BlazeJmsTempDestination(Destination dest) { + super(dest); + } + + /** + * Constructor + * @param name + */ + public BlazeJmsTempDestination(String name){ + super(name); + this.destination.setTemporary(true); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempDestination.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.JMSException; +import javax.jms.TemporaryQueue; +import org.apache.activeblaze.Destination; + +/** + * TemporaryQueue + * + */ +public class BlazeJmsTempQueue extends BlazeJmsTempDestination implements TemporaryQueue { + /** + * Constructor + */ + public BlazeJmsTempQueue(){ + this(""); + } + + /** + * Constructor + * @param dest + */ + public BlazeJmsTempQueue(Destination dest) { + super(dest); + } + + /** + * Constructor + * @param name + */ + public BlazeJmsTempQueue(String name){ + super(name); + this.destination.setTopic(false); + } + + /** + * @see javax.jms.TemporaryQueue#delete() + */ + public void delete() { + // TODO Auto-generated method stub + + } + + /** + * @return name + * @see javax.jms.Queue#getQueueName() + */ + public String getQueueName() { + return getName(); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.JMSException; +import javax.jms.TemporaryTopic; +import org.apache.activeblaze.Destination; + +/** + * Temporary Topic + * + */ +public class BlazeJmsTempTopic extends BlazeJmsTempDestination implements TemporaryTopic{ + /** + * Constructor + */ + public BlazeJmsTempTopic(){ + this(""); + } + + /** + * Constructor + * @param dest + */ + public BlazeJmsTempTopic(Destination dest) { + super(dest); + } + + /** + * Constructor + * @param name + */ + public BlazeJmsTempTopic(String name){ + super(name); + this.destination.setTopic(true); + } + + /** + * @see javax.jms.TemporaryTopic#delete() + */ + public void delete() { + // TODO Auto-generated method stub + + } + + /** + * @return name + * @see javax.jms.Topic#getTopicName() + */ + public String getTopicName() { + return getName(); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTempTopic.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.Topic; +import org.apache.activeblaze.Destination; + +/** + * TemporaryQueue + * + */ +public class BlazeJmsTopic extends BlazeJmsDestination implements Topic { + /** + * Constructor + */ + public BlazeJmsTopic(){ + this(""); + } + + /** + * Constructor + * @param dest + */ + public BlazeJmsTopic(Destination dest) { + super(dest); + } + + /** + * Constructor + * @param name + */ + public BlazeJmsTopic(String name){ + super(name); + this.destination.setTopic(true); + } + + /** + * @return the name + * @see javax.jms.Topic#getTopicName() + */ + public String getTopicName() { + return getName(); + } +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopic.java ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java URL: http://svn.apache.org/viewvc/activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java?rev=739885&view=auto ============================================================================== --- activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java (added) +++ activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java Sun Feb 1 23:35:54 2009 @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activeblaze.jms; + +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Topic; +import javax.jms.TopicPublisher; + +/** + * Implementation of a TopicPublisher + * + */ +public class BlazeJmsTopicPublisher extends BlazeJmsMessageProducer implements TopicPublisher { + + /** + * Constructor + * @param s + * @param destination + */ + protected BlazeJmsTopicPublisher(BlazeJmsSession s, BlazeJmsDestination destination) { + super(s, destination); + } + + /** + * @return the Topic + * @throws IllegalStateException + * @see javax.jms.TopicPublisher#getTopic() + */ + public Topic getTopic() throws IllegalStateException { + checkClosed(); + return (Topic) this.destination; + } + + /** + * @param message + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Message) + */ + public void publish(Message message) throws JMSException { + super.send(message); + + } + + /** + * @param topic + * @param message + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message) + */ + public void publish(Topic topic, Message message) throws JMSException { + super.send(topic,message); + + } + + /** + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Message, int, int, long) + */ + public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + super.send(message, deliveryMode, priority, timeToLive); + + } + + /** + * @param topic + * @param message + * @param deliveryMode + * @param priority + * @param timeToLive + * @throws JMSException + * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message, int, int, long) + */ + public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) + throws JMSException { + super.send(topic, message, deliveryMode, priority, timeToLive); + + } + + +} Propchange: activemq/activemq-blaze/trunk/src/main/java/org/apache/activeblaze/jms/BlazeJmsTopicPublisher.java ------------------------------------------------------------------------------ svn:eol-style = native