activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1166216 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ActiveMQSession.java test/java/org/apache/activemq/JMSIndividualAckTest.java
Date Wed, 07 Sep 2011 15:11:11 GMT
Author: tabish
Date: Wed Sep  7 15:11:10 2011
New Revision: 1166216

URL: http://svn.apache.org/viewvc?rev=1166216&view=rev
Log:
fix for https://issues.apache.org/jira/browse/AMQ-3486 with test case.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1166216&r1=1166215&r2=1166216&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Sep  7 15:11:10 2011
@@ -87,6 +87,7 @@ import org.apache.activemq.thread.Schedu
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.Callback;
+import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -172,21 +173,21 @@ import org.slf4j.LoggerFactory;
  * transactions directly, it is unlikely that many JMS clients will do this.
  * Support for JTA in the JMS API is targeted at systems vendors who will be
  * integrating the JMS API into their application server products.
- * 
- * 
+ *
+ *
  * @see javax.jms.Session
  * @see javax.jms.QueueSession
  * @see javax.jms.TopicSession
  * @see javax.jms.XASession
  */
 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
-	
-	/**
-	 * Only acknowledge an individual message - using message.acknowledge()
-	 * as opposed to CLIENT_ACKNOWLEDGE which 
-	 * acknowledges all messages consumed by a session at when acknowledge()
-	 * is called
-	 */
+
+    /**
+     * Only acknowledge an individual message - using message.acknowledge()
+     * as opposed to CLIENT_ACKNOWLEDGE which
+     * acknowledges all messages consumed by a session at when acknowledge()
+     * is called
+     */
     public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
     public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
 
@@ -229,7 +230,7 @@ public class ActiveMQSession implements 
 
     /**
      * Construct the Session
-     * 
+     *
      * @param connection
      * @param sessionId
      * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
@@ -253,7 +254,7 @@ public class ActiveMQSession implements 
         this.scheduler=connection.getScheduler();
         this.connectionExecutor=connection.getExecutor();
         this.executor = new ActiveMQSessionExecutor(this);
-        connection.addSession(this);        
+        connection.addSession(this);
         if (connection.isStarted()) {
             start();
         }
@@ -266,7 +267,7 @@ public class ActiveMQSession implements 
 
     /**
      * Sets the transaction context of the session.
-     * 
+     *
      * @param transactionContext - provides the means to control a JMS
      *                transaction.
      */
@@ -276,7 +277,7 @@ public class ActiveMQSession implements 
 
     /**
      * Returns the transaction context of the session.
-     * 
+     *
      * @return transactionContext - session's transaction context.
      */
     public TransactionContext getTransactionContext() {
@@ -285,7 +286,7 @@ public class ActiveMQSession implements 
 
     /*
      * (non-Javadoc)
-     * 
+     *
      * @see org.apache.activemq.management.StatsCapable#getStats()
      */
     public StatsImpl getStats() {
@@ -294,7 +295,7 @@ public class ActiveMQSession implements 
 
     /**
      * Returns the session's statistics.
-     * 
+     *
      * @return stats - session's statistics.
      */
     public JMSSessionStatsImpl getSessionStats() {
@@ -305,7 +306,7 @@ public class ActiveMQSession implements 
      * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
      * object is used to send a message containing a stream of uninterpreted
      * bytes.
-     * 
+     *
      * @return the an ActiveMQBytesMessage
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
@@ -321,7 +322,7 @@ public class ActiveMQSession implements 
      * object is used to send a self-defining set of name-value pairs, where
      * names are <CODE>String</CODE> objects and values are primitive values
      * in the Java programming language.
-     * 
+     *
      * @return an ActiveMQMapMessage
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
@@ -338,7 +339,7 @@ public class ActiveMQSession implements 
      * <CODE>Message</CODE> object holds all the standard message header
      * information. It can be sent when a message containing only header
      * information is sufficient.
-     * 
+     *
      * @return an ActiveMQMessage
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
@@ -353,7 +354,7 @@ public class ActiveMQSession implements 
      * Creates an <CODE>ObjectMessage</CODE> object. An
      * <CODE>ObjectMessage</CODE> object is used to send a message that
      * contains a serializable Java object.
-     * 
+     *
      * @return an ActiveMQObjectMessage
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
@@ -368,7 +369,7 @@ public class ActiveMQSession implements 
      * Creates an initialized <CODE>ObjectMessage</CODE> object. An
      * <CODE>ObjectMessage</CODE> object is used to send a message that
      * contains a serializable Java object.
-     * 
+     *
      * @param object the object to use to initialize this message
      * @return an ActiveMQObjectMessage
      * @throws JMSException if the JMS provider fails to create this message due
@@ -385,7 +386,7 @@ public class ActiveMQSession implements 
      * Creates a <CODE>StreamMessage</CODE> object. A
      * <CODE>StreamMessage</CODE> object is used to send a self-defining
      * stream of primitive values in the Java programming language.
-     * 
+     *
      * @return an ActiveMQStreamMessage
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
@@ -400,7 +401,7 @@ public class ActiveMQSession implements 
      * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
      * object is used to send a message containing a <CODE>String</CODE>
      * object.
-     * 
+     *
      * @return an ActiveMQTextMessage
      * @throws JMSException if the JMS provider fails to create this message due
      *                 to some internal error.
@@ -415,7 +416,7 @@ public class ActiveMQSession implements 
      * Creates an initialized <CODE>TextMessage</CODE> object. A
      * <CODE>TextMessage</CODE> object is used to send a message containing a
      * <CODE>String</CODE>.
-     * 
+     *
      * @param text the string used to initialize this message
      * @return an ActiveMQTextMessage
      * @throws JMSException if the JMS provider fails to create this message due
@@ -432,7 +433,7 @@ public class ActiveMQSession implements 
      * Creates an initialized <CODE>BlobMessage</CODE> object. A
      * <CODE>BlobMessage</CODE> object is used to send a message containing a
      * <CODE>URL</CODE> which points to some network addressible BLOB.
-     * 
+     *
      * @param url the network addressable URL used to pass directly to the
      *                consumer
      * @return a BlobMessage
@@ -447,7 +448,7 @@ public class ActiveMQSession implements 
      * Creates an initialized <CODE>BlobMessage</CODE> object. A
      * <CODE>BlobMessage</CODE> object is used to send a message containing a
      * <CODE>URL</CODE> which points to some network addressible BLOB.
-     * 
+     *
      * @param url the network addressable URL used to pass directly to the
      *                consumer
      * @param deletedByBroker indicates whether or not the resource is deleted
@@ -471,7 +472,7 @@ public class ActiveMQSession implements 
      * the <CODE>File</CODE> content. Before the message is sent the file
      * conent will be uploaded to the broker or some other remote repository
      * depending on the {@link #getBlobTransferPolicy()}.
-     * 
+     *
      * @param file the file to be uploaded to some remote repo (or the broker)
      *                depending on the strategy
      * @return a BlobMessage
@@ -494,7 +495,7 @@ public class ActiveMQSession implements 
      * the <CODE>File</CODE> content. Before the message is sent the file
      * conent will be uploaded to the broker or some other remote repository
      * depending on the {@link #getBlobTransferPolicy()}.
-     * 
+     *
      * @param in the stream to be uploaded to some remote repo (or the broker)
      *                depending on the strategy
      * @return a BlobMessage
@@ -512,7 +513,7 @@ public class ActiveMQSession implements 
 
     /**
      * Indicates whether the session is in transacted mode.
-     * 
+     *
      * @return true if the session is in transacted mode
      * @throws JMSException if there is some internal error.
      */
@@ -525,7 +526,7 @@ public class ActiveMQSession implements 
      * Returns the acknowledgement mode of the session. The acknowledgement mode
      * is set at the time that the session is created. If the session is
      * transacted, the acknowledgement mode is ignored.
-     * 
+     *
      * @return If the session is not transacted, returns the current
      *         acknowledgement mode for the session. If the session is
      *         transacted, returns SESSION_TRANSACTED.
@@ -541,7 +542,7 @@ public class ActiveMQSession implements 
     /**
      * Commits all messages done in this transaction and releases any locks
      * currently held.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to commit the transaction
      *                 due to some internal error.
      * @throws TransactionRolledBackException if the transaction is rolled back
@@ -563,7 +564,7 @@ public class ActiveMQSession implements 
     /**
      * Rolls back any messages done in this transaction and releases any locks
      * currently held.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to roll back the
      *                 transaction due to some internal error.
      * @throws javax.jms.IllegalStateException if the method is not called by a
@@ -604,7 +605,7 @@ public class ActiveMQSession implements 
      * Invoking any other <CODE>Session</CODE> method on a closed session must
      * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
      * closed session must <I>not </I> throw an exception.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to close the session due
      *                 to some internal error.
      */
@@ -643,13 +644,13 @@ public class ActiveMQSession implements 
     }
 
     void clearMessagesInProgress() {
-        executor.clearMessagesInProgress();        
+        executor.clearMessagesInProgress();
         // we are called from inside the transport reconnection logic
         // which involves us clearing all the connections' consumers
-        // dispatch and delivered lists. So rather than trying to 
-        // grab a mutex (which could be already owned by the message 
-        // listener calling the send or an ack) we allow it to complete in 
-        // a separate thread via the scheduler and notify us via 
+        // dispatch and delivered lists. So rather than trying to
+        // grab a mutex (which could be already owned by the message
+        // listener calling the send or an ack) we allow it to complete in
+        // a separate thread via the scheduler and notify us via
         // connection.transportInterruptionProcessingComplete()
         //
         for (final ActiveMQMessageConsumer consumer : consumers) {
@@ -714,7 +715,7 @@ public class ActiveMQSession implements 
     /**
      * Check if the session is closed. It is used for ensuring that the session
      * is open before performing various operations.
-     * 
+     *
      * @throws IllegalStateException if the Session is closed
      */
     protected void checkClosed() throws IllegalStateException {
@@ -749,7 +750,7 @@ public class ActiveMQSession implements 
      * that had been previously delivered. Redelivered messages do not have to
      * be delivered in exactly their original delivery order.
      * </UL>
-     * 
+     *
      * @throws JMSException if the JMS provider fails to stop and restart
      *                 message delivery due to some internal error.
      * @throws IllegalStateException if the method is called by a transacted
@@ -771,7 +772,7 @@ public class ActiveMQSession implements 
 
     /**
      * Returns the session's distinguished message listener (optional).
-     * 
+     *
      * @return the message listener associated with this session
      * @throws JMSException if the JMS provider fails to get the message
      *                 listener due to an internal error.
@@ -792,7 +793,7 @@ public class ActiveMQSession implements 
      * messages are still supported.
      * <P>
      * This is an expert facility not used by regular JMS clients.
-     * 
+     *
      * @param listener the message listener to associate with this session
      * @throws JMSException if the JMS provider fails to set the message
      *                 listener due to an internal error.
@@ -812,7 +813,7 @@ public class ActiveMQSession implements 
     /**
      * Optional operation, intended to be used only by Application Servers, not
      * by ordinary JMS clients.
-     * 
+     *
      * @see javax.jms.ServerSession
      */
     public void run() {
@@ -876,7 +877,7 @@ public class ActiveMQSession implements 
                                 ack.setFirstMessageId(md.getMessage().getMessageId());
                                 asyncSendPacket(ack);
                             } else {
-                                
+
                                 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                 ack.setFirstMessageId(md.getMessage().getMessageId());
                                 asyncSendPacket(ack);
@@ -916,7 +917,7 @@ public class ActiveMQSession implements 
      * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
      * inherit from <CODE>Destination</CODE>, they can be used in the
      * destination parameter to create a <CODE>MessageProducer</CODE> object.
-     * 
+     *
      * @param destination the <CODE>Destination</CODE> to send to, or null if
      *                this is a producer which does not have a specified
      *                destination.
@@ -942,7 +943,7 @@ public class ActiveMQSession implements 
      * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
      * <CODE>Destination</CODE>, they can be used in the destination
      * parameter to create a <CODE>MessageConsumer</CODE>.
-     * 
+     *
      * @param destination the <CODE>Destination</CODE> to access.
      * @return the MessageConsumer
      * @throws JMSException if the session fails to create a consumer due to
@@ -964,7 +965,7 @@ public class ActiveMQSession implements 
      * <P>
      * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
      * that have been sent to a destination.
-     * 
+     *
      * @param destination the <CODE>Destination</CODE> to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1047,7 +1048,7 @@ public class ActiveMQSession implements 
      * inhibit the delivery of messages published by its own connection. The
      * default value for this attribute is False. The <CODE>noLocal</CODE>
      * value must be supported by destinations that are topics.
-     * 
+     *
      * @param destination the <CODE>Destination</CODE> to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1139,7 +1140,7 @@ public class ActiveMQSession implements 
      * initiated by the JMS API. The one exception is the creation of temporary
      * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
      * method.
-     * 
+     *
      * @param queueName the name of this <CODE>Queue</CODE>
      * @return a <CODE>Queue</CODE> with the given name
      * @throws JMSException if the session fails to create a queue due to some
@@ -1167,7 +1168,7 @@ public class ActiveMQSession implements 
      * initiated by the JMS API. The one exception is the creation of temporary
      * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
      * method.
-     * 
+     *
      * @param topicName the name of this <CODE>Topic</CODE>
      * @return a <CODE>Topic</CODE> with the given name
      * @throws JMSException if the session fails to create a topic due to some
@@ -1185,7 +1186,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
      * the specified queue.
-     * 
+     *
      * @param queue the <CODE>queue</CODE> to access
      * @exception InvalidDestinationException if an invalid destination is
      *                    specified
@@ -1216,7 +1217,7 @@ public class ActiveMQSession implements 
      * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
      * inhibit the delivery of messages published by its own connection. The
      * default value for this attribute is false.
-     * 
+     *
      * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
      * @param name the name used to identify this subscription
      * @return the TopicSubscriber
@@ -1254,7 +1255,7 @@ public class ActiveMQSession implements 
      * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
      * and/or message selector. Changing a durable subscriber is equivalent to
      * unsubscribing (deleting) the old one and creating a new one.
-     * 
+     *
      * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
      * @param name the name used to identify this subscription
      * @param messageSelector only messages with properties matching the message
@@ -1273,6 +1274,11 @@ public class ActiveMQSession implements 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
         checkClosed();
 
+        if (isIndividualAcknowledge()) {
+            throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+
+                                             "INDIVIDUAL_ACKNOWLEDGE mode.", null);
+        }
+
         if (topic instanceof CustomDestination) {
             CustomDestination customDestination = (CustomDestination)topic;
             return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
@@ -1289,7 +1295,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
      * the specified queue.
-     * 
+     *
      * @param queue the <CODE>queue</CODE> to access
      * @return the Queue Browser
      * @throws JMSException if the session fails to create a browser due to some
@@ -1306,7 +1312,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
      * the specified queue using a message selector.
-     * 
+     *
      * @param queue the <CODE>queue</CODE> to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1328,7 +1334,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
      * of the <CODE>Connection</CODE> unless it is deleted earlier.
-     * 
+     *
      * @return a temporary queue identity
      * @throws JMSException if the session fails to create a temporary queue due
      *                 to some internal error.
@@ -1342,7 +1348,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
      * of the <CODE>Connection</CODE> unless it is deleted earlier.
-     * 
+     *
      * @return a temporary topic identity
      * @throws JMSException if the session fails to create a temporary topic due
      *                 to some internal error.
@@ -1356,7 +1362,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
      * the specified queue.
-     * 
+     *
      * @param queue the <CODE>Queue</CODE> to access
      * @return
      * @throws JMSException if the session fails to create a receiver due to
@@ -1372,7 +1378,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
      * the specified queue using a message selector.
-     * 
+     *
      * @param queue the <CODE>Queue</CODE> to access
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1400,7 +1406,7 @@ public class ActiveMQSession implements 
     /**
      * Creates a <CODE>QueueSender</CODE> object to send messages to the
      * specified queue.
-     * 
+     *
      * @param queue the <CODE>Queue</CODE> to access, or null if this is an
      *                unidentified producer
      * @return QueueSender
@@ -1431,7 +1437,7 @@ public class ActiveMQSession implements 
      * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
      * inhibit the delivery of messages published by its own connection. The
      * default value for this attribute is false.
-     * 
+     *
      * @param topic the <CODE>Topic</CODE> to subscribe to
      * @return TopicSubscriber
      * @throws JMSException if the session fails to create a subscriber due to
@@ -1462,7 +1468,7 @@ public class ActiveMQSession implements 
      * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
      * inhibit the delivery of messages published by its own connection. The
      * default value for this attribute is false.
-     * 
+     *
      * @param topic the <CODE>Topic</CODE> to subscribe to
      * @param messageSelector only messages with properties matching the message
      *                selector expression are delivered. A value of null or an
@@ -1496,7 +1502,7 @@ public class ActiveMQSession implements 
      * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
      * a topic, it defines a new sequence of messages that have no ordering
      * relationship with the messages it has previously sent.
-     * 
+     *
      * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
      *                an unidentified producer
      * @return TopicPublisher
@@ -1526,7 +1532,7 @@ public class ActiveMQSession implements 
      * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
      * message is part of a pending transaction or has not been acknowledged in
      * the session.
-     * 
+     *
      * @param name the name used to identify this subscription
      * @throws JMSException if the session fails to unsubscribe to the durable
      *                 subscription due to some internal error.
@@ -1567,7 +1573,7 @@ public class ActiveMQSession implements 
      * group, thereby acknowledging all messages consumed by the session.)
      * <P>
      * Messages that have been received but not acknowledged may be redelivered.
-     * 
+     *
      * @throws JMSException if the JMS provider fails to acknowledge the
      *                 messages due to some internal error.
      * @throws javax.jms.IllegalStateException if this method is called on a
@@ -1583,7 +1589,7 @@ public class ActiveMQSession implements 
 
     /**
      * Add a message consumer.
-     * 
+     *
      * @param consumer - message consumer.
      * @throws JMSException
      */
@@ -1597,7 +1603,7 @@ public class ActiveMQSession implements 
 
     /**
      * Remove the message consumer.
-     * 
+     *
      * @param consumer - consumer to be removed.
      * @throws JMSException
      */
@@ -1612,7 +1618,7 @@ public class ActiveMQSession implements 
 
     /**
      * Adds a message producer.
-     * 
+     *
      * @param producer - message producer to be added.
      * @throws JMSException
      */
@@ -1623,7 +1629,7 @@ public class ActiveMQSession implements 
 
     /**
      * Removes a message producer.
-     * 
+     *
      * @param producer - message producer to be removed.
      * @throws JMSException
      */
@@ -1634,7 +1640,7 @@ public class ActiveMQSession implements 
 
     /**
      * Start this Session.
-     * 
+     *
      * @throws JMSException
      */
     protected void start() throws JMSException {
@@ -1648,7 +1654,7 @@ public class ActiveMQSession implements 
 
     /**
      * Stops this session.
-     * 
+     *
      * @throws JMSException
      */
     protected void stop() throws JMSException {
@@ -1664,7 +1670,7 @@ public class ActiveMQSession implements 
 
     /**
      * Returns the session id.
-     * 
+     *
      * @return value - session id.
      */
     protected SessionId getSessionId() {
@@ -1687,7 +1693,7 @@ public class ActiveMQSession implements 
 
     /**
      * Sends the message for dispatch by the broker.
-     * 
+     *
      * @param producer - message producer.
      * @param destination - message destination.
      * @param message - message to be sent.
@@ -1775,7 +1781,7 @@ public class ActiveMQSession implements 
 
     /**
      * Send TransactionInfo to indicate transaction has started
-     * 
+     *
      * @throws JMSException if some internal error occurs
      */
     protected void doStartTransaction() throws JMSException {
@@ -1786,7 +1792,7 @@ public class ActiveMQSession implements 
 
     /**
      * Checks whether the session has unconsumed messages.
-     * 
+     *
      * @return true - if there are unconsumed messages.
      */
     public boolean hasUncomsumedMessages() {
@@ -1795,7 +1801,7 @@ public class ActiveMQSession implements 
 
     /**
      * Checks whether the session uses transactions.
-     * 
+     *
      * @return true - if the session uses transactions.
      */
     public boolean isTransacted() {
@@ -1804,7 +1810,7 @@ public class ActiveMQSession implements 
 
     /**
      * Checks whether the session used client acknowledgment.
-     * 
+     *
      * @return true - if the session uses client acknowledgment.
      */
     protected boolean isClientAcknowledge() {
@@ -1813,7 +1819,7 @@ public class ActiveMQSession implements 
 
     /**
      * Checks whether the session used auto acknowledgment.
-     * 
+     *
      * @return true - if the session uses client acknowledgment.
      */
     public boolean isAutoAcknowledge() {
@@ -1822,20 +1828,20 @@ public class ActiveMQSession implements 
 
     /**
      * Checks whether the session used dup ok acknowledgment.
-     * 
+     *
      * @return true - if the session uses client acknowledgment.
      */
     public boolean isDupsOkAcknowledge() {
         return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
     }
-    
+
     public boolean isIndividualAcknowledge(){
-    	return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
     }
 
     /**
      * Returns the message delivery listener.
-     * 
+     *
      * @return deliveryListener - message delivery listener.
      */
     public DeliveryListener getDeliveryListener() {
@@ -1844,7 +1850,7 @@ public class ActiveMQSession implements 
 
     /**
      * Sets the message delivery listener.
-     * 
+     *
      * @param deliveryListener - message delivery listener.
      */
     public void setDeliveryListener(DeliveryListener deliveryListener) {
@@ -1853,7 +1859,7 @@ public class ActiveMQSession implements 
 
     /**
      * Returns the SessionInfo bean.
-     * 
+     *
      * @return info - SessionInfo bean.
      * @throws JMSException
      */
@@ -1864,7 +1870,7 @@ public class ActiveMQSession implements 
 
     /**
      * Send the asynchronus command.
-     * 
+     *
      * @param command - command to be executed.
      * @throws JMSException
      */
@@ -1874,7 +1880,7 @@ public class ActiveMQSession implements 
 
     /**
      * Send the synchronus command.
-     * 
+     *
      * @param command - command to be executed.
      * @return Response
      * @throws JMSException
@@ -2019,7 +2025,7 @@ public class ActiveMQSession implements 
         }
         return false;
     }
-    
+
     /**
      * highest sequence id of the last message delivered by this session.
      * Passed to the broker in the close command, maintained by dispose()
@@ -2028,11 +2034,11 @@ public class ActiveMQSession implements 
     public long getLastDeliveredSequenceId() {
         return lastDeliveredSequenceId;
     }
-    
+
     protected void sendAck(MessageAck ack) throws JMSException {
         sendAck(ack,false);
     }
-    
+
     protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
         if (lazy || connection.isSendAcksAsync() || getTransacted()) {
             asyncSendPacket(ack);
@@ -2040,11 +2046,11 @@ public class ActiveMQSession implements 
             syncSendPacket(ack);
         }
     }
-    
+
     protected Scheduler getScheduler() {
         return this.scheduler;
     }
-    
+
     protected ThreadPoolExecutor getConnectionExecutor() {
         return this.connectionExecutor;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java?rev=1166216&r1=1166215&r2=1166216&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java Wed Sep  7 15:11:10 2011
@@ -24,9 +24,10 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 /**
- * 
+ *
  */
 public class JMSIndividualAckTest extends TestSupport {
 
@@ -100,7 +101,7 @@ public class JMSIndividualAckTest extend
         Message msg = consumer.receive(1000);
         assertNotNull(msg);
         msg = consumer.receive(1000);
-        assertNotNull(msg);        
+        assertNotNull(msg);
         msg = consumer.receive(1000);
         assertNotNull(msg);
         msg.acknowledge();
@@ -121,10 +122,10 @@ public class JMSIndividualAckTest extend
         assertNull(msg);
         session.close();
     }
-    
+
     /**
      * Tests if unacknowledged messages are being re-delivered when the consumer connects again.
-     * 
+     *
      * @throws JMSException
      */
     public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
@@ -137,22 +138,39 @@ public class JMSIndividualAckTest extend
         // Consume the message...
         MessageConsumer consumer = session.createConsumer(queue);
         Message msg = consumer.receive(1000);
-        assertNotNull(msg);        
+        assertNotNull(msg);
         // Don't ack the message.
-        
+
         // Reset the session.  This should cause the unacknowledged message to be re-delivered.
         session.close();
         session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
-                
+
         // Attempt to Consume the message...
         consumer = session.createConsumer(queue);
         msg = consumer.receive(2000);
-        assertNotNull(msg);        
+        assertNotNull(msg);
         msg.acknowledge();
-        
+
         session.close();
     }
 
+    /**
+     * Tests that a durable consumer cannot be created for Individual Ack mode.
+     *
+     * @throws JMSException
+     */
+    public void testCreateDurableConsumerFails() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Topic dest = session.createTopic(getName());
+
+        try {
+            session.createDurableSubscriber(dest, getName());
+            fail("Should not be able to create duable subscriber.");
+        } catch(Exception e) {
+        }
+    }
+
     protected String getQueueName() {
         return getClass().getName() + "." + getName();
     }



Mime
View raw message