activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r656980 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQMessageConsumer.java ActiveMQSession.java
Date Fri, 16 May 2008 09:21:45 GMT
Author: rajdavies
Date: Fri May 16 02:21:44 2008
New Revision: 656980

URL: http://svn.apache.org/viewvc?rev=656980&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1732

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Fri May 16 02:21:44 2008
@@ -477,7 +477,7 @@
                 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
             }
         }
-        if (session.isClientAcknowledge()) {
+        if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
                 public void execute() throws Exception {
                     session.checkClosed();
@@ -767,7 +767,14 @@
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
             } else if (session.isClientAcknowledge()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-            } else {
+            } else if (session.isIndividualAcknowledge()){
+            	MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                session.asyncSendPacket(ack);
+                synchronized(deliveredMessages){
+	                deliveredMessages.remove(md);
+                }
+            }
+            else {
                 throw new IllegalStateException("Invalid session state.");
             }
         }
@@ -968,7 +975,7 @@
                                 }
                                 afterMessageIsConsumed(md, expired);
                             } catch (RuntimeException e) {
-                                if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge())
{
+                                if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge()
|| session.isIndividualAcknowledge()) {
                                     // Redeliver the message
                                 } else {
                                     // Transacted or Client ack: Deliver the

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri
May 16 02:21:44 2008
@@ -132,6 +132,8 @@
  * @see javax.jms.XASession
  */
 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable,
ActiveMQDispatcher {
+	
+	public static final int INDIVIDUAL_ACKNOWLEDGE=4;
 
     public static interface DeliveryListener {
         void beforeDelivery(ActiveMQSession session, Message msg);
@@ -710,7 +712,7 @@
                 continue;
             }
 
-            if (isClientAcknowledge()) {
+            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                 message.setAcknowledgeCallback(new Callback() {
                     public void execute() throws Exception {
                     }
@@ -1705,6 +1707,10 @@
     public boolean isDupsOkAcknowledge() {
         return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
     }
+    
+    public boolean isIndividualAcknowledge(){
+    	return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+    }
 
     /**
      * Returns the message delivery listener.



Mime
View raw message