qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ritch...@apache.org
Subject svn commit: r585565 - /incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Date Wed, 17 Oct 2007 16:39:22 GMT
Author: ritchiem
Date: Wed Oct 17 09:39:20 2007
New Revision: 585565

URL: http://svn.apache.org/viewvc?rev=585565&view=rev
Log:
QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that have been taken.

Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585565&r1=585564&r2=585565&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Oct 17 09:39:20 2007
@@ -883,7 +883,36 @@
                             _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity()
+ " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(_queue, s);
+
+                        if (msg.taken(_queue, s))
+                        {
+                            //Message has been delivered so don't redeliver.
+                            // This can currently occur because of the recursive call below
+                            // During unit tests the send can occur
+                            // client then rejects
+                            // this reject then releases the message by the time the
+                            // if(!msg.isTaken()) call is made below
+                            // the message has been released so that thread loops to send
the message again
+                            // of course by the time it gets back to here. the thread that
released the
+                            // message is now ready to send it. Here is a sample trace for
reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false}
by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419),
resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419),
resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145
Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1,
transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419),
resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false}
by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1,
transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419),
resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false}
by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027),
resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145
Ref:1)) has not been taken so recursing!: Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false}
by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027),
resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027),
resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145
Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027),
resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027),
resendQueue=false]}
+                            // Note: In the last request to take the message from thread
4,5 the message has been
+                            // taken by the previous call done by thread 2,5
+
+
+                            return;
+                        }
                         //Deliver the message
                         s.send(msg, _queue);
                     }
@@ -897,6 +926,10 @@
                     }
                 }
 
+                //
+                // Why do we do this? What was the reasoning? We should have a better approach
+                // than recursion and rejecting if someone else sends it before we do.
+                //
                 if (!msg.isTaken(_queue))
                 {
                     if (debugEnabled)



Mime
View raw message