activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r657817 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ test/java/org/apache/activemq/
Date Mon, 19 May 2008 13:06:37 GMT
Author: rajdavies
Date: Mon May 19 06:06:37 2008
New Revision: 657817

URL: http://svn.apache.org/viewvc?rev=657817&view=rev
Log:
more for https://issues.apache.org/activemq/browse/AMQ-1736

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon May 19 06:06:37 2008
@@ -858,7 +858,7 @@
     }
     
     void acknowledge(MessageDispatch md) throws JMSException {
-        MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+        MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
         session.asyncSendPacket(ack);
         synchronized(deliveredMessages){
             deliveredMessages.remove(md);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon
May 19 06:06:37 2008
@@ -133,6 +133,12 @@
  */
 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable,
ActiveMQDispatcher {
 	
+	/**
+	 * Only acknowledge an individual message - using message.acknowledge()
+	 * as opposed to CLIENT_ACKNOWLEDGE which 
+	 * acknowledges all messages consumed by a session at when acknowledge()
+	 * is called
+	 */
 	public static final int INDIVIDUAL_ACKNOWLEDGE=4;
 
     public static interface DeliveryListener {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon May 19 06:06:37 2008
@@ -252,7 +252,24 @@
                                         + ack);
                     }
                 }
-            } else if (ack.isDeliveredAck()) {
+            } else if (ack.isIndividualAck()) {
+                // Message was delivered and acknowledge - but only delete the
+                // individual message
+                for (final MessageReference node : dispatched) {
+                    MessageId messageId = node.getMessageId();
+                    if (ack.getLastMessageId().equals(messageId)) {
+                        // this should never be within a transaction
+                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                        destination = node.getRegionDestination();
+                        acknowledge(context, ack, node);
+                        dispatched.remove(node);
+                        prefetchExtension = Math.max(0, prefetchExtension - 1);
+                        callDispatchMatched = true;
+                        break;
+                    }
+                }
+            }else if (ack.isDeliveredAck()) {
                 // Message was delivered but not acknowledged: update pre-fetch
                 // counters.
                 // Acknowledge all dispatched messages up till the message id of

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon May 19 06:06:37 2008
@@ -182,7 +182,7 @@
 
     public synchronized void acknowledge(final ConnectionContext context, final MessageAck
ack) throws Exception {
         // Handle the standard acknowledgment case.
-        if (ack.isStandardAck() || ack.isPoisonAck()) {
+        if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
             if (context.isInTransaction()) {
                 context.getTransaction().addSynchronization(new Synchronization() {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
Mon May 19 06:06:37 2008
@@ -54,6 +54,11 @@
      */
     public static final byte REDELIVERED_ACK_TYPE = 3;
     
+    /**
+     * The  ack case where a client wants only an individual message to be discarded.
+     */
+    public static final byte INDIVIDUAL_ACK_TYPE = 4;
+    
     protected byte ackType;
     protected ConsumerId consumerId;
     protected MessageId firstMessageId;
@@ -108,6 +113,10 @@
     public boolean isRedeliveredAck() {
         return ackType == REDELIVERED_ACK_TYPE;
     }
+    
+    public boolean isIndividualAck() {
+        return ackType == INDIVIDUAL_ACK_TYPE;
+    }
 
     /**
      * @openwire:property version=1 cache=true

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java?rev=657817&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
Mon May 19 06:06:37 2008
@@ -0,0 +1,144 @@
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JMSIndividualAckTest extends TestSupport {
+
+    private Connection connection;
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        connection = createConnection();
+    }
+
+    /**
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testAckedMessageAreConsumed() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+
+        session.close();
+    }
+
+    /**
+     * Tests if acknowledged messages are being consumed.
+     *
+     * @throws JMSException
+     */
+    public void testLastMessageAcked() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage msg1 = session.createTextMessage("msg1");
+        TextMessage msg2 = session.createTextMessage("msg2");
+        TextMessage msg3 = session.createTextMessage("msg3");
+        producer.send(msg1);
+        producer.send(msg2);
+        producer.send(msg3);
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // Reset the session.
+        session.close();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        assertEquals(msg1,msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        assertEquals(msg2,msg);
+        msg = consumer.receive(1000);
+        assertNull(msg);
+        session.close();
+    }
+    
+    /**
+     * Tests if unacknowledged messages are being re-delivered when the consumer connects
again.
+     * 
+     * @throws JMSException
+     */
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
+        connection.start();
+        Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+        Queue queue = session.createQueue(getQueueName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);        
+        // Don't ack the message.
+        
+        // Reset the session.  This should cause the unacknowledged message to be re-delivered.
+        session.close();
+        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+                
+        // Attempt to Consume the message...
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);        
+        msg.acknowledge();
+        
+        session.close();
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message