activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r554315 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnectionFactory.java ActiveMQMessageProducer.java ActiveMQMessageProducerSupport.java ActiveMQSession.java CustomDestination.java
Date Sun, 08 Jul 2007 05:39:01 GMT
Author: jstrachan
Date: Sat Jul  7 22:39:00 2007
New Revision: 554315

URL: http://svn.apache.org/viewvc?view=rev&rev=554315
Log:
Added support for custom destinations; for example to allow Camel endpoints to be used from
within the JMS client

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=554315&r1=554314&r2=554315
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Sat Jul  7 22:39:00 2007
@@ -251,24 +251,9 @@
         	
             connection.setUserName(userName);
             connection.setPassword(password);
-            connection.setPrefetchPolicy(getPrefetchPolicy());
-            connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
-            connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
-            connection.setCopyMessageOnSend(isCopyMessageOnSend());
-            connection.setUseCompression(isUseCompression());
-            connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
-            connection.setDispatchAsync(isDispatchAsync());
-            connection.setUseAsyncSend(isUseAsyncSend());
-            connection.setAlwaysSyncSend(isAlwaysSyncSend());
-            connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
-            connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
-            connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
-            connection.setRedeliveryPolicy(getRedeliveryPolicy());
-            connection.setTransformer(getTransformer());
-            connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
-            connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
-            connection.setProducerWindowSize(getProducerWindowSize());
-            connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
+
+            configureConnection(connection);
+
             transport.start();
 
             if( clientID !=null )
@@ -291,6 +276,28 @@
     protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
         ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(), stats);
         return connection;
+    }
+
+
+    protected void configureConnection(ActiveMQConnection connection) {
+        connection.setPrefetchPolicy(getPrefetchPolicy());
+        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
+        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
+        connection.setCopyMessageOnSend(isCopyMessageOnSend());
+        connection.setUseCompression(isUseCompression());
+        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
+        connection.setDispatchAsync(isDispatchAsync());
+        connection.setUseAsyncSend(isUseAsyncSend());
+        connection.setAlwaysSyncSend(isAlwaysSyncSend());
+        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
+        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
+        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
+        connection.setRedeliveryPolicy(getRedeliveryPolicy());
+        connection.setTransformer(getTransformer());
+        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
+        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
+        connection.setProducerWindowSize(getProducerWindowSize());
+        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
     }
 
     // /////////////////////////////////////////////

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=554315&r1=554314&r2=554315
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Sat Jul  7 22:39:00 2007
@@ -17,18 +17,6 @@
  */
 package org.apache.activemq;
 
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageProducer;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
@@ -39,6 +27,14 @@
 import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.util.IntrospectionSupport;
 
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * A client uses a <CODE>MessageProducer</CODE> object to send messages to a
  * destination. A <CODE>MessageProducer</CODE> object is created by passing a
@@ -72,26 +68,20 @@
  * @see javax.jms.QueueSender
  * @see javax.jms.Session#createProducer
  */
-public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, Closeable, Disposable {
+public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
 
-    protected ActiveMQSession session;
     protected ProducerInfo info;
     private JMSProducerStatsImpl stats;
     private AtomicLong messageSequence;
 
     protected boolean closed;
-    private boolean disableMessageID;
-    private boolean disableMessageTimestamp;
-    private int defaultDeliveryMode;
-    private int defaultPriority;
-    private long defaultTimeToLive;
     private long startTime;
     private MessageTransformer transformer;
     private UsageManager producerWindow;
 
     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination)
             throws JMSException {
-        this.session = session;
+        super(session);
         this.info = new ProducerInfo(producerId);
         this.info.setWindowSize(session.connection.getProducerWindowSize());        
         if (destination!=null && destination.getOptions() != null) {
@@ -105,9 +95,7 @@
         	producerWindow = new UsageManager("Producer Window: "+producerId);
         	producerWindow.setLimit(this.info.getWindowSize());
         }
-        
-        this.disableMessageID = false;
-        this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
+
         this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
         this.defaultPriority = Message.DEFAULT_PRIORITY;
         this.defaultTimeToLive = Message.DEFAULT_TIME_TO_LIVE;
@@ -128,183 +116,6 @@
     }
 
     /**
-     * Sets whether message IDs are disabled.
-     * <P>
-     * Since message IDs take some effort to create and increase a message's
-     * size, some JMS providers may be able to optimize message overhead if
-     * they are given a hint that the message ID is not used by an application.
-     * By calling the <CODE>setDisableMessageID</CODE> method on this message
-     * producer, a JMS client enables this potential optimization for all
-     * messages sent by this message producer. If the JMS provider accepts this
-     * hint, these messages must have the message ID set to null; if the
-     * provider ignores the hint, the message ID must be set to its normal
-     * unique value.
-     * <P>
-     * Message IDs are enabled by default.
-     *
-     * @param value indicates if message IDs are disabled
-     * @throws JMSException if the JMS provider fails to close the producer due to
-     *                      some internal error.
-     */
-    public void setDisableMessageID(boolean value) throws JMSException {
-        checkClosed();
-        this.disableMessageID = value;
-    }
-
-    /**
-     * Gets an indication of whether message IDs are disabled.
-     *
-     * @return an indication of whether message IDs are disabled
-     * @throws JMSException if the JMS provider fails to determine if message IDs are
-     *                      disabled due to some internal error.
-     */
-    public boolean getDisableMessageID() throws JMSException {
-        checkClosed();
-        return this.disableMessageID;
-    }
-
-    /**
-     * Sets whether message timestamps are disabled.
-     * <P>
-     * Since timestamps take some effort to create and increase a message's
-     * size, some JMS providers may be able to optimize message overhead if
-     * they are given a hint that the timestamp is not used by an application.
-     * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
-     * message producer, a JMS client enables this potential optimization for
-     * all messages sent by this message producer. If the JMS provider accepts
-     * this hint, these messages must have the timestamp set to zero; if the
-     * provider ignores the hint, the timestamp must be set to its normal
-     * value.
-     * <P>
-     * Message timestamps are enabled by default.
-     *
-     * @param value indicates if message timestamps are disabled
-     * @throws JMSException if the JMS provider fails to close the producer due to
-     *                      some internal error.
-     */
-    public void setDisableMessageTimestamp(boolean value) throws JMSException {
-        checkClosed();
-        this.disableMessageTimestamp = value;
-    }
-
-    /**
-     * Gets an indication of whether message timestamps are disabled.
-     *
-     * @return an indication of whether message timestamps are disabled
-     * @throws JMSException if the JMS provider fails to close the producer due to
-     *                      some internal error.
-     */
-    public boolean getDisableMessageTimestamp() throws JMSException {
-        checkClosed();
-        return this.disableMessageTimestamp;
-    }
-
-    /**
-     * Sets the producer's default delivery mode.
-     * <P>
-     * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
-     *
-     * @param newDeliveryMode the message delivery mode for this message producer; legal
-     *                        values are <code>DeliveryMode.NON_PERSISTENT</code>
and
-     *                        <code>DeliveryMode.PERSISTENT</code>
-     * @throws JMSException if the JMS provider fails to set the delivery mode due to
-     *                      some internal error.
-     * @see javax.jms.MessageProducer#getDeliveryMode
-     * @see javax.jms.DeliveryMode#NON_PERSISTENT
-     * @see javax.jms.DeliveryMode#PERSISTENT
-     * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
-     */
-    public void setDeliveryMode(int newDeliveryMode) throws JMSException {
-        if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
-            throw new IllegalStateException("unkown delivery mode: " + newDeliveryMode);
-        }
-        checkClosed();
-        this.defaultDeliveryMode = newDeliveryMode;
-    }
-
-    /**
-     * Gets the producer's default delivery mode.
-     *
-     * @return the message delivery mode for this message producer
-     * @throws JMSException if the JMS provider fails to close the producer due to
-     *                      some internal error.
-     */
-    public int getDeliveryMode() throws JMSException {
-        checkClosed();
-        return this.defaultDeliveryMode;
-    }
-
-    /**
-     * Sets the producer's default priority.
-     * <P>
-     * The JMS API defines ten levels of priority value, with 0 as the lowest
-     * priority and 9 as the highest. Clients should consider priorities 0-4 as
-     * gradations of normal priority and priorities 5-9 as gradations of
-     * expedited priority. Priority is set to 4 by default.
-     *
-     * @param newDefaultPriority the message priority for this message producer; must be
a
-     *                           value between 0 and 9
-     * @throws JMSException if the JMS provider fails to set the delivery mode due to
-     *                      some internal error.
-     * @see javax.jms.MessageProducer#getPriority
-     * @see javax.jms.Message#DEFAULT_PRIORITY
-     */
-    public void setPriority(int newDefaultPriority) throws JMSException {
-        if (newDefaultPriority < 0 || newDefaultPriority > 9) {
-            throw new IllegalStateException("default priority must be a value between 0 and 9");
-        }
-        checkClosed();
-        this.defaultPriority = newDefaultPriority;
-    }
-
-    /**
-     * Gets the producer's default priority.
-     *
-     * @return the message priority for this message producer
-     * @throws JMSException if the JMS provider fails to close the producer due to
-     *                      some internal error.
-     * @see javax.jms.MessageProducer#setPriority
-     */
-    public int getPriority() throws JMSException {
-        checkClosed();
-        return this.defaultPriority;
-    }
-
-    /**
-     * Sets the default length of time in milliseconds from its dispatch time
-     * that a produced message should be retained by the message system.
-     * <P>
-     * Time to live is set to zero by default.
-     *
-     * @param timeToLive the message time to live in milliseconds; zero is unlimited
-     * @throws JMSException if the JMS provider fails to set the time to live due to
-     *                      some internal error.
-     * @see javax.jms.MessageProducer#getTimeToLive
-     * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
-     */
-    public void setTimeToLive(long timeToLive) throws JMSException {
-        if (timeToLive < 0l) {
-            throw new IllegalStateException("cannot set a negative timeToLive");
-        }
-        checkClosed();
-        this.defaultTimeToLive = timeToLive;
-    }
-
-    /**
-     * Gets the default length of time in milliseconds from its dispatch time
-     * that a produced message should be retained by the message system.
-     *
-     * @return the message time to live in milliseconds; zero is unlimited
-     * @throws JMSException if the JMS provider fails to get the time to live due to
-     *                      some internal error.
-     * @see javax.jms.MessageProducer#setTimeToLive
-     */
-    public long getTimeToLive() throws JMSException {
-        checkClosed();
-        return this.defaultTimeToLive;
-    }
-
-    /**
      * Gets the destination associated with this <CODE>MessageProducer</CODE>.
      *
      * @return this producer's <CODE>Destination</CODE>
@@ -351,91 +162,6 @@
         if (closed) {
             throw new IllegalStateException("The producer is closed");
         }
-    }
-
-    /**
-     * Sends a message using the <CODE>MessageProducer</CODE>'s default
-     * delivery mode, priority, and time to live.
-     *
-     * @param message the message to send
-     * @throws JMSException                if the JMS provider fails to send the message
due to some
-     *                                     internal error.
-     * @throws MessageFormatException      if an invalid message is specified.
-     * @throws InvalidDestinationException if a client uses this method with a <CODE>
-     *                                     MessageProducer</CODE> with an invalid destination.
-     * @throws java.lang.UnsupportedOperationException
-     *                                     if a client uses this method with a <CODE>
-     *                                     MessageProducer</CODE> that did not specify
a
-     *                                     destination at creation time.
-     * @see javax.jms.Session#createProducer
-     * @see javax.jms.MessageProducer
-     * @since 1.1
-     */
-    public void send(Message message) throws JMSException {
-        this.send(this.getDestination(),
-                  message,
-                  this.defaultDeliveryMode,
-                  this.defaultPriority,
-                  this.defaultTimeToLive);
-    }
-
-    /**
-     * Sends a message to the destination, specifying delivery mode, priority,
-     * and time to live.
-     *
-     * @param message      the message to send
-     * @param deliveryMode the delivery mode to use
-     * @param priority     the priority for this message
-     * @param timeToLive   the message's lifetime (in milliseconds)
-     * @throws JMSException                if the JMS provider fails to send the message
due to some
-     *                                     internal error.
-     * @throws MessageFormatException      if an invalid message is specified.
-     * @throws InvalidDestinationException if a client uses this method with a <CODE>
-     *                                     MessageProducer</CODE> with an invalid destination.
-     * @throws java.lang.UnsupportedOperationException
-     *                                     if a client uses this method with a <CODE>
-     *                                     MessageProducer</CODE> that did not specify
a
-     *                                     destination at creation time.
-     * @see javax.jms.Session#createProducer
-     * @since 1.1
-     */
-    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        this.send(this.getDestination(),
-                  message,
-                  deliveryMode,
-                  priority,
-                  timeToLive);
-    }
-
-    /**
-     * Sends a message to a destination for an unidentified message producer.
-     * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
-     * priority, and time to live.
-     * <P>
-     * Typically, a message producer is assigned a destination at creation
-     * time; however, the JMS API also supports unidentified message producers,
-     * which require that the destination be supplied every time a message is
-     * sent.
-     *
-     * @param destination the destination to send this message to
-     * @param message     the message to send
-     * @throws JMSException                if the JMS provider fails to send the message
due to some
-     *                                     internal error.
-     * @throws MessageFormatException      if an invalid message is specified.
-     * @throws InvalidDestinationException if a client uses this method with an invalid destination.
-     * @throws java.lang.UnsupportedOperationException
-     *                                     if a client uses this method with a <CODE>
-     *                                     MessageProducer</CODE> that specified a
destination at
-     *                                     creation time.
-     * @see javax.jms.Session#createProducer
-     * @see javax.jms.MessageProducer
-     */
-    public void send(Destination destination, Message message) throws JMSException {
-        this.send(destination,
-                  message,
-                  this.defaultDeliveryMode,
-                  this.defaultPriority,
-                  this.defaultTimeToLive);
     }
 
     /**

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java?view=auto&rev=554315
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java Sat Jul  7 22:39:00 2007
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.jms.Message;
+
+/**
+ * A useful base class for implementing a {@link MessageProducer}
+ *
+ * @version $Revision: $
+ */
+public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable {
+    protected ActiveMQSession session;
+    protected boolean disableMessageID;
+    protected boolean disableMessageTimestamp;
+    protected int defaultDeliveryMode;
+    protected int defaultPriority;
+    protected long defaultTimeToLive;
+
+    public ActiveMQMessageProducerSupport(ActiveMQSession session) {
+        this.session = session;
+        disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
+    }
+
+    /**
+     * Sets whether message IDs are disabled.
+     * <P>
+     * Since message IDs take some effort to create and increase a message's
+     * size, some JMS providers may be able to optimize message overhead if
+     * they are given a hint that the message ID is not used by an application.
+     * By calling the <CODE>setDisableMessageID</CODE> method on this message
+     * producer, a JMS client enables this potential optimization for all
+     * messages sent by this message producer. If the JMS provider accepts this
+     * hint, these messages must have the message ID set to null; if the
+     * provider ignores the hint, the message ID must be set to its normal
+     * unique value.
+     * <P>
+     * Message IDs are enabled by default.
+     *
+     * @param value indicates if message IDs are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due
to
+     *                      some internal error.
+     */
+    public void setDisableMessageID(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageID = value;
+    }
+
+    /**
+     * Gets an indication of whether message IDs are disabled.
+     *
+     * @return an indication of whether message IDs are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to determine if message IDs
are
+     *                      disabled due to some internal error.
+     */
+    public boolean getDisableMessageID() throws JMSException {
+        checkClosed();
+        return this.disableMessageID;
+    }
+
+    /**
+     * Sets whether message timestamps are disabled.
+     * <P>
+     * Since timestamps take some effort to create and increase a message's
+     * size, some JMS providers may be able to optimize message overhead if
+     * they are given a hint that the timestamp is not used by an application.
+     * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this
+     * message producer, a JMS client enables this potential optimization for
+     * all messages sent by this message producer. If the JMS provider accepts
+     * this hint, these messages must have the timestamp set to zero; if the
+     * provider ignores the hint, the timestamp must be set to its normal
+     * value.
+     * <P>
+     * Message timestamps are enabled by default.
+     *
+     * @param value indicates if message timestamps are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due
to
+     *                      some internal error.
+     */
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        checkClosed();
+        this.disableMessageTimestamp = value;
+    }
+
+    /**
+     * Gets an indication of whether message timestamps are disabled.
+     *
+     * @return an indication of whether message timestamps are disabled
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due
to
+     *                      some internal error.
+     */
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        checkClosed();
+        return this.disableMessageTimestamp;
+    }
+
+    /**
+     * Sets the producer's default delivery mode.
+     * <P>
+     * Delivery mode is set to <CODE>PERSISTENT</CODE> by default.
+     *
+     * @param newDeliveryMode the message delivery mode for this message producer; legal
+     *                        values are <code>DeliveryMode.NON_PERSISTENT</code>
and
+     *                        <code>DeliveryMode.PERSISTENT</code>
+     * @throws javax.jms.JMSException if the JMS provider fails to set the delivery mode
due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#getDeliveryMode
+     * @see javax.jms.DeliveryMode#NON_PERSISTENT
+     * @see javax.jms.DeliveryMode#PERSISTENT
+     * @see javax.jms.Message#DEFAULT_DELIVERY_MODE
+     */
+    public void setDeliveryMode(int newDeliveryMode) throws JMSException {
+        if (newDeliveryMode != DeliveryMode.PERSISTENT && newDeliveryMode != DeliveryMode.NON_PERSISTENT) {
+            throw new javax.jms.IllegalStateException("unkown delivery mode: " + newDeliveryMode);
+        }
+        checkClosed();
+        this.defaultDeliveryMode = newDeliveryMode;
+    }
+
+    /**
+     * Gets the producer's default delivery mode.
+     *
+     * @return the message delivery mode for this message producer
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due
to
+     *                      some internal error.
+     */
+    public int getDeliveryMode() throws JMSException {
+        checkClosed();
+        return this.defaultDeliveryMode;
+    }
+
+    /**
+     * Sets the producer's default priority.
+     * <P>
+     * The JMS API defines ten levels of priority value, with 0 as the lowest
+     * priority and 9 as the highest. Clients should consider priorities 0-4 as
+     * gradations of normal priority and priorities 5-9 as gradations of
+     * expedited priority. Priority is set to 4 by default.
+     *
+     * @param newDefaultPriority the message priority for this message producer; must be
a
+     *                           value between 0 and 9
+     * @throws javax.jms.JMSException if the JMS provider fails to set the delivery mode
due to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#getPriority
+     * @see javax.jms.Message#DEFAULT_PRIORITY
+     */
+    public void setPriority(int newDefaultPriority) throws JMSException {
+        if (newDefaultPriority < 0 || newDefaultPriority > 9) {
+            throw new IllegalStateException("default priority must be a value between 0 and 9");
+        }
+        checkClosed();
+        this.defaultPriority = newDefaultPriority;
+    }
+
+    /**
+     * Gets the producer's default priority.
+     *
+     * @return the message priority for this message producer
+     * @throws javax.jms.JMSException if the JMS provider fails to close the producer due
to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#setPriority
+     */
+    public int getPriority() throws JMSException {
+        checkClosed();
+        return this.defaultPriority;
+    }
+
+    /**
+     * Sets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     * <P>
+     * Time to live is set to zero by default.
+     *
+     * @param timeToLive the message time to live in milliseconds; zero is unlimited
+     * @throws javax.jms.JMSException if the JMS provider fails to set the time to live due
to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#getTimeToLive
+     * @see javax.jms.Message#DEFAULT_TIME_TO_LIVE
+     */
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        if (timeToLive < 0l) {
+            throw new IllegalStateException("cannot set a negative timeToLive");
+        }
+        checkClosed();
+        this.defaultTimeToLive = timeToLive;
+    }
+
+    /**
+     * Gets the default length of time in milliseconds from its dispatch time
+     * that a produced message should be retained by the message system.
+     *
+     * @return the message time to live in milliseconds; zero is unlimited
+     * @throws javax.jms.JMSException if the JMS provider fails to get the time to live due
to
+     *                      some internal error.
+     * @see javax.jms.MessageProducer#setTimeToLive
+     */
+    public long getTimeToLive() throws JMSException {
+        checkClosed();
+        return this.defaultTimeToLive;
+    }
+
+    /**
+     * Sends a message using the <CODE>MessageProducer</CODE>'s default
+     * delivery mode, priority, and time to live.
+     *
+     * @param message the message to send
+     * @throws javax.jms.JMSException                if the JMS provider fails to send the
message due to some
+     *                                     internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a
<CODE>
+     *                                     MessageProducer</CODE> with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> that did not specify
a
+     *                                     destination at creation time.
+     * @see javax.jms.Session#createProducer
+     * @see javax.jms.MessageProducer
+     * @since 1.1
+     */
+    public void send(Message message) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive);
+    }
+
+    /**
+     * Sends a message to the destination, specifying delivery mode, priority,
+     * and time to live.
+     *
+     * @param message      the message to send
+     * @param deliveryMode the delivery mode to use
+     * @param priority     the priority for this message
+     * @param timeToLive   the message's lifetime (in milliseconds)
+     * @throws javax.jms.JMSException                if the JMS provider fails to send the
message due to some
+     *                                     internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with a
<CODE>
+     *                                     MessageProducer</CODE> with an invalid destination.
+     * @throws UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> that did not specify
a
+     *                                     destination at creation time.
+     * @see javax.jms.Session#createProducer
+     * @since 1.1
+     */
+    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+        this.send(this.getDestination(),
+                  message,
+                  deliveryMode,
+                  priority,
+                  timeToLive);
+    }
+
+    /**
+     * Sends a message to a destination for an unidentified message producer.
+     * Uses the <CODE>MessageProducer</CODE>'s default delivery mode,
+     * priority, and time to live.
+     * <P>
+     * Typically, a message producer is assigned a destination at creation
+     * time; however, the JMS API also supports unidentified message producers,
+     * which require that the destination be supplied every time a message is
+     * sent.
+     *
+     * @param destination the destination to send this message to
+     * @param message     the message to send
+     * @throws javax.jms.JMSException                if the JMS provider fails to send the
message due to some
+     *                                     internal error.
+     * @throws javax.jms.MessageFormatException      if an invalid message is specified.
+     * @throws javax.jms.InvalidDestinationException if a client uses this method with an
invalid destination.
+     * @throws UnsupportedOperationException
+     *                                     if a client uses this method with a <CODE>
+     *                                     MessageProducer</CODE> that specified a
destination at
+     *                                     creation time.
+     * @see javax.jms.Session#createProducer
+     * @see javax.jms.MessageProducer
+     */
+    public void send(Destination destination, Message message) throws JMSException {
+        this.send(destination,
+                  message,
+                  this.defaultDeliveryMode,
+                  this.defaultPriority,
+                  this.defaultTimeToLive);
+    }
+
+
+    protected abstract void checkClosed() throws IllegalStateException;
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=554315&r1=554314&r2=554315
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Sat Jul  7 22:39:00 2007
@@ -851,6 +851,11 @@
      */
     public MessageProducer createProducer(Destination destination) throws JMSException {
         checkClosed();
+        if (destination instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) destination;
+            return customDestination.createProducer(this);
+        }
+
         return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation
                 .transformDestination(destination));
     }
@@ -904,6 +909,12 @@
      */
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
         checkClosed();
+
+        if (destination instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) destination;
+            return customDestination.createConsumer(this, messageSelector);
+        }
+
         int prefetch = 0;
 
         ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
@@ -958,7 +969,7 @@
      *            expression are delivered. A value of null or an empty string
      *            indicates that there is no message selector for the message
      *            consumer.
-     * @param NoLocal -
+     * @param noLocal -
      *            if true, and the destination is a topic, inhibits the delivery
      *            of messages published by its own connection. The behavior for
      *            <CODE>NoLocal</CODE> is not specified if the destination is
@@ -973,13 +984,20 @@
      *             if the message selector is invalid.
      * @since 1.1
      */
-    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException {
         checkClosed();
+
+        if (destination instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) destination;
+            return customDestination.createConsumer(this, messageSelector, noLocal);
+        }
+
+
         ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
         return new ActiveMQMessageConsumer(this, getNextConsumerId(), 
                 ActiveMQMessageTransformation.transformDestination(destination), null, messageSelector,

-                prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), NoLocal, false, asyncDispatch);
+                prefetchPolicy.getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
     }
 
     /**
@@ -1145,6 +1163,12 @@
     public TopicSubscriber createDurableSubscriber(Topic topic,String name,String messageSelector,boolean noLocal)
                     throws JMSException{
         checkClosed();
+
+        if (topic instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) topic;
+            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
+        }
+
         connection.checkClientIDWasManuallySpecified();
         ActiveMQPrefetchPolicy prefetchPolicy=this.connection.getPrefetchPolicy();
         int prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy
@@ -1272,6 +1296,12 @@
      */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
         checkClosed();
+
+        if (queue instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) queue;
+            return customDestination.createReceiver(this, messageSelector);
+        }
+
         ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
         return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation
                 .transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),

@@ -1294,6 +1324,11 @@
      */
     public QueueSender createSender(Queue queue) throws JMSException {
         checkClosed();
+        if (queue instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) queue;
+            return customDestination.createSender(this);
+        }
+
         return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
     }
 
@@ -1366,6 +1401,12 @@
      */
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
         checkClosed();
+
+        if (topic instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) topic;
+            return customDestination.createSubscriber(this, messageSelector, noLocal);
+        }
+
         ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
         return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation
                 .transformDestination(topic), null, messageSelector, prefetchPolicy.getTopicPrefetch(),

@@ -1392,6 +1433,11 @@
      */
     public TopicPublisher createPublisher(Topic topic) throws JMSException {
         checkClosed();
+        
+        if (topic instanceof CustomDestination)  {
+            CustomDestination customDestination = (CustomDestination) topic;
+            return customDestination.createPublisher(this);
+        }
         return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
     }
 
@@ -1807,6 +1853,10 @@
         return transformer;
     }
 
+    public ActiveMQConnection getConnection() {
+        return connection;
+    }
+
     /**
      * Sets the transformer used to transform messages before they are sent on to the JMS bus
      * or when they are received from the bus but before they are delivered to the JMS client
@@ -1830,7 +1880,6 @@
     public List getUnconsumedMessages() {
 		return executor.getUnconsumedMessages();
 	}
-
 
     public String toString() {
         return "ActiveMQSession {id="+info.getSessionId()+",started="+started.get()+"}";

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java?view=auto&rev=554315
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/CustomDestination.java Sat Jul  7 22:39:00 2007
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TopicSubscriber;
+import javax.jms.QueueReceiver;
+import javax.jms.TopicPublisher;
+import javax.jms.QueueSender;
+import javax.jms.JMSException;
+
+/**
+ * Represents a hook to allow the support of custom destinations
+ * such as to support <a href="http://activemq.apache.org/camel/">Apache Camel</a>
+ * to create and manage endpoints
+ *
+ * @version $Revision: $
+ */
+public interface CustomDestination extends Destination {
+
+    // Consumers
+    //-----------------------------------------------------------------------
+    MessageConsumer createConsumer(ActiveMQSession session, String messageSelector);
+    MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal);
+
+    TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal);
+    TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal);
+
+    QueueReceiver createReceiver(ActiveMQSession session, String messageSelector);
+
+    // Producers
+    //-----------------------------------------------------------------------
+    MessageProducer createProducer(ActiveMQSession session) throws JMSException;
+
+    TopicPublisher createPublisher(ActiveMQSession session) throws JMSException;
+
+    QueueSender createSender(ActiveMQSession session) throws JMSException;
+}



Mime
View raw message