Author: rajdavies
Date: Fri May 16 02:21:44 2008
New Revision: 656980
URL: http://svn.apache.org/viewvc?rev=656980&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1732
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 16 02:21:44 2008
@@ -477,7 +477,7 @@
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
- if (session.isClientAcknowledge()) {
+ if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
@@ -767,7 +767,14 @@
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else {
+ } else if (session.isIndividualAcknowledge()){
+ MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+ session.asyncSendPacket(ack);
+ synchronized(deliveredMessages){
+ deliveredMessages.remove(md);
+ }
+ }
+ else {
throw new IllegalStateException("Invalid session state.");
}
}
@@ -968,7 +975,7 @@
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
- if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge()) {
+ if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
// Redeliver the message
} else {
// Transacted or Client ack: Deliver the
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri May 16 02:21:44 2008
@@ -132,6 +132,8 @@
* @see javax.jms.XASession
*/
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
+
+ public static final int INDIVIDUAL_ACKNOWLEDGE=4;
public static interface DeliveryListener {
void beforeDelivery(ActiveMQSession session, Message msg);
@@ -710,7 +712,7 @@
continue;
}
- if (isClientAcknowledge()) {
+ if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
}
@@ -1705,6 +1707,10 @@
public boolean isDupsOkAcknowledge() {
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
}
+
+ public boolean isIndividualAcknowledge(){
+ return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+ }
/**
* Returns the message delivery listener.
|