qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r587525 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/broker: BrokerAdapter.cpp Consumer.h Queue.cpp Queue.h SemanticState.cpp SemanticState.h
Date Tue, 23 Oct 2007 14:50:58 GMT
Author: gsim
Date: Tue Oct 23 07:50:56 2007
New Revision: 587525

URL: http://svn.apache.org/viewvc?rev=587525&view=rev
Log:
Hack for no-local when used with jms topics
Fix for releasing of exclusive ownership of queues second time around


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Oct 23 07:50:56 2007
@@ -202,8 +202,8 @@
 		getConnection().exclusiveQueues.push_back(queue);
 	    }
 	} else {
-            if (exclusive && !queue->hasExclusiveOwner()) {
-                queue->setExclusiveOwner(&getConnection());
+            if (exclusive && queue->setExclusiveOwner(&getConnection())) {
+		getConnection().exclusiveQueues.push_back(queue);
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Tue Oct 23 07:50:56 2007
@@ -46,6 +46,7 @@
             Consumer(bool preAcquires = true) : acquires(preAcquires) {}
             bool preAcquires() const { return acquires; }
             virtual bool deliver(QueuedMessage& msg) = 0;
+            virtual bool filter(Message::shared_ptr) { return true; }
             virtual ~Consumer(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 23 07:50:56 2007
@@ -151,15 +151,33 @@
     serializer.execute(f);
 }
 
+/**
+ * Return true if the message can be excluded. This is currently the
+ * case if the queue has an exclusive consumer that will never want
+ * the message, or if the queue is exclusive to a single connection
+ * and has a single consumer (covers the JMS topic case).
+ */
+bool Queue::exclude(Message::shared_ptr msg)
+{
+    RWlock::ScopedWlock locker(consumerLock);
+    if (exclusive) {
+        return !exclusive->filter(msg);
+    } else if (hasExclusiveOwner() && acquirers.size() == 1) {
+        return !acquirers[0]->filter(msg);
+    } else {
+        return false;
+    }
+}
+
 Consumer::ptr Queue::allocate()
 {
     RWlock::ScopedWlock locker(consumerLock);
  
-    if(acquirers.empty()){
+    if (acquirers.empty()) {
         return Consumer::ptr();
-    }else if(exclusive){
+    } else if (exclusive){
         return exclusive;
-    }else{
+    } else {
         next = next % acquirers.size();
         return acquirers[next++];
     }
@@ -171,9 +189,9 @@
     //request, so won't result in anyone being missed
     uint counter = getAcquirerCount();
     Consumer::ptr c = allocate();
-    while(c && counter--){
-        if(c->deliver(msg)) {
-            return true;            
+    while (c && counter--){
+        if (c->deliver(msg)) {
+            return true;
         } else {
             c = allocate();
         }
@@ -181,22 +199,31 @@
     return false;
 }
 
-void Queue::dispatch(){
+bool Queue::getNextMessage(QueuedMessage& msg)
+{
+    Mutex::ScopedLock locker(messageLock);
+    if (messages.empty()) { 
+        QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+        return false;
+    } else {
+        msg = messages.front();
+        return true;
+    }
+}
+
+void Queue::dispatch()
+{
      QueuedMessage msg;
-     while(true){
-        {
-	    Mutex::ScopedLock locker(messageLock);
-	    if (messages.empty()) { 
-                QPID_LOG(debug, "No messages to dispatch on queue '" << name <<
"'");
-                break; 
-            }
-	    msg = messages.front();
-	}
-        if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
-            pop();
-        } else {            
-            break;
-        }	
+     while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
+         if (dispatch(msg)) {
+             pop();
+         } else if (exclude(msg.payload)) {
+             pop();
+             dequeue(0, msg.payload);
+             QPID_LOG(debug, "Message " << msg.payload << " filtered out of "
<< name << "[" << this << "]");        
+         } else {            
+             break;
+         }	
      }
      serviceAllBrowsers();
 }
@@ -478,4 +505,38 @@
         queue->destroy();
     }
 
+}
+
+bool Queue::isExclusiveOwner(const ConnectionToken* const o) const 
+{ 
+    Mutex::ScopedLock locker(ownershipLock);
+    return o == owner; 
+}
+
+void Queue::releaseExclusiveOwnership() 
+{ 
+    Mutex::ScopedLock locker(ownershipLock);
+    owner = 0; 
+}
+
+bool Queue::setExclusiveOwner(const ConnectionToken* const o) 
+{ 
+    Mutex::ScopedLock locker(ownershipLock);
+    if (owner) {
+        return false;
+    } else {
+        owner = o; 
+        return true;
+    }
+}
+
+bool Queue::hasExclusiveOwner() const 
+{ 
+    Mutex::ScopedLock locker(ownershipLock);
+    return owner != 0; 
+}
+
+bool Queue::hasExclusiveConsumer() const 
+{ 
+    return exclusive; 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 23 07:50:56 2007
@@ -85,6 +85,7 @@
             int next;
             mutable qpid::sys::RWlock consumerLock;
             mutable qpid::sys::Mutex messageLock;
+            mutable qpid::sys::Mutex ownershipLock;
             Consumer::ptr exclusive;
             mutable uint64_t persistenceId;
             framing::FieldTable settings;
@@ -110,6 +111,8 @@
             Consumer::ptr allocate();
             bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
             uint32_t getAcquirerCount() const;
+            bool getNextMessage(QueuedMessage& msg);
+            bool exclude(Message::shared_ptr msg);
  
         protected:
 	   /**
@@ -172,11 +175,11 @@
             uint32_t getMessageCount() const;
             uint32_t getConsumerCount() const;
             inline const string& getName() const { return name; }
-            inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return
o == owner; }
-            inline void releaseExclusiveOwnership() { owner = 0; }
-            inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; }
-            inline bool hasExclusiveConsumer() const { return exclusive; }
-            inline bool hasExclusiveOwner() const { return owner != 0; }
+            bool isExclusiveOwner(const ConnectionToken* const o) const;
+            void releaseExclusiveOwnership();
+            bool setExclusiveOwner(const ConnectionToken* const o);
+            bool hasExclusiveConsumer() const;
+            bool hasExclusiveOwner() const;
             inline bool isDurable() const { return store != 0; }
             inline const framing::FieldTable& getSettings() const { return settings;
}
             inline bool isAutoDelete() const { return autodelete; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 23 07:50:56 2007
@@ -260,10 +260,18 @@
                 parent->deliveryAdapter.deliver(msg.payload, token);
             if (windowing || ackExpected) {
                 parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire,
!ackExpected));
+            } else if (!ackExpected) {
+                queue->dequeue(0, msg.payload);
             }
         }
         return !blocked;
     }
+}
+
+bool SemanticState::ConsumerImpl::filter(Message::shared_ptr msg)
+{
+    return !(nolocal &&
+             &parent->getSession().getConnection() == msg->getPublisher());
 }
 
 bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Oct 23 07:50:56 2007
@@ -78,6 +78,7 @@
                      bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
         bool deliver(QueuedMessage& msg);            
+        bool filter(Message::shared_ptr msg);            
 
         void setWindowMode();
         void setCreditMode();



Mime
View raw message