qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r592869 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/client/ qpid/sys/ tests/
Date Wed, 07 Nov 2007 19:57:52 GMT
Author: aconway
Date: Wed Nov  7 11:57:46 2007
New Revision: 592869

URL: http://svn.apache.org/viewvc?rev=592869&view=rev
Log:


client::SubscriptionManager:
 - Added autoStop support.
 - Added LocalQueue subscriptions.
 - Expose AckPolicy settings to user.

client::Message:
 - incoming Messages carry their session for acknowledge

perftest: (see perftest --help for details...)
 - allow multiple consumers.
 - 3 queue modes: shared, fanout, topic.
 - set size of messages

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov  7 11:57:46 2007
@@ -208,6 +208,7 @@
   qpid/client/Connector.cpp			\
   qpid/client/Demux.cpp				\
   qpid/client/Dispatcher.cpp			\
+  qpid/client/LocalQueue.cpp			\
   qpid/client/MessageListener.cpp		\
   qpid/client/Correlator.cpp			\
   qpid/client/CompletionTracker.cpp		\
@@ -307,6 +308,7 @@
   qpid/client/Exchange.h \
   qpid/client/Message.h \
   qpid/client/Queue.h \
+  qpid/client/AckPolicy.h			\
   qpid/client/Completion.h \
   qpid/client/CompletionTracker.h \
   qpid/client/Connection.h \
@@ -316,6 +318,7 @@
   qpid/client/Correlator.h \
   qpid/client/Demux.h \
   qpid/client/Dispatcher.h \
+  qpid/client/LocalQueue.h \
   qpid/client/Execution.h \
   qpid/client/ExecutionHandler.h \
   qpid/client/Future.h \

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h?rev=592869&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h Wed Nov  7 11:57:46 2007
@@ -0,0 +1,54 @@
+#ifndef QPID_CLIENT_ACKPOLICY_H
+#define QPID_CLIENT_ACKPOLICY_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * Policy for automatic acknowledgement of messages.
+ */
+class AckPolicy
+{
+    size_t interval;
+    size_t count;
+
+  public:
+    /**
+     *@param n: acknowledge every n messages.
+     *n==0 means no automatic acknowledgement.
+     */
+    AckPolicy(size_t n=1) : interval(n), count(n) {}
+
+    void ack(const Message& msg) {
+        if (!interval) return;
+        bool send=(--count==0);
+        msg.acknowledge(true, send);
+        if (send) count = interval;
+    }
+};
+
+}} // namespace qpid::client
+
+
+
+#endif  /*!QPID_CLIENT_ACKPOLICY_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/AckPolicy.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Wed Nov  7 11:57:46 2007
@@ -52,9 +52,8 @@
 }
 
 Connector::~Connector(){
-    if (receiver.id()) {
+    if (receiver.id() && receiver.id() != Thread::current().id())
         receiver.join();
-    }
 }
 
 void Connector::connect(const std::string& host, int port){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Wed Nov  7 11:57:46 2007
@@ -27,6 +27,8 @@
 #include "qpid/sys/BlockingQueue.h"
 #include "Message.h"
 
+#include <boost/state_saver.hpp>
+
 using qpid::framing::FrameSet;
 using qpid::framing::MessageTransferBody;
 using qpid::sys::Mutex;
@@ -36,23 +38,22 @@
 namespace qpid {
 namespace client {
 
-    Subscriber::Subscriber(Session_0_10& s, MessageListener* l, bool a, uint f) : session(s),
listener(l), autoAck(a), ackBatchSize(f), count(0) {}
+Subscriber::Subscriber(Session_0_10& s, MessageListener* l, AckPolicy a) : session(s),
listener(l), autoAck(a) {}
 
 void Subscriber::received(Message& msg)
 {
     if (listener) {
         listener->received(msg);
-        if (autoAck) {
-            bool send = (++count >= ackBatchSize);
-            msg.acknowledge(session, true, send);
-            if (send) count = 0;
-        }
+        autoAck.ack(msg);
     }
 }
 
-
-    Dispatcher::Dispatcher(Session_0_10& s, const std::string& q) : session(s), queue(q),
running(false), stopped(false)
+Dispatcher::Dispatcher(Session_0_10& s, const std::string& q)
+    : session(s), running(false)
 {
+    queue = q.empty() ? 
+        session.execution().getDemux().getDefault() : 
+        session.execution().getDemux().get(q); 
 }
 
 void Dispatcher::start()
@@ -62,19 +63,22 @@
 
 void Dispatcher::run()
 {    
-    Demux::QueuePtr q = queue.empty() ? 
-        session.execution().getDemux().getDefault() : 
-        session.execution().getDemux().get(queue); 
-
-    startRunning();
-    stopped = false;
-    while (!isStopped()) {
-        FrameSet::shared_ptr content = q->pop();
+    Mutex::ScopedLock l(lock);
+    if (running) 
+        throw Exception("Dispatcher is already running.");
+    boost::state_saver<bool>  reset(running); // Reset to false on exit.
+    running = true;
+    queue->open();
+    while (!queue->isClosed()) {
+        Mutex::ScopedUnlock u(lock);
+        FrameSet::shared_ptr content = queue->pop();
         if (content->isA<MessageTransferBody>()) {
-            Message msg(*content);
+            Message msg(*content, session);
             Subscriber::shared_ptr listener = find(msg.getDestination());
             if (!listener) {
-                QPID_LOG(error, "No message listener set: " << content->getMethod());
                                       
+                // FIXME aconway 2007-11-07: Should close session & throw here?
+                QPID_LOG(error, "No message listener for "
+                         << content->getMethod());
             } else {
                 listener->received(msg);
             }
@@ -82,41 +86,23 @@
             if (handler.get()) {
                 handler->handle(*content);
             } else {
+                // FIXME aconway 2007-11-07: Should close session & throw here?
                 QPID_LOG(error, "Unhandled method: " << content->getMethod()); 
                                      
             }
         }
     }
-    stopRunning();
 }
 
 void Dispatcher::stop()
 {
     ScopedLock<Mutex> l(lock);
-    stopped = true;
-}
-
-bool Dispatcher::isStopped()
-{
-    ScopedLock<Mutex> l(lock);
-    return stopped;
-}
-
-/**
- * Prevent concurrent threads invoking run.
- */
-void Dispatcher::startRunning()
-{
-    ScopedLock<Mutex> l(lock);
-    if (running) {
-        throw Exception("Dispatcher is already running.");
-    }
-    running = true;
+    queue->close();             // Will interrupt thread blocked in pop()
 }
 
-void Dispatcher::stopRunning()
+void Dispatcher::setAutoStop(bool b)
 {
     ScopedLock<Mutex> l(lock);
-    running = false;
+    autoStop = b;
 }
 
 Subscriber::shared_ptr Dispatcher::find(const std::string& name)
@@ -129,22 +115,28 @@
     return i->second;
 }
 
-void Dispatcher::listen(MessageListener* listener, bool autoAck, uint ackBatchSize)
+void Dispatcher::listen(
+    MessageListener* listener, AckPolicy autoAck
+)
 {
     ScopedLock<Mutex> l(lock);
-    defaultListener = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck, ackBatchSize));
+    defaultListener = Subscriber::shared_ptr(
+        new Subscriber(session, listener, autoAck));
 }
 
-void Dispatcher::listen(const std::string& destination, MessageListener* listener, bool
autoAck, uint ackBatchSize)
+void Dispatcher::listen(const std::string& destination, MessageListener* listener, AckPolicy
autoAck)
 {
     ScopedLock<Mutex> l(lock);
-    listeners[destination] = Subscriber::shared_ptr(new Subscriber(session, listener, autoAck,
ackBatchSize));
+    listeners[destination] = Subscriber::shared_ptr(
+        new Subscriber(session, listener, autoAck));
 }
 
 void Dispatcher::cancel(const std::string& destination)
 {
     ScopedLock<Mutex> l(lock);
     listeners.erase(destination);
+    if (autoStop && listeners.empty())
+        queue->close();
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.h Wed Nov  7 11:57:46 2007
@@ -29,6 +29,7 @@
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "MessageListener.h"
+#include "AckPolicy.h"
 
 namespace qpid {
 namespace client {
@@ -39,13 +40,11 @@
 {
     Session_0_10& session;
     MessageListener* const listener;
-    const bool autoAck;
-    const uint ackBatchSize;
-    uint count;
+    AckPolicy autoAck;
 
 public:
     typedef boost::shared_ptr<Subscriber> shared_ptr;
-    Subscriber(Session_0_10& session, MessageListener* listener, bool autoAck = true,
uint ackBatchSize = 1);
+    Subscriber(Session_0_10& session, MessageListener* listener, AckPolicy);
     void received(Message& msg);
     
 };
@@ -58,16 +57,14 @@
     sys::Mutex lock;
     sys::Thread worker;
     Session_0_10& session;
-    const std::string queue;
+    Demux::QueuePtr queue;
     bool running;
-    bool stopped;
+    bool autoStop;
     Listeners listeners;
     Subscriber::shared_ptr defaultListener;
     std::auto_ptr<FrameSetHandler> handler;
 
     Subscriber::shared_ptr find(const std::string& name);
-    void startRunning();
-    void stopRunning();
     bool isStopped();
 
 public:
@@ -76,9 +73,10 @@
     void start();
     void run();
     void stop();
+    void setAutoStop(bool b);
 
-    void listen(MessageListener* listener, bool autoAck = true, uint ackBatchSize = 1);
-    void listen(const std::string& destination, MessageListener* listener, bool autoAck
= true, uint ackBatchSize = 1);
+    void listen(MessageListener* listener, AckPolicy autoAck=AckPolicy());
+    void listen(const std::string& destination, MessageListener* listener, AckPolicy
autoAck=AckPolicy());
     void cancel(const std::string& destination);
 };
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=592869&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp Wed Nov  7 11:57:46 2007
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LocalQueue.h"
+#include "qpid/Exception.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/reply_exceptions.h"
+
+namespace qpid {
+namespace client {
+
+using namespace framing;
+
+LocalQueue::LocalQueue(AckPolicy a) : autoAck(a) {}
+LocalQueue::~LocalQueue() {}
+
+Message LocalQueue::pop() {
+    if (!queue)
+        throw ClosedException();
+    FrameSet::shared_ptr content = queue->pop();
+    if (content->isA<MessageTransferBody>()) 
+        return Message(*content, session);
+    else
+        throw CommandInvalidException(
+            QPID_MSG("Unexpected method: " << content->getMethod()));
+}
+
+void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
+
+}} // namespace qpid::client

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/LocalQueue.h Wed Nov  7 11:57:46 2007
@@ -23,8 +23,8 @@
  */
 
 #include "qpid/client/Message.h"
-#include "qpid/Exception.h"
-#include "qpid/sys/BlockingQueue.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/AckPolicy.h"
 
 namespace qpid {
 namespace client {
@@ -35,16 +35,21 @@
 class LocalQueue
 {
   public:
-    LocalQueue(BlockingQueue& q) : queue(q) {}
+    LocalQueue(AckPolicy=AckPolicy());
     ~LocalQueue();
 
     /** Pop the next message off the queue.
      *@exception ClosedException if subscription has been closed.
      */
-    Message pop() { reurn queue->pop(); }
+    Message pop();
+
+    void setAckPolicy(AckPolicy);
 
   private:
-    BlockingQueue& queue;
+  friend class SubscriptionManager;
+    Session_0_10 session;
+    Demux::QueuePtr queue;
+    AckPolicy autoAck;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Wed Nov  7 11:57:46 2007
@@ -68,7 +68,14 @@
         session.execution().completed(id, cumulative, send);
     }
 
-    Message(const framing::FrameSet& frameset) : method(*frameset.as<framing::MessageTransferBody>()),
id(frameset.getId())
+    void acknowledge(bool cumulative = true, bool send = true) const
+    {
+        const_cast<Session_0_10&>(session).execution().completed(id, cumulative,
send);
+    }
+
+    /**@internal for incoming messages */
+    Message(const framing::FrameSet& frameset, Session_0_10 s) :
+        method(*frameset.as<framing::MessageTransferBody>()), id(frameset.getId()),
session(s)
     {
         populate(frameset);
     }
@@ -83,10 +90,13 @@
         return id;
     }
 
+    /**@internal use for incoming messages. */
+    void setSession(Session_0_10 s) { session=s; }
 private:
     //method and id are only set for received messages:
-    const framing::MessageTransferBody method;
-    const framing::SequenceNumber id;
+    framing::MessageTransferBody method;
+    framing::SequenceNumber id;
+    Session_0_10 session;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp Wed Nov  7 11:57:46 2007
@@ -116,7 +116,6 @@
 
 SessionCore::~SessionCore() {
     Lock l(state);
-    invariant();
     detach(COMMAND_INVALID, "Session deleted");
     state.waitWaiters();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Wed Nov  7 11:57:46
2007
@@ -33,57 +33,54 @@
 namespace client {
 
 SubscriptionManager::SubscriptionManager(Session_0_10& s)
-    : dispatcher(s), session(s), messages(1), bytes(UNLIMITED), autoStop(true)
+    : dispatcher(s), session(s),
+      messages(UNLIMITED), bytes(UNLIMITED), window(true)
 {}
 
-std::string SubscriptionManager::uniqueTag(const std::string& tag) {
-    // Make unique tag.
-    int count=1;
-    std::string unique=tag;
-    while (subscriptions.find(tag) != subscriptions.end()) {
-        std::ostringstream s;
-        s << tag << "-" << count++;
-        unique=s.str();
-    }
-    subscriptions.insert(unique);
-    return tag;
-}
-
-std::string SubscriptionManager::subscribe(
+void SubscriptionManager::subscribe(
     MessageListener& listener, const std::string& q, const std::string& t)
 {
-    std::string tag=uniqueTag(t);
-    using namespace arg;
+    std::string tag=t.empty() ? q:t;
     session.messageSubscribe(arg::queue=q, arg::destination=tag);
-    flowLimits(tag, messages, bytes);
     dispatcher.listen(tag, &listener);
-    return tag;
+    setFlowControl(tag, messages, bytes, window);
+}
+
+void SubscriptionManager::subscribe(
+    LocalQueue& lq, const std::string& q, const std::string& t)
+{
+    std::string tag=t.empty() ? q:t;
+    lq.session=session;
+    lq.queue=session.execution().getDemux().add(tag, ByTransferDest(tag));
+    session.messageSubscribe(arg::queue=q, arg::destination=tag);
+    setFlowControl(tag, messages, bytes, window);
 }
 
-void SubscriptionManager::flowLimits(
-    const std::string& tag, uint32_t messages,  uint32_t bytes) {
+void SubscriptionManager::setFlowControl(
+    const std::string& tag, uint32_t messages,  uint32_t bytes, bool window)
+{
+    session.messageFlowMode(tag, window); 
     session.messageFlow(tag, 0, messages); 
     session.messageFlow(tag, 1, bytes);
 }
 
-void SubscriptionManager::flowLimits(uint32_t m,  uint32_t b) {
-    messages=m;
-    bytes=b;
+void SubscriptionManager::setFlowControl(
+    uint32_t messages_,  uint32_t bytes_, bool window_)
+{
+    messages=messages_;
+    bytes=bytes_;
+    window=window_;
 }
 
 void SubscriptionManager::cancel(const std::string tag)
 {
-    if (subscriptions.erase(tag)) {
-        dispatcher.cancel(tag);
-        session.messageCancel(tag);
-        if (autoStop && subscriptions.empty()) stop();
-    }
+    dispatcher.cancel(tag);
+    session.messageCancel(tag);
 }
 
-void SubscriptionManager::run(bool autoStop_)
+void SubscriptionManager::run(bool autoStop)
 {
-    autoStop=autoStop_;
-    if (autoStop && subscriptions.empty()) return;
+    dispatcher.setAutoStop(autoStop);
     dispatcher.run();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Wed Nov  7 11:57:46
2007
@@ -21,55 +21,59 @@
  * under the License.
  *
  */
-
+#include "qpid/sys/Mutex.h"
 #include <qpid/client/Dispatcher.h>
 #include <qpid/client/Session_0_10.h>
 #include <qpid/client/MessageListener.h>
+#include <qpid/client/LocalQueue.h>
 #include <set>
 #include <sstream>
 
 namespace qpid {
 namespace client {
 
-struct TagNotUniqueException : public qpid::Exception {
-    TagNotUniqueException() {}
-};
-
 class SubscriptionManager
 {
-    std::set<std::string> subscriptions;
+    typedef sys::Mutex::ScopedLock Lock;
+    typedef sys::Mutex::ScopedUnlock Unlock;
+
     qpid::client::Dispatcher dispatcher;
     qpid::client::Session_0_10& session;
-    std::string uniqueTag(const std::string&);
     uint32_t messages;
     uint32_t bytes;
-    bool autoStop;
+    bool window;
 
 public:
     SubscriptionManager(Session_0_10& session);
     
     /**
-     * Subscribe listener to receive messages from queue.
+     * Subscribe a MessageListener to receive messages from queue.
+     * 
      *@param listener Listener object to receive messages.
      *@param queue Name of the queue to subscribe to.
      *@param tag Unique destination tag for the listener.
-     * If not specified a unique tag will be generted based on the queue name.
-     *@return Destination tag.
-     *@exception TagNotUniqueException if there is already a subscription
-     * with the same tag.
+     * If not specified, the queue name is used.
+     */
+    void subscribe(MessageListener& listener,
+                   const std::string& queue,
+                   const std::string& tag=std::string());
+
+    /**
+     * Subscribe a LocalQueue to receive messages from queue.
+     * 
+     *@param queue Name of the queue to subscribe to.
+     *@param tag Unique destination tag for the listener.
+     * If not specified, the queue name is used.
      */
-    std::string subscribe(MessageListener& listener,
-                          const std::string& queue,
-                          const std::string& tag=std::string());
+    void subscribe(LocalQueue& localQueue,
+                   const std::string& queue,
+                   const std::string& tag=std::string());
 
     /** Cancel a subscription. */
     void cancel(const std::string tag);
-    
-    qpid::client::Dispatcher& getDispatcher() { return dispatcher; }
-    size_t size() { return subscriptions.size(); }
 
     /** Deliver messages until stop() is called.
-     *@param autoStop If true, return when all subscriptions are cancelled.
+     *@param autoStop If true, return when all listeners are cancelled.
      */
     void run(bool autoStop=true);
 
@@ -78,13 +82,20 @@
 
     static const uint32_t UNLIMITED=0xFFFFFFFF;
 
-    /** Set the flow control limits for subscriber with tag.
-     * UNLIMITED means no limit.
+    /** Set the flow control for destination tag.
+     *@param tag: name of the destination.
+     *@param messages: message credit.
+     *@param bytes: byte credit.
+     *@param window: if true use window-based flow control.
      */
-    void flowLimits(const std::string& tag, uint32_t messages,  uint32_t bytes);
+    void setFlowControl(const std::string& tag, uint32_t messages,  uint32_t bytes, bool
window=true);
 
-    /** Set the initial flow control limits for new subscribers */
-    void flowLimits(uint32_t messages,  uint32_t bytes);
+    /** Set the initial flow control settings to be applied to each new subscription.
+     *@param messages: message credit.
+     *@param bytes: byte credit.
+     *@param window: if true use window-based flow control.
+     */
+    void setFlowControl(uint32_t messages,  uint32_t bytes, bool window=true);
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/BlockingQueue.h Wed Nov  7 11:57:46 2007
@@ -32,7 +32,7 @@
 template <class T>
 class BlockingQueue
 {
-    sys::Waitable lock;
+    mutable sys::Waitable lock;
     std::queue<T> queue;
     bool closed;
 
@@ -95,6 +95,11 @@
         closed=false;
     }
 
+    bool isClosed() const { 
+        Waitable::ScopedLock l(lock);
+        return closed;
+    }
+    
   private:
 
     void queueNotify(size_t ignore) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Wed Nov  7 11:57:46 2007
@@ -48,7 +48,7 @@
 
     void listen()
     {
-        dispatcher.listen(name, this, true, 1);
+        dispatcher.listen(name, this);
         dispatcher.run();
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp?rev=592869&r1=592868&r2=592869&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp Wed Nov  7 11:57:46 2007
@@ -21,44 +21,48 @@
 
 #include "TestOptions.h"
 
-#include "qpid/client/Channel.h"
-#include "qpid/client/Exchange.h"
-#include "qpid/client/Queue.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
 #include "qpid/client/Connection.h"
-#include "qpid/client/MessageListener.h"
 #include "qpid/client/Message.h"
-#include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 
 #include <iostream>
-#include <cstdlib>
-#include <iomanip>
-#include <time.h>
-#include <unistd.h>
+#include <sstream>
 
-
-using namespace qpid;
-using namespace qpid::client;
-using namespace qpid::sys;
 using namespace std;
+using namespace qpid;
+using namespace client;
+using namespace sys;
 
 struct Opts : public TestOptions {
 
     bool listen;
     bool publish;
     int count;
-	bool durable;
+    int size;
+    bool durable;
+    int consumers;
+    std::string mode;
     
-    Opts() : listen(false), publish(false), count(500000) {
+    Opts() :
+        listen(false), publish(false), count(500000), size(64), consumers(1),
+        mode("shared")
+    {
         addOptions() 
             ("listen", optValue(listen), "Consume messages.")
             ("publish", optValue(publish), "Produce messages.")
-            ("count", optValue(count, "N"), "Messages to send/receive.")
-            ("durable", optValue(durable, "N"), "Publish messages as durable.");
+            ("count", optValue(count, "N"), "Messages to send.")
+            ("size", optValue(size, "BYTES"), "Size of messages.")
+            ("durable", optValue(durable, "N"), "Publish messages as durable.")
+            ("consumers", optValue(consumers, "N"), "Number of consumers.")
+            ("mode", optValue(mode, "shared|fanout|topic"), "consume mode");
     }
 };
 
 Opts opts;
+enum Mode { SHARED, FANOUT, TOPIC };
+Mode mode;
 
 struct ListenThread : public Runnable { Thread thread; void run(); };
 struct PublishThread : public Runnable { Thread thread; void run(); };
@@ -66,16 +70,22 @@
 int main(int argc, char** argv) {
     try {
         opts.parse(argc, argv);
+        if (opts.mode=="shared") mode=SHARED;
+        else if (opts.mode=="fanout") mode = FANOUT;
+        else if (opts.mode=="topic") mode = TOPIC;
+        else throw Exception("Invalid mode");
         if (!opts.listen && !opts.publish)
             opts.listen = opts.publish = true;
-        ListenThread listen;
+        std::vector<ListenThread> listen(opts.consumers);
         PublishThread publish;
-        if (opts.listen)
-            listen.thread=Thread(listen);
+        if (opts.listen) 
+            for (int i = 0; i < opts.consumers; ++i)
+                listen[i].thread=Thread(listen[i]);
         if (opts.publish)
             publish.thread=Thread(publish);
         if (opts.listen)
-            listen.thread.join();
+            for (int i = 0; i < opts.consumers; ++i)
+                listen[i].thread.join();
         if (opts.publish)
             publish.thread.join();
     }
@@ -84,223 +94,149 @@
     }
 }
 
-// ================================================================
-// Publish client
-//
-
-struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) {
-    timespec r;
-    r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec;
-    r.tv_sec = lhs.tv_sec - rhs.tv_sec;
-    if (r.tv_nsec < 0) {
-        r.tv_nsec += 1000000000;
-        r.tv_sec -= 1;
-    }
-    return r;
-}
-
-ostream& operator<<(ostream& o, const struct timespec& ts) {
-    o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec;
-    return o;
-}
-
-double toDouble(const struct timespec& ts) {
-    return double(ts.tv_nsec)/1000000000 + ts.tv_sec;
-}
-
-class PublishListener : public MessageListener {
+double secs(Duration d) { return double(d)/TIME_SEC; }
+double secs(AbsTime start, AbsTime finish) { return secs(Duration(start,finish)); }
 
-    void set_time() {
-        timespec ts;
-        if (::clock_gettime(CLOCK_REALTIME, &ts))
-            throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-        startTime = ts;
-    }
 
-    void print_time() {
-        timespec ts;
-        if (::clock_gettime(CLOCK_REALTIME, &ts))
-            throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-        cout << "Total Time:" << ts-startTime << endl;
-        double rate = messageCount*2/toDouble(ts-startTime);
-        cout << "returned Messages:" << messageCount  << endl;
-        cout << "round trip Rate:" << rate  << endl;
-    }
+void expect(string actual, string expect) {
+    if (expect != actual)
+        throw Exception("Expecting "+expect+" but received "+actual);
 
-    struct timespec startTime;
-    int messageCount;
-    bool done;
-    Monitor lock;
-    
-  public:
- 
-    PublishListener(int mcount): messageCount(mcount), done(false) {
-        set_time();
-    }
-	
-    void received(Message& msg) {
-	print_time();
-	QPID_LOG(info, "Publisher: received: " << msg.getData());
-        Mutex::ScopedLock l(lock);
-        QPID_LOG(info, "Publisher: done.");
-        done = true;
-        lock.notify();
-    }
+}
 
-    void wait() {
-        Mutex::ScopedLock l(lock);
-        while (!done)
-            lock.wait();
+const char* exchange() {
+    switch (mode) {
+      case SHARED: return "";   // Default exchange.
+      case FANOUT: return "amq.fanout"; 
+      case TOPIC: return "amq.topic"; 
     }
-};
-
+    assert(0);
+    return 0;
+}
 
 void PublishThread::run() {
-    Connection connection;
-    Channel channel;
-    Message msg;
-    opts.open(connection);
-    connection.openChannel(channel);
-    channel.start();
-
-    cout << "Started publisher." << endl;
-    string queueControl = "control";
-    Queue response(queueControl);
-    channel.declareQueue(response);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);
+    try {
+        Connection connection;
+        opts.open(connection);
+        Session_0_10 session = connection.newSession();
+
+        session.queueDeclare(arg::queue="control"); // Control queue
+        session.queuePurge(arg::queue="control");
+        if (mode==SHARED) {
+            session.queueDeclare(arg::queue="perftest"); // Shared data queue
+            session.queuePurge(arg::queue="perftest");
+        }
         
-    string  queueName ="queue01";
-    string  queueNameC =queueName+"-1";
-
-    // create publish queue
-    Queue publish(queueName);
-    channel.declareQueue(publish);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName);
-  
-    // create completion queue
-    Queue completion(queueNameC);
-    channel.declareQueue(completion);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
-      
-    // pass queue name
-    msg.setData(queueName);
-    channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl);
-
-    QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC);
-	
-    int count = opts.count;
-    PublishListener listener(count);
-    channel.consume(completion, queueNameC, &listener);
-    QPID_LOG(info, "Publisher setup consumer: "<< queueNameC);
-
-    struct timespec startTime;
-    if (::clock_gettime(CLOCK_REALTIME, &startTime))
-        throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-
-	bool durable = opts.durable;
-	if (durable)
+        // Wait for consumers.
+        SubscriptionManager subs(session);
+        LocalQueue control;
+        subs.subscribe(control, "control");
+        for (int i = 0; i < opts.consumers; ++i) 
+            expect(control.pop().getData(), "ready");
+
+        // Create test message
+        size_t msgSize=max(opts.size, 32);
+        Message msg(string(msgSize, 'X'), "perftest");
+        char* msgBuf = const_cast<char*>(msg.getData().data());
+        if (opts.durable)
 	    msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+        // Time sending message.
+        AbsTime start=now();
+        cout << "Publishing " << opts.count << " messages " << flush;
+        for (int i=0; i<opts.count; i++) {
+            sprintf(msgBuf, "%d", i);
+            session.messageTransfer(arg::destination=exchange(),
+                                    arg::content=msg);
+            if ((i%10000)==0) cout << "." << flush;
+        }
+        cout << " done." << endl;
+        msg.setData("done");    // Send done messages.
+        if (mode==SHARED)
+            for (int i = 0; i < opts.consumers; ++i)
+                session.messageTransfer(arg::destination=exchange(), arg::content=msg);
+        else
+            session.messageTransfer(arg::destination=exchange(), arg::content=msg);
+        AbsTime end=now();
+
+        // Report
+        cout << endl;
+        cout << "publish count:" << opts.count << endl;
+        cout << "publish secs:" << secs(start,end) << endl;
+        cout << "publish rate:" << (opts.count)/secs(start,end) << endl;
+
+        //  Wait for consumer(s) to finish.
+        for (int i = 0; i < opts.consumers; ++i) {
+            string report=control.pop().getData();
+            if (report.find("consume") != 0)
+                throw Exception("Expected consumer report, got: "+report);
+            cout << endl << report;
+        }
+        end=now();
+
+        // Count total transfers from publisher and to subscribers.
+        int transfers;
+        if (mode==SHARED)       // each message sent/received once.
+            transfers=2*opts.count; 
+        else                    // sent once, received N times.
+            transfers=opts.count*(opts.consumers + 1);
+        
+        cout << endl
+             << "total transfers:" << transfers << endl
+             << "total secs:" << secs(start, end) << endl
+             << "total transfers/sec:" << transfers/secs(start, end) << endl;
 		
-    for (int i=0; i<count; i++) {
-        msg.setData("Message 0123456789 ");
-        channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
+        connection.close();
+    }
+    catch (const std::exception& e) {
+        cout << "PublishThread exception: " << e.what() << endl;
     }
-
-    struct timespec endTime;
-    if (::clock_gettime(CLOCK_REALTIME, &endTime))
-        throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno)));
-
-    cout << "publish Time:" << endTime-startTime << endl;
-    double rate = count/toDouble(endTime-startTime);
-    cout << "publish Messages:" << count  << endl;
-    cout << "publish Rate:" << rate  << endl;
-
-    msg.setData(queueName);  // last message to queue.
-    channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName);
-	
-    listener.wait();
- 
-    channel.close();
-    connection.close();
 }
 
-
-
-// ================================================================
-// Listen client
-//
-
-class Listener : public MessageListener{
-    string queueName;
-    Monitor lock;
-    bool done;
-
-  public:
-    Listener (string& _queueName): queueName(_queueName), done(false) {};
-
-    void received(Message& msg) {
-        if (msg.getData() == queueName)
-        {
-            Mutex::ScopedLock l(lock);
-            QPID_LOG(info, "Listener: done. " << queueName);
-            done = true;
-            lock.notify();
+void ListenThread::run() {
+    try {
+        Connection connection;
+        opts.open(connection);
+        Session_0_10 session = connection.newSession();
+
+        string consumeQueue;
+        switch (mode) {
+          case SHARED:
+            consumeQueue="perftest";
+            session.queueDeclare(arg::queue="perftest"); 
+            break;
+          case FANOUT:
+          case TOPIC:
+            consumeQueue=session.getId().str(); // Unique
+            session.queueDeclare(arg::queue=consumeQueue,
+                                 arg::exclusive=true,
+                                 arg::autoDelete=true);
+            session.queueBind(arg::queue=consumeQueue,
+                              arg::exchange=exchange(),
+                              arg::routingKey="perftest");
         }
+        // Notify publisher we are ready.
+        session.queueDeclare(arg::queue="control"); // Control queue
+        session.messageTransfer(arg::content=Message("ready", "control"));
+
+        SubscriptionManager subs(session);
+        LocalQueue consume;
+        subs.subscribe(consume, consumeQueue);
+        int consumed=0;
+        AbsTime start=now();
+        while (consume.pop().getData() != "done")
+            ++consumed;
+        AbsTime end=now();
+
+        // Report to publisher.
+        ostringstream report;
+        report << "consume count: " << consumed << endl
+               << "consume secs: " << secs(start, end) << endl
+               << "consume rate: " << consumed/secs(start,end) << endl;
+        session.messageTransfer(arg::content=Message(report.str(), "control"));
+        connection.close();
     }
-    
-    void wait() {
-        Mutex::ScopedLock l(lock);
-        while (!done)
-            lock.wait();
-    }
-};
-
-void ListenThread::run() {
-    Connection connection;
-    Channel channel;
-    Message msg;
-    Message msg1;
-    cout << "Started listener." << endl;;
-    opts.open(connection);
-    connection.openChannel(channel);
-    channel.start();
-
-    string queueControl = "control";
-    Queue response(queueControl);
-    channel.declareQueue(response);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl);	
-    while (!channel.get(msg, response, AUTO_ACK))	{
-        QPID_LOG(info, "Listener: waiting for queue name.");
-        sleep(1);
+    catch (const std::exception& e) {
+        cout << "PublishThread exception: " << e.what() << endl;
     }
-    string  queueName =msg.getData();
-    string  queueNameC =queueName+ "-1";
-
-    QPID_LOG(info, "Listener: Using Queue:" << queueName);
-    QPID_LOG(info, "Listener: Reply Queue:" << queueNameC);
-    // create consume queue
-    Queue consume(queueName);
-    channel.declareQueue(consume);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName);
- 
-    // create completion queue
-    Queue completion(queueNameC);
-    channel.declareQueue(completion);
-    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC);
-
-    Listener listener(queueName);
-    channel.consume(consume, queueName, &listener);
-    QPID_LOG(info, "Listener: consuming...");
-
-    listener.wait();
-      
-    QPID_LOG(info, "Listener: send final message.");
-    // complete.
-    msg1.setData(queueName);
-    channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC);
-
-    channel.close();
-    connection.close();
 }
-
 



Mime
View raw message