qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1412242 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/Session.cpp messaging/amqp/SenderContext.cpp
Date Wed, 21 Nov 2012 19:31:29 GMT
Author: gsim
Date: Wed Nov 21 19:31:28 2012
New Revision: 1412242

URL: http://svn.apache.org/viewvc?rev=1412242&view=rev
Log:
QPID-4452: fix credit for producers

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1412242&r1=1412241&r2=1412242&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Nov 21 19:31:28 2012
@@ -53,32 +53,33 @@ namespace amqp {
 class Target
 {
   public:
+    Target(pn_link_t* l) : credit(100), window(0), link(l) {}
     virtual ~Target() {}
-    virtual void flow() = 0;
+    bool flow();
+    bool needFlow();
     virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message
-  private:
+  protected:
+    const uint32_t credit;
+    uint32_t window;
+    pn_link_t* link;
 };
 
 class Queue : public Target
 {
   public:
-    Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : queue(q), link(l) {}
-    void flow();
+    Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {}
     void handle(qpid::broker::Message& m);
   private:
     boost::shared_ptr<qpid::broker::Queue> queue;
-    pn_link_t* link;
 };
 
 class Exchange : public Target
 {
   public:
-    Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : exchange(e), link(l) {}
-    void flow();
+    Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {}
     void handle(qpid::broker::Message& m);
   private:
     boost::shared_ptr<qpid::broker::Exchange> exchange;
-    pn_link_t* link;
 };
 
 Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
@@ -169,11 +170,9 @@ void Session::attach(pn_link_t* link)
         if (node.queue) {
             boost::shared_ptr<Target> q(new Queue(node.queue, link));
             targets[link] = q;
-            q->flow();
         } else if (node.exchange) {
             boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
             targets[link] = e;
-            e->flow();
         } else {
             pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
             throw qpid::Exception("Node not found: " + name);/*not-found*/
@@ -253,7 +252,7 @@ void Session::incoming(pn_link_t* link, 
         received->begin();
         Transfer t(delivery, shared_from_this());
         received->end(t);
-        target->second->flow();
+        if (target->second->needFlow()) out.activateOutput();
     }
 }
 void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
@@ -283,6 +282,9 @@ bool Session::dispatch()
             accepted(*i, true);
         }
     }
+    for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) {
+        if (t->second->flow()) output = true;
+    }
 
     return output;
 }
@@ -299,24 +301,32 @@ void Session::close()
     deleted = true;
 }
 
-void Queue::flow()
+void Queue::handle(qpid::broker::Message& message)
 {
-    pn_link_flow(link, 1);//TODO: proper flow control
+    queue->deliver(message);
+    --window;
 }
 
-void Queue::handle(qpid::broker::Message& message)
+void Exchange::handle(qpid::broker::Message& message)
 {
-    queue->deliver(message);
+    DeliverableMessage deliverable(message, 0);
+    exchange->route(deliverable);
+    --window;
 }
 
-void Exchange::flow()
+bool Target::flow()
 {
-    pn_link_flow(link, 1);//TODO: proper flow control
+    bool issue = window < credit;
+    if (issue) {
+        pn_link_flow(link, credit - window);//TODO: proper flow control
+        window = credit;
+    }
+    return issue;
 }
 
-void Exchange::handle(qpid::broker::Message& message)
+bool Target::needFlow()
 {
-    DeliverableMessage deliverable(message, 0);
-    exchange->route(deliverable);
+    return window <= (credit/2);
 }
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1412242&r1=1412241&r2=1412242&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Wed Nov 21 19:31:28 2012
@@ -80,7 +80,7 @@ const std::string& SenderContext::getTar
 
 SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
 {
-    if (processUnsettled() < capacity) {
+    if (processUnsettled() < capacity && pn_link_credit(sender)) {
         deliveries.push_back(Delivery(nextId++));
         Delivery& delivery = deliveries.back();
         delivery.encode(MessageImplAccess::get(message), address);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message