qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r585655 - in /incubator/qpid/branches/M2: ./ java/broker/src/main/java/org/apache/qpid/server/queue/ java/broker/src/main/java/org/apache/qpid/server/txn/ java/systests/src/main/java/org/apache/qpid/server/txn/
Date Wed, 17 Oct 2007 19:59:59 GMT
Author: rgodfrey
Date: Wed Oct 17 12:59:58 2007
New Revision: 585655

URL: http://svn.apache.org/viewvc?rev=585655&view=rev
Log:
Merged revisions 573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-581967,581969-582197,582199-582200,582203-582204,582206-582262,582264,582267-583084,583087,583089-583104,583106-583146,583148-583153,583155-583169,583171-583172,583174-583398,583400-583414,583416-583417,583419-583437,583439-583482,583484-583517,583519-583545,583547,583549-583774,583777-583807,583809-583881,583883-584107,584109-584112,584114-584123,584125-585653
via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r585565 | ritchiem | 2007-10-17 17:39:20 +0100 (Wed, 17 Oct 2007) | 1 line
  
  QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that have been taken.
........
  r585570 | ritchiem | 2007-10-17 17:48:01 +0100 (Wed, 17 Oct 2007) | 1 line
  
  Update to AMQMessage to reset the deliveredToConsumer flag(false) when the message is released. This flag is now used by more than the immediate delivery. It is also used to understand if the message has been delivered so that we can tell the message should not be purged.
........
  r585575 | ritchiem | 2007-10-17 17:59:42 +0100 (Wed, 17 Oct 2007) | 1 line
  
  QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async process if a msg is queued that has the potential to be delivered.
........
  r585642 | rgodfrey | 2007-10-17 20:42:14 +0100 (Wed, 17 Oct 2007) | 1 line
  
  QPID-645 : TxnBuffer should rethrow exceptions encountered on commit
........

Modified:
    incubator/qpid/branches/M2/   (props changed)
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java

Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Oct 17 12:59:58 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-584113,584124
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-585653

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Oct 17 12:59:58 2007
@@ -133,7 +133,7 @@
     public boolean isReferenced()
     {
         return _referenceCount.get() > 0;
-    }    
+    }
 
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -558,6 +558,7 @@
                 taken.set(false);
             }
 
+            _deliveredToConsumer = false;
             _takenMap.put(queue, taken);
             _takenBySubcriptionMap.put(queue, null);
         }
@@ -694,7 +695,10 @@
         return false;
     }
 
-    /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
+    /**
+     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+     * And for selector efficiency.
+     */
     public void setDeliveredToConsumer()
     {
         _deliveredToConsumer = true;

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed Oct 17 12:59:58 2007
@@ -212,6 +212,15 @@
     }
 
     /**
+     *
+     * @return the state of the async processor.
+     */
+    public boolean isProcessingAsync()
+    {
+        return _processing.get();
+    }
+
+    /**
      * Returns all the messages in the Queue
      *
      * @return List of messages
@@ -821,6 +830,12 @@
                 {
                     addMessageToQueue(msg, deliverFirst);
 
+                    //if  we have a non-filtering subscriber but queued messages && we're not Async && we have other Active subs then something is wrong!
+                    if ((s != null && hasQueuedMessages()) && !isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+                    {
+                        _queue.deliverAsync();
+                    }
+
                     //release lock now message is on queue.
                     _lock.unlock();
 
@@ -883,7 +898,36 @@
                             _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(_queue, s);
+
+                        if (msg.taken(_queue, s))
+                        {
+                            //Message has been delivered so don't redeliver.
+                            // This can currently occur because of the recursive call below
+                            // During unit tests the send can occur
+                            // client then rejects
+                            // this reject then releases the message by the time the
+                            // if(!msg.isTaken()) call is made below
+                            // the message has been released so that thread loops to send the message again
+                            // of course by the time it gets back to here. the thread that released the
+                            // message is now ready to send it. Here is a sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: {Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, session=anonymous(26960027), resendQueue=false]}
+                            // Note: In the last request to take the message from thread 4,5 the message has been
+                            // taken by the previous call done by thread 2,5
+
+
+                            return;
+                        }
                         //Deliver the message
                         s.send(msg, _queue);
                     }
@@ -897,6 +941,10 @@
                     }
                 }
 
+                //
+                // Why do we do this? What was the reasoning? We should have a better approach
+                // than recursion and rejecting if someone else sends it before we do.
+                //
                 if (!msg.isTaken(_queue))
                 {
                     if (debugEnabled)
@@ -942,6 +990,8 @@
     {
         public void run()
         {
+            String startName = Thread.currentThread().getName();
+            Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
             boolean running = true;
             while (running && !_movingMessages.get())
             {
@@ -957,6 +1007,7 @@
                     _processing.set(false);
                 }
             }
+            Thread.currentThread().setName(startName);
         }
     }
 
@@ -983,8 +1034,9 @@
 
     private String currentStatus()
     {
-        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") +
-               "(" + _messages.size() + ":" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") +
+               "(" + ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() +
+               ":" + (_messages.size() - ((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
                " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
                "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
                " Active:" + _subscriptions.hasActiveSubscribers() +

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java Wed Oct 17 12:59:58 2007
@@ -54,7 +54,7 @@
         _ops.clear();
     }
 
-    private boolean prepare(StoreContext context)
+    private boolean prepare(StoreContext context) throws AMQException
     {
         for (int i = 0; i < _ops.size(); i++)
         {
@@ -63,19 +63,31 @@
             {
                 op.prepare(context);
             }
-            catch (Exception e)
+            catch (AMQException e)
             {
-                //compensate previously prepared ops
-                for (int j = 0; j < i; j++)
-                {
-                    _ops.get(j).undoPrepare();
-                }
-                return false;
+                undoPrepare(i);
+                throw e;
+            }
+            catch (RuntimeException e)
+            {
+                undoPrepare(i);
+                throw e;
             }
         }
         return true;
     }
 
+    private void undoPrepare(int lastPrepared)
+    {
+        //compensate previously prepared ops
+        for (int j = 0; j < lastPrepared; j++)
+        {
+            _ops.get(j).undoPrepare();
+        }
+    }
+
+	
+	
     public void rollback(StoreContext context) throws AMQException
     {
         for (TxnOp op : _ops)

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java Wed Oct 17 12:59:58 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.store.StoreContext;
 
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 
 public class TxnBufferTest extends TestCase
 {
@@ -78,7 +79,16 @@
         buffer.enlist(new FailedPrepare());
         buffer.enlist(new MockOp());
 
-        buffer.commit(null);
+		try
+        {
+            buffer.commit(null);
+            
+        }
+        catch (NoSuchElementException e)
+        {
+
+        }
+
         validateOps();
         store.validate();
     }



Mime
View raw message