activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r552713 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQMessageConsumer.java ActiveMQSession.java ActiveMQSessionExecutor.java MessageDispatchChannel.java
Date Tue, 03 Jul 2007 08:19:34 GMT
Author: rajdavies
Date: Tue Jul  3 01:19:33 2007
New Revision: 552713

URL: http://svn.apache.org/viewvc?view=rev&rev=552713
Log:
Added duplicate detection on the client

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=552713&r1=552712&r2=552713
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Tue Jul  3 01:19:33 2007
@@ -178,6 +178,7 @@
     // version when a WireFormatInfo is received.
     private AtomicInteger protocolVersion=new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
     private long timeCreated;
+    private ConnectionAudit connectionAudit = new ConnectionAudit();
 
     /**
      * Construct an <code>ActiveMQConnection</code>
@@ -210,6 +211,7 @@
         this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
         this.factoryStats.addConnection(this);
         this.timeCreated = System.currentTimeMillis();
+        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
     }
 
 
@@ -947,6 +949,7 @@
      */
     protected void removeSession(ActiveMQSession session) {
         this.sessions.remove(session);
+        this.removeDispatcher(session);
     }
 
     /**
@@ -966,6 +969,7 @@
      */
     protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer)
{
         this.connectionConsumers.remove(connectionConsumer);
+        this.removeDispatcher(connectionConsumer);
     }
 
     /**
@@ -2083,6 +2087,18 @@
 	public void setProducerWindowSize(int producerWindowSize) {
 		this.producerWindowSize = producerWindowSize;
 	}
+    
+    protected void removeDispatcher(ActiveMQDispatcher dispatcher){
+       connectionAudit.removeDispatcher(dispatcher);
+    }
+
+    protected boolean isDuplicate(ActiveMQDispatcher dispatcher,Message message){
+       return connectionAudit.isDuplicate(dispatcher,message);
+    }
+
+    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher,Message message){
+       connectionAudit.rollbackDuplicate(dispatcher,message);
+    }
 
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=552713&r1=552712&r2=552713
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Tue Jul  3 01:19:33 2007
@@ -36,6 +36,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -89,7 +90,7 @@
     // The are the messages that were delivered to the consumer but that have
     // not been acknowledged. It's kept in reverse order since we
     // Always walk list in reverse order. Only used when session is client ack.
-    private final LinkedList deliveredMessages = new LinkedList();
+    private final LinkedList <MessageDispatch>deliveredMessages = new LinkedList<MessageDispatch>();
     private int deliveredCounter = 0;
     private int additionalWindowSize = 0;
     private int rollbackCounter = 0;
@@ -342,7 +343,7 @@
             if (wasRunning)
                 session.stop();
 
-            session.redispatch(unconsumedMessages);
+            session.redispatch(this,unconsumedMessages);
 
             if (wasRunning)
                 session.start();
@@ -574,7 +575,7 @@
         if(deliveryingAcknowledgements.compareAndSet(false,true)){
             if(this.optimizeAcknowledge){
                 if(!deliveredMessages.isEmpty()){
-                    MessageDispatch md=(MessageDispatch) deliveredMessages.getFirst();
+                    MessageDispatch md=deliveredMessages.getFirst();
                     ack=new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
                     deliveredMessages.clear();
                     ackCounter=0;
@@ -602,24 +603,39 @@
         }
     }
 
-    public void dispose() throws JMSException {
-        if (!unconsumedMessages.isClosed()) {
+    public void dispose() throws JMSException{
+        if(!unconsumedMessages.isClosed()){
             // Do we have any acks we need to send out before closing?
             // Ack any delivered messages now. (session may still
             // commit/rollback the acks).
-            deliverAcks();//only processes optimized acknowledgements
-            if (executorService!=null){
+            deliverAcks();// only processes optimized acknowledgements
+            if(executorService!=null){
                 executorService.shutdown();
-                try {
-                    executorService.awaitTermination(60, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
+                try{
+                    executorService.awaitTermination(60,TimeUnit.SECONDS);
+                }catch(InterruptedException e){
                     Thread.currentThread().interrupt();
                 }
             }
-            if ((session.isTransacted() || session.isDupsOkAcknowledge())) {
+            if((session.isTransacted()||session.isDupsOkAcknowledge())){
                 acknowledge();
             }
+            if (session.isClientAcknowledge()) {
+                if(!this.info.isBrowser()){
+                    // rollback duplicates that aren't acknowledged
+                    for(MessageDispatch old:deliveredMessages){
+                        session.connection.rollbackDuplicate(this,old.getMessage());
+                    }
+                }
+            }
             deliveredMessages.clear();
+            List<MessageDispatch> list=unconsumedMessages.removeAll();
+            if(!this.info.isBrowser()){
+                for(MessageDispatch old:list){
+                    // ensure we don't filter this as a duplicate
+                    session.connection.rollbackDuplicate(this,old.getMessage());
+                }
+            }
             unconsumedMessages.close();
             this.session.removeConsumer(this);
         }
@@ -766,7 +782,7 @@
             return;
 
         // Acknowledge the last message.
-        MessageDispatch lastMd = (MessageDispatch) deliveredMessages.get(0);
+        MessageDispatch lastMd = deliveredMessages.get(0);
         MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
         if (session.isTransacted()) {
             session.doStartTransaction();
@@ -793,8 +809,12 @@
         synchronized(unconsumedMessages.getMutex()){
             if(optimizeAcknowledge){
                 // remove messages read but not acked at the broker yet through optimizeAcknowledge
-                for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
-                    deliveredMessages.removeLast();
+                if(!this.info.isBrowser()){
+                    for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+                        // ensure we don't filter this as a duplicate
+                        MessageDispatch md=deliveredMessages.removeLast();
+                        session.connection.rollbackDuplicate(this,md.getMessage());
+                    }
                 }
             }
             if(deliveredMessages.isEmpty())
@@ -810,9 +830,11 @@
                 // We need to NACK the messages so that they get sent to the
                 // DLQ.
                 // Acknowledge the last message.
-                MessageDispatch lastMd=(MessageDispatch) deliveredMessages.get(0);
+                MessageDispatch lastMd=deliveredMessages.get(0);
                 MessageAck ack=new MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size());
                 session.asyncSendPacket(ack);
+                //ensure we don't filter this as a duplicate
+                session.connection.rollbackDuplicate(this,lastMd.getMessage()); 
                 // Adjust the window size.
                 additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size());
                 rollbackCounter=0;
@@ -848,50 +870,63 @@
             deliveredMessages.clear();
         }
         if(messageListener!=null){
-            session.redispatch(unconsumedMessages);
+            session.redispatch(this,unconsumedMessages);
         }
     }
 
-    public void dispatch(MessageDispatch md) {
-        MessageListener listener = this.messageListener;
-        try {
+    public void dispatch(MessageDispatch md){
+        MessageListener listener=this.messageListener;
+        try{
             synchronized(unconsumedMessages.getMutex()){
-                if (clearDispatchList) {
+                if(clearDispatchList){
                     // we are reconnecting so lets flush the in progress messages
-                    clearDispatchList = false;
-                    unconsumedMessages.clear();
+                    clearDispatchList=false;
+                    List<MessageDispatch> list=unconsumedMessages.removeAll();
+                    if(!this.info.isBrowser()){
+                        for(MessageDispatch old:list){
+                            // ensure we don't filter this as a duplicate
+                            session.connection.rollbackDuplicate(this,old.getMessage());
+                        }
+                    }
+                }
+                if(!unconsumedMessages.isClosed()){
+                    if(this.info.isBrowser() || session.connection.isDuplicate(this,md.getMessage())==false){
+                        if(listener!=null&&unconsumedMessages.isRunning()){
+                            ActiveMQMessage message=createActiveMQMessage(md);
+                            beforeMessageIsConsumed(md);
+                            try{
+                                listener.onMessage(message);
+                                afterMessageIsConsumed(md,false);
+                            }catch(RuntimeException e){
+                                if(session.isDupsOkAcknowledge()||session.isAutoAcknowledge()){
+                                    // Redeliver the message
+                                }else{
+                                    // Transacted or Client ack: Deliver the next message.
+                                    afterMessageIsConsumed(md,false);
+                                }
+                                log.error("Exception while processing message: "+e,e);
+                            }
+                        }else{
+                            unconsumedMessages.enqueue(md);
+                            if(availableListener!=null){
+                                availableListener.onMessageAvailable(this);
+                            }
+                        }
+                    }else {
+                        //ignore duplicate
+                        if (log.isDebugEnabled()) {
+                            log.debug("Ignoring Duplicate: " + md.getMessage());
+                        }
+                        ackLater(md,MessageAck.STANDARD_ACK_TYPE);
+                    }
                 }
-
-                if (!unconsumedMessages.isClosed()) {
-	                if (listener != null && unconsumedMessages.isRunning() ) {
-	                    ActiveMQMessage message = createActiveMQMessage(md);
-	                    beforeMessageIsConsumed(md);
-	                    try {
-	                        listener.onMessage(message);
-	                        afterMessageIsConsumed(md, false);
-	                    } catch (RuntimeException e) {
-	                        if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge()
) {
-	                            // Redeliver the message
-	                        } else {
-	                            // Transacted or Client ack: Deliver the next message.
-	                            afterMessageIsConsumed(md, false);
-	                        }
-	                        log.error("Exception while processing message: " + e, e);
-	                    }
-	                } else {
-	                    unconsumedMessages.enqueue(md);
-	                    if (availableListener != null) {
-	                        availableListener.onMessageAvailable(this);
-	                    }
-	                }
-	            }
             }
-            if (++dispatchedCount%1000==0) {
+            if(++dispatchedCount%1000==0){
                 dispatchedCount=0;
-            Thread.yield();
+                Thread.yield();
             }
-        } catch (Exception e) {
-        	session.connection.onAsyncException(e);
+        }catch(Exception e){
+            session.connection.onAsyncException(e);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=552713&r1=552712&r2=552713
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Tue
Jul  3 01:19:33 2007
@@ -754,7 +754,7 @@
         while ((messageDispatch = executor.dequeueNoWait()) != null) {
             final MessageDispatch md = messageDispatch;
             ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
-            if( message.isExpired() ) {
+            if( message.isExpired() || connection.isDuplicate(ActiveMQSession.this,message))
{
                 //TODO: Ack it without delivery to client
                 continue;
             }
@@ -785,39 +785,35 @@
                 ack.setFirstMessageId(md.getMessage().getMessageId());
                 doStartTransaction();
                 ack.setTransactionId(getTransactionContext().getTransactionId());
-                if( ack.getTransactionId()!=null ) {
+                if(ack.getTransactionId()!=null){
                     getTransactionContext().addSynchronization(new Synchronization(){
-                        public void afterRollback() throws Exception {
 
+                        public void afterRollback() throws Exception{
                             md.getMessage().onMessageRolledBack();
-                            
-                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
-                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
-                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
-                            		&& redeliveryCounter > redeliveryPolicy.getMaximumRedeliveries())
{
-                                
+                            // ensure we don't filter this as a duplicate
+                            connection.rollbackDuplicate(ActiveMQSession.this,md.getMessage());
+                            RedeliveryPolicy redeliveryPolicy=connection.getRedeliveryPolicy();
+                            int redeliveryCounter=md.getMessage().getRedeliveryCounter();
+                            if(redeliveryPolicy.getMaximumRedeliveries()!=RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
+                                    &&redeliveryCounter>redeliveryPolicy.getMaximumRedeliveries()){
                                 // We need to NACK the messages so that they get sent to
the
                                 // DLQ.
-
                                 // Acknowledge the last message.
-                                MessageAck ack = new MessageAck(md,MessageAck.POSION_ACK_TYPE,1);
+                                MessageAck ack=new MessageAck(md,MessageAck.POSION_ACK_TYPE,1);
                                 ack.setFirstMessageId(md.getMessage().getMessageId());
                                 asyncSendPacket(ack);
-
-                            } else {
-                                
+                            }else{
                                 // Figure out how long we should wait to resend this message.
                                 long redeliveryDelay=0;
-                                for( int i=0; i < redeliveryCounter; i++) {
-                                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+                                for(int i=0;i<redeliveryCounter;i++){
+                                    redeliveryDelay=redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
                                 }
-                                
-                                Scheduler.executeAfterDelay(new Runnable() {
-                                    public void run() {
+                                Scheduler.executeAfterDelay(new Runnable(){
+
+                                    public void run(){
                                         ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                                     }
-                                }, redeliveryDelay);
-                                
+                                },redeliveryDelay);
                             }
                         }
                     });
@@ -1499,6 +1495,7 @@
             stats.onRemoveDurableSubscriber();
         }
         this.consumers.remove(consumer);
+        this.connection.removeDispatcher(consumer);
     }
 
     /**
@@ -1765,9 +1762,12 @@
         return deliveryIdGenerator.getNextSequenceId();
     }
 
-    public void redispatch(MessageDispatchChannel unconsumedMessages) throws JMSException
{
+    public void redispatch(ActiveMQDispatcher dispatcher,MessageDispatchChannel unconsumedMessages)
throws JMSException {
         
-        List c = unconsumedMessages.removeAll();
+        List <MessageDispatch>c = unconsumedMessages.removeAll();
+        for (MessageDispatch md: c) {
+            this.connection.rollbackDuplicate(dispatcher,md.getMessage());
+        }
         Collections.reverse(c);
         
         for (Iterator iter = c.iterator(); iter.hasNext();) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?view=diff&rev=552713&r1=552712&r2=552713
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Tue Jul  3 01:19:33 2007
@@ -57,6 +57,7 @@
     
 
     void execute(MessageDispatch message) throws InterruptedException {
+        
         if (!startedOrWarnedThatNotStarted) {
 
             ActiveMQConnection connection = session.connection;
@@ -119,6 +120,7 @@
             ConsumerId consumerId = message.getConsumerId();
             if( consumerId.equals(consumer.getConsumerId()) ) {
                 consumer.dispatch(message);
+                break;
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java?view=diff&rev=552713&r1=552712&r2=552713
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java
Tue Jul  3 01:19:33 2007
@@ -28,12 +28,12 @@
 public class MessageDispatchChannel {
 
     private final Object mutex = new Object();
-    private final LinkedList list;
+    private final LinkedList<MessageDispatch> list;
     private boolean closed;
     private boolean running;
 
     public MessageDispatchChannel() {
-        this.list = new LinkedList();
+        this.list = new LinkedList<MessageDispatch>();
     }
 
     public void enqueue(MessageDispatch message) {
@@ -84,7 +84,7 @@
             if (closed || !running || list.isEmpty()) {
                 return null;
             }
-            return (MessageDispatch) list.removeFirst();
+            return list.removeFirst();
         }
     }
     
@@ -93,7 +93,7 @@
             if (closed || !running || list.isEmpty()) {
                 return null;
             }
-            return (MessageDispatch) list.removeFirst();
+            return list.removeFirst();
         }
     }
     
@@ -102,7 +102,7 @@
             if (closed || !running || list.isEmpty()) {
                 return null;
             }
-            return (MessageDispatch) list.getFirst();
+            return list.getFirst();
         }
     }
 
@@ -154,9 +154,9 @@
         return running;
     }
 
-    public List removeAll() {
+    public List<MessageDispatch> removeAll() {
         synchronized(mutex) {
-            ArrayList rc = new ArrayList(list);
+            ArrayList <MessageDispatch>rc = new ArrayList<MessageDispatch>(list);
             list.clear();
             return rc;
         }



Mime
View raw message