qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1165875 - in /qpid/branches/qpid-2920-1/qpid/cpp/src: ./ qpid/ qpid/cluster/exp/ tests/
Date Tue, 06 Sep 2011 21:45:55 GMT
Author: aconway
Date: Tue Sep  6 21:45:54 2011
New Revision: 1165875

URL: http://svn.apache.org/viewvc?rev=1165875&view=rev
Log:
QPID-2920: Cluster batch multicaster.

Send multiple multicast events in a single call to CPG using iovec.
Encoding in the sending thread using reference counted buffers.

Added:
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp   (with props)
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h   (with props)
Modified:
    qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/BufferRef.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-build-rinstall
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/cluster.mk Tue Sep  6 21:45:54 2011
@@ -112,6 +112,7 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/PollerDispatch.h		\
 	qpid/cluster/exp/BrokerHandler.cpp	\
 	qpid/cluster/exp/BrokerHandler.h	\
+	qpid/cluster/exp/BufferFactory.h	\
 	qpid/cluster/exp/Cluster2Plugin.cpp	\
 	qpid/cluster/exp/Core.cpp		\
 	qpid/cluster/exp/Core.h			\
@@ -121,6 +122,8 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/exp/HandlerBase.h		\
 	qpid/cluster/exp/MessageHandler.cpp	\
 	qpid/cluster/exp/MessageHandler.h	\
+	qpid/cluster/exp/Multicaster.cpp	\
+	qpid/cluster/exp/Multicaster.h		\
 	qpid/cluster/exp/WiringHandler.cpp	\
 	qpid/cluster/exp/WiringHandler.h
 

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/BufferRef.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/BufferRef.h?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/BufferRef.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/BufferRef.h Tue Sep  6 21:45:54 2011
@@ -27,7 +27,11 @@
 
 namespace qpid {
 
-/** Template for mutable or const buffer references */
+/** Reference to a ref-counted buffer of T.
+ *  Template for mutable or const buffer references.
+ *  Gathers together an intrusive_ptr for refcounting and begin and end pointers
+ *  for the buffer space. For use with RefCountedBuffer and similar classes.
+ */
 template <class T> class BufferRefT {
   public:
     BufferRefT() : begin_(0), end_(0) {}
@@ -38,8 +42,15 @@ template <class T> class BufferRefT {
     template <class U> BufferRefT(const BufferRefT<U>& other) :
         counter(other.counter), begin_(other.begin_), end_(other.end_) {}
 
+    template <class U> BufferRefT& operator=(const BufferRefT<U>& other) {
+        counter = other.counter; begin_ = other.begin_; end_ = other.end_;
+    }
+
     T* begin() const { return begin_; }
     T* end() const { return end_; }
+    size_t size() const { return end_ - begin_; }
+    operator bool() const { return begin_; }
+    bool operator!() const { return !begin_; }
 
     /** Return a sub-buffer of the current buffer */
     BufferRefT sub_buffer(T* begin, T* end) {

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h?rev=1165875&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h Tue Sep  6 21:45:54 2011
@@ -0,0 +1,67 @@
+#ifndef QPID_CLUSTER_EXP_BUFFERFACTORY_H
+#define QPID_CLUSTER_EXP_BUFFERFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/BufferRef.h"
+#include "qpid/RefCountedBuffer.h"
+#include "qpid/sys/Mutex.h"
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Factory to allocate sub-buffers of RefCountedBuffers.
+ * Thread safe.
+ */
+class BufferFactory
+{
+  public:
+    /** @param min_size minimum size of underlying buffers. */
+    inline BufferFactory(size_t min_size);
+    inline BufferRef get(size_t size);
+  private:
+    sys::Mutex lock;
+    size_t min_size;
+    BufferRef buf;
+    char* pos;
+};
+
+BufferFactory::BufferFactory(size_t size) : min_size(size) {}
+
+BufferRef BufferFactory::get(size_t size) {
+    sys::Mutex::ScopedLock l(lock);
+    if (!buf || pos + size > buf.end()) {
+        buf = RefCountedBuffer::create(std::max(size, min_size));
+        pos = buf.begin();
+    }
+    assert(buf);
+    assert(buf.begin() <= pos);
+    assert(pos + size <= buf.end());
+    BufferRef ret(buf.sub_buffer(pos, pos+size));
+    pos += size;
+    return ret;
+}
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_BUFFERFACTORY_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/BufferFactory.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.cpp Tue Sep  6 21:45:54 2011
@@ -36,7 +36,8 @@ namespace cluster {
 
 Core::Core(const Settings& s, broker::Broker& b) :
     broker(b),
-    eventHandler(new EventHandler(*this))
+    eventHandler(new EventHandler(*this)),
+    multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this))
 {
     eventHandler->add(boost::shared_ptr<HandlerBase>(new WiringHandler(*eventHandler)));
     eventHandler->add(boost::shared_ptr<HandlerBase>(new MessageHandler(*eventHandler)));
@@ -61,15 +62,8 @@ void Core::fatal() {
 
 void Core::mcast(const framing::AMQBody& body) {
     QPID_LOG(trace, "cluster multicast: " << body);
-    // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
-    // here we multicast Frames rather than Events.
     framing::AMQFrame f(body);
-    std::string data(f.encodedSize(), char());
-    framing::Buffer buf(&data[0], data.size());
-    f.encode(buf);
-    iovec iov = { buf.getPointer(), buf.getSize() };
-    while (!eventHandler->getCpg().mcast(&iov, 1))
-        ::usleep(1000);      // FIXME aconway 2010-10-20: flow control
+    multicaster.mcast(f);
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Core.h Tue Sep  6 21:45:54 2011
@@ -25,6 +25,7 @@
 #include <string>
 #include <memory>
 #include "LockedMap.h"
+#include "Multicaster.h"
 #include "qpid/cluster/types.h"
 #include "qpid/cluster/Cpg.h"
 #include "qpid/broker/QueuedMessage.h"
@@ -87,6 +88,7 @@ class Core
     std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
     BrokerHandler* brokerHandler; // Handles broker events.
     RoutingMap routingMap;
+    Multicaster multicaster;
 };
 }} // namespace qpid::cluster
 

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp?rev=1165875&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp Tue Sep  6 21:45:54 2011
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Multicaster.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+namespace {
+const size_t MAX_IOV = 60;  // Limit imposed by CPG
+
+struct ::iovec bufToIov(const BufferRef& buf) {
+    ::iovec iov;
+    iov.iov_base = buf.begin();
+    iov.iov_len = buf.size();
+    return iov;
+}
+}
+
+Multicaster::Multicaster(Cpg& cpg_,
+                         const boost::shared_ptr<sys::Poller>& poller,
+                         boost::function<void()> onError_) :
+    onError(onError_), cpg(cpg_),
+    queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
+    ioVector(MAX_IOV),
+    buffers(64*1024)           // TODO aconway 2011-03-15: optimum size?
+{
+    queue.start();
+}
+
+void Multicaster::mcast(const framing::AMQDataBlock& data) {
+    BufferRef bufRef = buffers.get(data.encodedSize());
+    framing::Buffer buf(bufRef.begin(), bufRef.size());
+    data.encode(buf);
+    queue.push(bufRef);
+}
+
+Multicaster::PollableEventQueue::Batch::const_iterator
+Multicaster::sendMcast(const PollableEventQueue::Batch& buffers) {
+    try {
+        PollableEventQueue::Batch::const_iterator i = buffers.begin();
+        while( i != buffers.end()) {
+            size_t len = std::min(size_t(buffers.end() - i), MAX_IOV);
+            PollableEventQueue::Batch::const_iterator j = i + len;
+            std::transform(i, j, ioVector.begin(), &bufToIov);
+            if (!cpg.mcast(&ioVector[0], len)) {
+                // CPG didn't send because of CPG flow control.
+                return i;
+            }
+            i = j;
+        }
+        return i;
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(critical, "Multicast error: " << e.what());
+        queue.stop();
+        onError();
+        return buffers.end();
+    }
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h?rev=1165875&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h Tue Sep  6 21:45:54 2011
@@ -0,0 +1,67 @@
+#ifndef QPID_CLUSTER_MULTICASTER_H
+#define QPID_CLUSTER_MULTICASTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BufferFactory.h"
+#include "qpid/framing/AMQDataBlock.h"
+#include "qpid/sys/PollableQueue.h"
+#include <sys/uio.h>            // For struct iovec
+
+namespace qpid {
+
+namespace sys {
+class Poller;
+}
+
+namespace cluster {
+
+class Cpg;
+
+/**
+ * Multicast to a CPG group in poller threads. Shared, thread safe object.
+ */
+class Multicaster
+{
+  public:
+    Multicaster(Cpg& cpg_,
+                const boost::shared_ptr<sys::Poller>&,
+                boost::function<void()> onError
+    );
+
+    /** Multicast an event */
+    void mcast(const framing::AMQDataBlock&);
+
+  private:
+    typedef sys::PollableQueue<BufferRef> PollableEventQueue;
+
+    PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& );
+
+    boost::function<void()> onError;
+    Cpg& cpg;
+    PollableEventQueue queue;
+    std::vector<struct ::iovec> ioVector;
+    BufferFactory buffers;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_MULTICASTER_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/cluster/exp/Multicaster.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/brokertest.py?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/brokertest.py Tue Sep  6 21:45:54 2011
@@ -418,6 +418,8 @@ class Cluster:
         self.args += [ cluster_name,
                        "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
         self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
+        self.args += [ "--log-enable=info+", "--log-enable=trace+:cluster"]
+
         assert cluster_lib, "Cannot locate cluster plug-in"
         self.args += [ "--load-module", cluster_lib ]
         self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-build-rinstall
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-build-rinstall?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-build-rinstall (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-build-rinstall Tue Sep  6 21:45:54 2011
@@ -21,7 +21,7 @@
 # Run "make install"" locally then copy the install tree to each of $HOSTS
 # Must be run in a configured qpid build directory.
 #
-test -f config.status || { echo "Not in a configured build directory."; usage; }
+test -f config.status || { echo "Not in a configured build directory."; exit 1; }
 . src/tests/install_env.sh
 set -ex
 make && make -j1 install

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1165875&r1=1165874&r2=1165875&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Sep  6 21:45:54 2011
@@ -118,7 +118,7 @@ def start_receive(queue, index, opts, re
     return clients.add(Popen(command, stdout=PIPE))
 
 def start_send(queue, opts, broker, host):
-    address="%s;{%s}"%(queue,",".join(opts.send_option))
+    address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:receiver"]))
     command = ["qpid-send",
                "-b", broker,
                "-a", address,
@@ -244,7 +244,9 @@ def main():
         for i in xrange(opts.repeat):
             delete_queues(queues, opts.broker[0])
             ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-            receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+            time.sleep(.1) # FIXME aconway 2011-03-16: new cluster async wiring
+            receivers = [start_receive(q, j, opts, ready_queue,
+                                       brokers.next(), client_hosts.next())
                          for q in queues for j in xrange(opts.receivers)]
             ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
             senders = [start_send(q, opts,brokers.next(), client_hosts.next())
@@ -254,7 +256,6 @@ def main():
             recv_stats=parse_receivers(receivers)
             if opts.summarize: print_summary(send_stats, recv_stats)
             else: print_data(send_stats, recv_stats)
-            delete_queues(queues, opts.broker[0])
     finally: clients.kill()             # No strays
 
 if __name__ == "__main__": main()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message