Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 97825 invoked from network); 12 Aug 2009 21:09:07 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 12 Aug 2009 21:09:07 -0000 Received: (qmail 16464 invoked by uid 500); 12 Aug 2009 21:09:13 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 16441 invoked by uid 500); 12 Aug 2009 21:09:13 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 16432 invoked by uid 99); 12 Aug 2009 21:09:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Aug 2009 21:09:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Aug 2009 21:09:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9AADD23888BB; Wed, 12 Aug 2009 21:08:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r803713 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Event.cpp Multicaster.cpp Multicaster.h Date: Wed, 12 Aug 2009 21:08:51 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090812210851.9AADD23888BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Wed Aug 12 21:08:51 2009 New Revision: 803713 URL: http://svn.apache.org/viewvc?rev=803713&view=rev Log: Batch multiple events into a single CPG multicast. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=803713&r1=803712&r2=803713&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Aug 12 21:08:51 2009 @@ -323,9 +323,11 @@ { MemberId from(nodeid, pid); framing::Buffer buf(static_cast(msg), msg_len); - Event e(Event::decodeCopy(from, buf)); - LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); - deliverEvent(e); + while (buf.available()) { + Event e(Event::decodeCopy(from, buf)); + LATENCY_TRACK(if (e.getConnectionId().getMember() == self) mcast.cpgLatency.finish()); + deliverEvent(e); + } } LATENCY_TRACK(sys::LatencyTracker eventQueueLatencyTracker("EventQueue");) Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=803713&r1=803712&r2=803713&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Wed Aug 12 21:08:51 2009 @@ -72,7 +72,7 @@ if (buf.available() < e.size) throw Exception("Not enough data for multicast event"); e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); - memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); + buf.getRawData((uint8_t*)(e.getData()), e.size); return e; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=803713&r1=803712&r2=803713&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Wed Aug 12 21:08:51 2009 @@ -24,10 +24,14 @@ #include "qpid/log/Statement.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/AMQFrame.h" +#include +#include namespace qpid { namespace cluster { +static const int MCAST_IOV_MAX=63; // Limit imposed by CPG + Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr& poller, boost::function onError_) : @@ -36,7 +40,8 @@ #endif onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + holding(true), + ioVector(MCAST_IOV_MAX) { queue.start(); } @@ -70,26 +75,29 @@ queue.push(e); } - -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { +Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast( + const PollableEventQueue::Batch& events) +{ + PollableEventQueue::Batch::const_iterator i = events.begin(); try { - PollableEventQueue::Batch::const_iterator i = values.begin(); - while( i != values.end()) { - iovec iov = i->toIovec(); - if (!cpg.mcast(&iov, 1)) { - // cpg didn't send because of CPG flow control. - break; + while (i < events.end()) { + size_t count = std::min(MCAST_IOV_MAX, int(events.end() - i)); + std::transform(i, i+count, ioVector.begin(), + boost::bind(&Event::toIovec, _1)); + if (!cpg.mcast(&ioVector.front(), count)) { + QPID_LOG(trace, "CPG flow control, will resend " + << events.end() - i << " events"); + break; } - ++i; + i += count; } - return i; } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); queue.stop(); onError(); - return values.end(); } + return i; } void Multicaster::release() { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=803713&r1=803712&r2=803713&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Wed Aug 12 21:08:51 2009 @@ -29,6 +29,7 @@ #include "qpid/sys/LatencyTracker.h" #include #include +#include namespace qpid { @@ -72,7 +73,7 @@ PollableEventQueue queue; bool holding; PlainEventQueue holdingQueue; - std::vector ioVector; + std::vector< ::iovec> ioVector; }; }} // namespace qpid::cluster --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org