Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 12918 invoked from network); 4 Jul 2008 19:08:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Jul 2008 19:08:08 -0000 Received: (qmail 25242 invoked by uid 500); 4 Jul 2008 19:08:09 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 25230 invoked by uid 500); 4 Jul 2008 19:08:09 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 25221 invoked by uid 99); 4 Jul 2008 19:08:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jul 2008 12:08:09 -0700 X-ASF-Spam-Status: No, hits=-1999.3 required=10.0 tests=ALL_TRUSTED,FRT_LEVITRA X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Jul 2008 19:07:15 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 37E8F23889BB; Fri, 4 Jul 2008 12:07:36 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r674107 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/amqp_0_10/ qpid/broker/ qpid/cluster/ qpid/framing/ qpid/log/ qpid/sys/ qpid/sys/posix/ tests/ Date: Fri, 04 Jul 2008 19:07:34 -0000 To: qpid-commits@incubator.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080704190736.37E8F23889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Fri Jul 4 12:07:33 2008 New Revision: 674107 URL: http://svn.apache.org/viewvc?rev=674107&view=rev Log: Cluster prototype: handles client-initiated commands (not dequeues) Details - Cluster.cpp: serializes all frames thru cluster (see below) - broker/ConnectionManager: Added handler chain in front of Connection::received. - sys::Fork and ForkWithMessage - abstractions for forking with posix impl. - tests/ForkedBroker.h: test utility to fork a broker process. - broker/SignalHandler: Encapsulated signal handling from qpidd.cpp - Various minor logging & error message improvements to aid debugging. NB: current impl will not scale. It is functional working starting point so we can start testing & profiling to find the right optimizations. Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h (with props) incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (with props) Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am incubator/qpid/trunk/qpid/cpp/src/cluster.mk incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jul 4 12:07:33 2008 @@ -77,14 +77,16 @@ qpid/sys/posix/Time.cpp \ qpid/sys/posix/Thread.cpp \ qpid/sys/posix/Shlib.cpp \ - qpid/sys/posix/Mutex.cpp + qpid/sys/posix/Mutex.cpp \ + qpid/sys/posix/Fork.cpp posix_plat_hdr = \ qpid/sys/posix/check.h \ qpid/sys/posix/Condition.h \ qpid/sys/posix/PrivatePosix.h \ qpid/sys/posix/Mutex.h \ - qpid/sys/posix/Thread.h + qpid/sys/posix/Thread.h \ + qpid/sys/posix/Fork.h platform_src = $(posix_plat_src) platform_hdr = $(posix_plat_hdr) @@ -246,6 +248,8 @@ qpid/amqp_0_10/Connection.cpp \ qpid/broker/Broker.cpp \ qpid/broker/BrokerSingleton.cpp \ + qpid/broker/ConnectionManager.h \ + qpid/broker/ConnectionManager.cpp \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ qpid/broker/PersistableMessage.cpp \ @@ -290,9 +294,11 @@ qpid/broker/SessionState.cpp \ qpid/broker/SessionManager.h \ qpid/broker/SessionManager.cpp \ - qpid/broker/SessionHandler.h \ qpid/broker/SessionContext.h \ + qpid/broker/SessionHandler.h \ qpid/broker/SessionHandler.cpp \ + qpid/broker/SignalHandler.h \ + qpid/broker/SignalHandler.cpp \ qpid/broker/System.cpp \ qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ @@ -546,6 +552,7 @@ qpid/sys/Poller.h \ qpid/sys/ProtocolFactory.h \ qpid/sys/Runnable.h \ + qpid/sys/Fork.h \ qpid/sys/ScopedIncrement.h \ qpid/sys/Semaphore.h \ qpid/sys/Serializer.h \ Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Jul 4 12:07:33 2008 @@ -13,7 +13,8 @@ qpid/cluster/Dispatchable.h \ qpid/cluster/ClusterPlugin.cpp \ qpid/cluster/ClassifierHandler.h \ - qpid/cluster/ClassifierHandler.cpp + qpid/cluster/ClassifierHandler.cpp \ + qpid/cluster/ShadowConnectionOutputHandler.h libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Fri Jul 4 12:07:33 2008 @@ -61,8 +61,8 @@ void Plugin::Factory::addOptions(Options& opts) { typedef std::vector::const_iterator Iter; for (Iter i = Factory::getList().begin(); i != Factory::getList().end(); ++i) { - if ((**i).getOptions()) - opts.add(*(**i).getOptions()); + Options* opt=(**i).getOptions(); + if (opt) opts.add(*opt); } } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/Url.cpp Fri Jul 4 12:07:33 2008 @@ -163,7 +163,8 @@ } void Url::throwIfEmpty() const { - throw InvalidUrl("URL contains no addresses"); + if (empty()) + throw InvalidUrl("URL contains no addresses"); } std::istream& operator>>(std::istream& is, Url& url) { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Fri Jul 4 12:07:33 2008 @@ -28,7 +28,8 @@ using sys::Mutex; Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) - : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient), + : frameQueueClosed(false), output(o), + connection(broker.getConnectionManager().create(this, broker, id, _isClient)), identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { @@ -45,13 +46,13 @@ framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection.received(frame); + connection->received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection.doOutput(); + if (!frameQueueClosed) connection->doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -90,7 +91,7 @@ } void Connection::closed() { - connection.closed(); + connection->closed(); } void Connection::send(framing::AMQFrame& f) { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Fri Jul 4 12:07:33 2008 @@ -27,6 +27,7 @@ #include "Connection.h" #include "qpid/broker/Connection.h" #include +#include namespace qpid { namespace broker { class Broker; } @@ -40,7 +41,7 @@ bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - broker::Connection connection; // FIXME aconway 2008-03-18: + std::auto_ptr connection; // FIXME aconway 2008-03-18: std::string identifier; bool initialized; bool isClient; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jul 4 12:07:33 2008 @@ -23,6 +23,7 @@ */ #include "ConnectionFactory.h" +#include "ConnectionManager.h" #include "ConnectionToken.h" #include "DirectExchange.h" #include "DtxManager.h" @@ -120,6 +121,7 @@ Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } + ConnectionManager& getConnectionManager() { return connectionManager; } management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -158,6 +160,7 @@ ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; + ConnectionManager connectionManager; management::ManagementAgent::shared_ptr managementAgent; management::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jul 4 12:07:33 2008 @@ -53,7 +53,9 @@ isLink(isLink_), mgmtClosing(false), mgmtId(mgmtId_), - links(broker_.getLinks()) + links(broker_.getLinks()), + lastInHandler(*this), + inChain(lastInHandler) { Manageable* parent = broker.GetVhostObject(); @@ -86,7 +88,9 @@ links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ +void Connection::received(framing::AMQFrame& frame){ inChain(frame); } + +void Connection::receivedLast(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Jul 4 12:07:33 2008 @@ -56,6 +56,7 @@ { public: typedef boost::shared_ptr shared_ptr; + Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false); ~Connection (); @@ -90,10 +91,15 @@ void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); + framing::FrameHandler::Chain& getInChain() { return inChain; } + private: typedef boost::ptr_map ChannelMap; typedef std::vector::iterator queue_iterator; + // End of the received handler chain. + void receivedLast(framing::AMQFrame& frame); + ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; @@ -103,6 +109,9 @@ boost::function0 ioCallback; management::Connection::shared_ptr mgmtObject; LinkRegistry& links; + framing::FrameHandler::MemFunRef lastInHandler; + framing::FrameHandler::Chain inChain; + }; }} Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp Fri Jul 4 12:07:33 2008 @@ -0,0 +1,41 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionManager.h" +#include "Connection.h" + +namespace qpid { +namespace broker { + +std::auto_ptr +ConnectionManager::create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient) { + std::auto_ptr c(new Connection(out, broker, mgmtId, isClient)); + sys::Mutex::ScopedLock l(lock); + std::for_each(observers.begin(), observers.end(), + boost::bind(&Observer::created, _1, boost::ref(*c))); + return c; +} + +void ConnectionManager::add(const boost::intrusive_ptr& observer) { + sys::Mutex::ScopedLock l(lock); + observers.push_back(observer); +} + +}} // namespace qpid::broker Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h Fri Jul 4 12:07:33 2008 @@ -0,0 +1,70 @@ +#ifndef QPID_BROKER_CONNECTIONMANAGER_H +#define QPID_BROKER_CONNECTIONMANAGER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" +#include "qpid/sys/Mutex.h" +#include +#include +#include + +namespace qpid { + +namespace sys { +class ConnectionOutputHandler; +} + +namespace broker { + +class Broker; +class Connection; + +/** + * Manages connections and observers. + */ +class ConnectionManager { + public: + + /** + * Observer notified of ConnectionManager events. + */ + struct Observer : public RefCounted { + /** Called when a connection is attached. */ + virtual void created(Connection&) {} + }; + + /** Called to create a new Connection, applies observers. */ + std::auto_ptr create(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isClient = false); + + /** Add an observer */ + void add(const boost::intrusive_ptr&); + + private: + typedef std::vector > Observers; + + sys::Mutex lock; + Observers observers; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_CONNECTIONMANAGER_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp Fri Jul 4 12:07:33 2008 @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SignalHandler.h" +#include "Broker.h" +#include + +namespace qpid { +namespace broker { + +boost::shared_ptr SignalHandler::broker; + +void SignalHandler::setBroker(const boost::shared_ptr& b) { + broker = b; + + signal(SIGINT,shutdownHandler); + signal(SIGTERM, shutdownHandler); + + signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config. + + signal(SIGCHLD,SIG_IGN); + signal(SIGTSTP,SIG_IGN); + signal(SIGTTOU,SIG_IGN); + signal(SIGTTIN,SIG_IGN); +} + +void SignalHandler::shutdownHandler(int) { + if (broker.get()) { + broker->shutdown(); + broker.reset(); + } +} + +}} // namespace qpid::broker Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h Fri Jul 4 12:07:33 2008 @@ -0,0 +1,47 @@ +#ifndef QPID_BROKER_SIGNALHANDLER_H +#define QPID_BROKER_SIGNALHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +namespace qpid { +namespace broker { + +class Broker; + +/** + * Handle signals e.g. to shut-down a broker. + */ +class SignalHandler +{ + public: + /** Set the broker to be shutdown on signals */ + static void setBroker(const boost::shared_ptr& broker); + + private: + static void shutdownHandler(int); + static boost::shared_ptr broker; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SIGNALHANDLER_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SignalHandler.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 4 12:07:33 2008 @@ -17,7 +17,9 @@ */ #include "Cluster.h" +#include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" +#include "qpid/broker/Connection.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" @@ -32,68 +34,49 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; -using broker::SessionState; +using broker::Connection; namespace { +// FIXME aconway 2008-07-01: sending every frame to cluster, +// serializing all processing in cluster deliver thread. +// This will not perform at all, but provides a correct starting point. +// +// TODO: +// - Fake "Connection" for cluster: owns shadow sessions. +// - Maintain shadow sessions. +// - Apply foreign frames to shadow sessions. +// + + // Beginning of inbound chain: send to cluster. struct ClusterSendHandler : public FrameHandler { - SessionState& session; + Connection& connection; Cluster& cluster; - bool busy; - Monitor lock; - ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {} - - void handle(AMQFrame& f) { - Mutex::ScopedLock l(lock); - assert(!busy); - // FIXME aconway 2008-01-29: refcount Sessions. - // session.addRef(); // Keep the session till the message is self delivered. - cluster.send(f, next); // Indirectly send to next via cluster. - - // FIXME aconway 2008-01-29: need to get this blocking out of the loop. - // But cluster needs to agree on order of side-effects on the shared model. - // OK for wiring to block, for messages use queue tokens? - // Both in & out transfers must be orderd per queue. - // May need out-of-order completion. - busy=true; - while (busy) lock.wait(); - } -}; - -// Next in inbound chain, self delivered from cluster. -struct ClusterDeliverHandler : public FrameHandler { - Cluster& cluster; - ClusterSendHandler& sender; + ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} - ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {} - void handle(AMQFrame& f) { - next->handle(f); - // FIXME aconway 2008-06-16: solve overtaking problem - async completion of commands. - // Mutex::ScopedLock l(lock); - // senderBusy=false; - // senderLock.notify(); + // FIXME aconway 2008-01-29: Refcount Connections to ensure + // Connection not destroyed till message is self delivered. + cluster.send(f, &connection, next); // Indirectly send to next via cluster. } }; -struct SessionObserver : public broker::SessionManager::Observer { +struct ConnectionObserver : public broker::ConnectionManager::Observer { Cluster& cluster; - SessionObserver(Cluster& c) : cluster(c) {} + ConnectionObserver(Cluster& c) : cluster(c) {} - void opened(SessionState& s) { + void created(Connection& c) { // FIXME aconway 2008-06-16: clean up chaining and observers. - ClusterSendHandler* sender=new ClusterSendHandler(s, cluster); - ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster); - s.getInChain().insert(deliverer); - s.getOutChain().insert(sender); + ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); + c.getInChain().insert(sender); } }; } ostream& operator <<(ostream& out, const Cluster& cluster) { - return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]"; + return out << cluster.name.str() << "-" << cluster.self; } ostream& operator<<(ostream& out, const Cluster::MemberMap::value_type& m) { @@ -106,13 +89,16 @@ return out; } -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) : +// FIXME aconway 2008-07-02: create a Connection for the cluster. +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : + broker(b), cpg(*this), name(name_), url(url_), - observer(new SessionObserver(*this)) + observer(new ConnectionObserver(*this)), + self(cpg.self()) { - QPID_LOG(trace, *this << " Joining cluster: " << name_); + QPID_LOG(trace, "Joining cluster: " << name_); cpg.join(name); notify(); dispatcher=Thread(*this); @@ -136,19 +122,32 @@ } } -void Cluster::send(AMQFrame& frame, FrameHandler* next) { - QPID_LOG(trace, *this << " SEND: " << frame); - char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling. - Buffer buf(data); +template void decodePtr(Buffer& buf, T*& ptr) { + uint64_t value = buf.getLongLong(); + ptr = reinterpret_cast(value); +} + +template void encodePtr(Buffer& buf, T* ptr) { + uint64_t value = reinterpret_cast(ptr); + buf.putLongLong(value); +} + +void Cluster::send(AMQFrame& frame, void* connection, FrameHandler* next) { + QPID_LOG(trace, "MCAST [" << connection << "] " << frame); + // TODO aconway 2008-07-03: More efficient buffer management. + // Cache coded form of decoded frames for re-encoding? + Buffer buf(buffer); + assert(frame.size() + 128 < sizeof(buffer)); frame.encode(buf); - buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer. - iovec iov = { data, frame.size()+sizeof(next) }; + encodePtr(buf, connection); + encodePtr(buf, next); + iovec iov = { buffer, buf.getPosition() }; cpg.mcast(name, &iov, 1); } void Cluster::notify() { AMQFrame frame(in_place(ProtocolVersion(), url.str())); - send(frame, 0); + send(frame, 0, 0); } size_t Cluster::size() const { @@ -164,6 +163,21 @@ return result; } +boost::shared_ptr +Cluster::getShadowConnection(const Cpg::Id& member, void* connectionPtr) { + // FIXME aconway 2008-07-02: locking - called by deliver in + // cluster thread so no locks but may need to revisit as model + // changes. + ShadowConnectionId id(member, connectionPtr); + boost::shared_ptr& ptr = shadowConnectionMap[id]; + if (!ptr) { + std::ostringstream os; + os << name << ":" << member << ":" << std::hex << connectionPtr; + ptr.reset(new broker::Connection(&shadowOut, broker, os.str())); + } + return ptr; +} + void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, @@ -172,20 +186,28 @@ void* msg, int msg_len) { + Id from(nodeid, pid); try { - Id from(nodeid, pid); Buffer buf(static_cast(msg), msg_len); AMQFrame frame; frame.decode(buf); - QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from); - if (frame.getChannel() == 0) + void* connectionId; + decodePtr(buf, connectionId); + + QPID_LOG(trace, "DLVR [" << from << " " << connectionId << "] " << frame); + + if (connectionId == 0) // A cluster control frame. handleClusterFrame(from, frame); - else if (from == self) { - FrameHandler* next; - buf.getRawData((uint8_t*)&next, sizeof(next)); + else if (from == self) { // My own frame, carries a next pointer. + FrameHandler* next; + decodePtr(buf, next); next->handle(frame); } - // FIXME aconway 2008-01-30: apply frames from foreign sessions. + else { // Foreign frame, forward to shadow connection. + // FIXME aconway 2008-07-02: ptr_map instead of shared_ptr. + boost::shared_ptr shadow = getShadowConnection(from, connectionId); + shadow->received(frame); + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -203,7 +225,7 @@ return (predicate(*this)); } -// Handle cluster control frame from the null session. +// Handle cluster control frame . void Cluster::handleClusterFrame(Id from, AMQFrame& frame) { // TODO aconway 2007-06-20: use visitor pattern here. ClusterNotifyBody* notifyIn= @@ -213,10 +235,8 @@ { Mutex::ScopedLock l(lock); members[from].url=notifyIn->getUrl(); - if (!self.id && notifyIn->getUrl() == url.str()) - self=from; lock.notifyAll(); - QPID_LOG(trace, *this << ": members joined: " << members); + QPID_LOG(debug, "Cluster join: " << members); } } @@ -234,7 +254,7 @@ if (nLeft) { for (int i = 0; i < nLeft; ++i) members.erase(Id(left[i])); - QPID_LOG(trace, *this << ": members left: " << members); + QPID_LOG(debug, "Cluster leave: " << members); lock.notifyAll(); } newMembers = nJoined > 1 || (nJoined==1 && Id(joined[0]) != self); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul 4 12:07:33 2008 @@ -19,7 +19,8 @@ * */ -#include "Cpg.h" +#include "qpid/cluster/Cpg.h" +#include "qpid/cluster/ShadowConnectionOutputHandler.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" @@ -36,7 +37,8 @@ #include #include -namespace qpid { namespace cluster { +namespace qpid { +namespace cluster { /** * Connection to the cluster. @@ -63,7 +65,7 @@ virtual ~Cluster(); // FIXME aconway 2008-01-29: - boost::intrusive_ptr getObserver() { return observer; } + boost::intrusive_ptr getObserver() { return observer; } /** Get the current cluster membership. */ MemberList getMembers() const; @@ -82,11 +84,13 @@ sys::Duration timeout=sys::TIME_INFINITE) const; /** Send frame to the cluster */ - void send(framing::AMQFrame&, framing::FrameHandler*); + void send(framing::AMQFrame&, void* connection, framing::FrameHandler*); private: typedef Cpg::Id Id; typedef std::map MemberMap; + typedef boost::tuple ShadowConnectionId; + typedef std::map > ShadowConnectionMap; void notify(); ///< Notify cluster of my details. @@ -107,17 +111,24 @@ ); void run(); + void handleClusterFrame(Id from, framing::AMQFrame&); + boost::shared_ptr getShadowConnection(const Cpg::Id&, void*); + mutable sys::Monitor lock; + broker::Broker& broker; Cpg cpg; Cpg::Name name; Url url; - Id self; MemberMap members; sys::Thread dispatcher; boost::function callback; - boost::intrusive_ptr observer; + boost::intrusive_ptr observer; + Id self; + ShadowConnectionMap shadowConnectionMap; + ShadowConnectionOutputHandler shadowOut; + char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management. friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Jul 4 12:07:33 2008 @@ -35,15 +35,6 @@ using namespace std; using broker::Broker; -struct OptionValues { - string name; - string url; - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } -}; // Note we update the values in a separate struct. // This is to work around boost::program_options differences, @@ -51,43 +42,44 @@ // ones take a copy (or require a shared_ptr) // struct ClusterOptions : public Options { + std::string name; + std::string url; - ClusterOptions(OptionValues* v) : Options("Cluster Options") { + ClusterOptions() : Options("Cluster Options") { addOptions() - ("cluster-name", optValue(v->name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(v->url,"URL"), + ("cluster-name", optValue(name,""), "Cluster identifier") + ("cluster-url", optValue(url,"URL"), "URL of this broker, advertized to the cluster.\n" - "Defaults to a URL listing all the local IP addresses\n"); + "Defaults to a URL listing all the local IP addresses\n") + ; } }; struct ClusterPlugin : public PluginT { - OptionValues values; + ClusterOptions options; boost::optional cluster; - ClusterPlugin(const OptionValues& v) : values(v) {} + ClusterPlugin(const ClusterOptions& opts) : options(opts) {} - void initializeT(Broker& broker) { - cluster = boost::in_place(values.name, values.getUrl(broker.getPort()), boost::ref(broker)); - broker.getSessionManager().add(cluster->getObserver()); + void initializeT(Broker& broker) { // FIXME aconway 2008-07-01: drop T suffix. + Url url = options.url.empty() ? Url::getIpAddressesUrl(broker.getPort()) : Url(options.url); + cluster = boost::in_place(options.name, url, boost::ref(broker)); + broker.getConnectionManager().add(cluster->getObserver()); // FIXME aconway 2008-07-01: to Cluster ctor } }; struct PluginFactory : public Plugin::FactoryT { - OptionValues values; ClusterOptions options; - PluginFactory() : options(&values) {} - Options* getOptions() { return &options; } boost::shared_ptr createT(Broker&) { - // Only provide to a Broker, and only if the --cluster config is set. - if (values.name.empty()) + if (options.name.empty()) { // No cluster name, don't initialize cluster. return boost::shared_ptr(); + } else - return make_shared_ptr(new ClusterPlugin(values)); + return make_shared_ptr(new ClusterPlugin(options)); } }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Jul 4 12:07:33 2008 @@ -144,24 +144,20 @@ return "Cannot mcast to CPG group "+group.str(); } +Cpg::Id Cpg::self() const { + unsigned int nodeid; + check(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); + return Id(nodeid, getpid()); +} + ostream& operator<<(ostream& o, std::pair a) { ostream_iterator i(o, " "); std::copy(a.first, a.first+a.second, i); return o; } -static int popbyte(uint32_t& n) { - uint8_t b=n&0xff; - n>>=8; - return b; -} - ostream& operator <<(ostream& out, const Cpg::Id& id) { - uint32_t node=id.nodeId(); - out << popbyte(node); - for (int i = 0; i < 3; i++) - out << "." << popbyte(node); - return out << ":" << id.pid(); + return out << id.getNodeId() << "-" << id.getPid(); } ostream& operator <<(ostream& out, const cpg_name& name) { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Jul 4 12:07:33 2008 @@ -22,6 +22,8 @@ #include "qpid/Exception.h" #include "qpid/cluster/Dispatchable.h" +#include +#include #include #include @@ -55,16 +57,14 @@ std::string str() const { return std::string(value, length); } }; - - struct Id { - uint64_t id; - Id(uint64_t n=0) : id(n) {} - Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; } - Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {} - - operator uint64_t() const { return id; } - uint32_t nodeId() const { return id >> 32; } - pid_t pid() const { return id & 0xFFFF; } + + + // boost::tuple gives us == and < for free. + struct Id : public boost::tuple { + Id(uint32_t n=0, uint32_t p=0) : boost::tuple(n, p) {} + Id(const cpg_address& addr) : boost::tuple(addr.nodeid, addr.pid) {} + uint32_t getNodeId() const { return boost::get<0>(*this); } + uint32_t getPid() const { return boost::get<1>(*this); } }; static std::string str(const cpg_name& n) { @@ -131,6 +131,8 @@ cpg_handle_t getHandle() const { return handle; } + Id self() const; + private: class Handles; struct ClearHandleOnExit; Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h Fri Jul 4 12:07:33 2008 @@ -0,0 +1,46 @@ +#ifndef QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H +#define QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include + +namespace qpid { + +namespace framing { class AMQFrame; } + +namespace cluster { + +/** + * Output handler for frames sent to shadow connections. + * Simply discards frames. + */ +class ShadowConnectionOutputHandler : public sys::ConnectionOutputHandler +{ + public: + virtual void send(framing::AMQFrame&) {} + virtual void close() {} + virtual void activateOutput() {} +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_SHADOWCONNECTIONOUTPUTHANDLER_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ShadowConnectionOutputHandler.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Fri Jul 4 12:07:33 2008 @@ -47,7 +47,7 @@ Handler* next; /** A Chain is a handler holding a linked list of sub-handlers. - * Chain::next is invoked after the full, it is not itself part of the chain. + * Chain::next is invoked after the full chain, it is not itself part of the chain. * Handlers inserted into the chain are deleted by the Chain dtor. */ class Chain : public Handler { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp Fri Jul 4 12:07:33 2008 @@ -111,6 +111,8 @@ void Logger::log(const Statement& s, const std::string& msg) { // Format the message outside the lock. std::ostringstream os; + if (!prefix.empty()) + os << prefix << ": "; if (flags&TIME) { const char * month_abbrevs[] = { "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec" }; @@ -134,7 +136,7 @@ if (flags&LEVEL) os << LevelTraits::name(s.level) << " "; if (flags&THREAD) - os << "[" << qpid::sys::Thread::logId() << "] "; + os << "[0x" << hex << qpid::sys::Thread::logId() << "] "; if (flags&FILE) os << s.file << ":"; if (flags&LINE) @@ -145,6 +147,7 @@ os << " "; os << msg << endl; std::string formatted=os.str(); + std::cout << "FORMATTED: " << formatted << std::endl; // FIXME aconway 2008-07-04: { ScopedLock l(lock); @@ -220,6 +223,9 @@ void (Logger::* outputFn)(const std::string&, const Options&) = &Logger::output; for_each(o.outputs.begin(), o.outputs.end(), boost::bind(outputFn, this, _1, boost::cref(o))); + setPrefix(opts.prefix); } +void Logger::setPrefix(const std::string& p) { prefix = p; } + }} // namespace qpid::log Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h Fri Jul 4 12:07:33 2008 @@ -90,8 +90,12 @@ /** Add an output destination for messages */ void output(std::auto_ptr out); + /** Set a prefix for all messages */ + void setPrefix(const std::string& prefix); + /** Reset the logger to it's original state. */ void clear(); + private: typedef boost::ptr_vector Outputs; @@ -104,6 +108,7 @@ Outputs outputs; Selector selector; int flags; + std::string prefix; }; }} // namespace qpid::log Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.cpp Fri Jul 4 12:07:33 2008 @@ -142,6 +142,7 @@ ("log-source", optValue(source,"yes|no"), "Include source file:line in log messages") ("log-thread", optValue(thread,"yes|no"), "Include thread ID in log messages") ("log-function", optValue(function,"yes|no"), "Include function signature in log messages") + ("log-prefix", optValue(prefix,"STRING"), "Prefix to append to all log messages") ("syslog-name", optValue(syslogName, "NAME"), "Name to use in syslog messages") ("syslog-facility", optValue(syslogFacility,"LOG_XXX"), "Facility to use in syslog messages") ; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Options.h Fri Jul 4 12:07:33 2008 @@ -45,6 +45,7 @@ bool trace; std::string syslogName; SyslogFacility syslogFacility; + std::string prefix; }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp Fri Jul 4 12:07:33 2008 @@ -30,7 +30,7 @@ namespace { using namespace std; -struct NonPrint { bool operator()(unsigned char c) { return !isprint(c); } }; +struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } }; const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' }; @@ -43,6 +43,7 @@ for (string::const_iterator i = str.begin(); i != str.end(); ++i) { if (nonPrint(*i)) { ret.push_back('\\'); + ret.push_back('x'); ret.push_back(hex[((*i) >> 4)&0xf]); ret.push_back(hex[(*i) & 0xf]); } Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h Fri Jul 4 12:07:33 2008 @@ -0,0 +1,24 @@ +#ifndef QPID_SYS_FORK_H +#define QPID_SYS_FORK_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "posix/Fork.h" + +#endif /*!QPID_SYS_FORK_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Fork.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Socket.h Fri Jul 4 12:07:33 2008 @@ -47,7 +47,7 @@ /** Set socket non blocking */ void setNonblocking() const; - void connect(const std::string& host, int port) const; + void connect(const std::string& host, uint16_t port) const; void close() const; @@ -67,7 +67,7 @@ *@param backlog maximum number of pending connections. *@return The bound port. */ - int listen(int port = 0, int backlog = 10) const; + int listen(uint16_t port = 0, int backlog = 10) const; /** Returns the "socket name" ie the address bound to * the near end of the socket Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp Fri Jul 4 12:07:33 2008 @@ -0,0 +1,132 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/Fork.h" +#include "qpid/log/Statement.h" +#include "qpid/Exception.h" + +#include +#include +#include +#include +#include +#include + +namespace qpid { +namespace sys { + +using namespace std; + +namespace { +/** Throw an exception containing msg and strerror if condition is true. */ +void throwIf(bool condition, const string& msg) { + if (condition) + throw Exception(msg + (errno? ": "+strError(errno) : string()) + "."); +} + +void writeStr(int fd, const std::string& str) { + const char* WRITE_ERR = "Error writing to parent process"; + int size = str.size(); + throwIf(int(sizeof(size)) > ::write(fd, &size, sizeof(size)), WRITE_ERR); + throwIf(size > ::write(fd, str.data(), size), WRITE_ERR); +} + +string readStr(int fd) { + string value; + const char* READ_ERR = "Error reading from forked process"; + int size; + throwIf(int(sizeof(size)) > ::read(fd, &size, sizeof(size)), READ_ERR); + if (size > 0) { // Read string message + value.resize(size); + throwIf(size > ::read(fd, const_cast(value.data()), size), READ_ERR); + } + return value; +} + +} // namespace + +Fork::Fork() {} +Fork::~Fork() {} + +void Fork::fork() { + pid_t pid = ::fork(); + throwIf(pid < 0, "Failed to fork the process"); + if (pid == 0) child(); + else parent(pid); +} + +ForkWithMessage::ForkWithMessage() { + pipeFds[0] = pipeFds[1] = -1; +} + +struct AutoCloseFd { + int fd; + AutoCloseFd(int d) : fd(d) {} + ~AutoCloseFd() { ::close(fd); } +}; + +void ForkWithMessage::fork() { + throwIf(::pipe(pipeFds) < 0, "Can't create pipe"); + pid_t pid = ::fork(); + throwIf(pid < 0, "Fork fork failed"); + if (pid == 0) { // Child + AutoCloseFd ac(pipeFds[1]); // Write side. + ::close(pipeFds[0]); // Read side + try { + child(); + } + catch (const std::exception& e) { + QPID_LOG(error, "Error in forked child: " << e.what()); + std::string msg = e.what(); + if (msg.empty()) msg = " "; // Make sure we send a non-empty error string. + writeStr(pipeFds[1], msg); + } + } + else { // Parent + close(pipeFds[1]); // Write side. + AutoCloseFd ac(pipeFds[0]); // Read side + parent(pid); + } +} + +string ForkWithMessage::wait(int timeout) { // parent waits for child. + errno = 0; + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + + fd_set fds; + FD_ZERO(&fds); + FD_SET(pipeFds[0], &fds); + int n=select(FD_SETSIZE, &fds, 0, 0, &tv); + throwIf(n==0, "Timed out waiting for fork"); + throwIf(n<0, "Error waiting for fork"); + + string error = readStr(pipeFds[0]); + if (error.empty()) return readStr(pipeFds[0]); + else throw Exception("Error in forked process: " + error); +} + +// Write empty error string followed by value string to pipe. +void ForkWithMessage::ready(const string& value) { // child + // Write empty string for error followed by value. + writeStr(pipeFds[1], string()); // No error + writeStr(pipeFds[1], value); +} + + +}} // namespace qpid::sys Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h Fri Jul 4 12:07:33 2008 @@ -0,0 +1,81 @@ +#ifndef QPID_SYS_POSIX_FORK_H +#define QPID_SYS_POSIX_FORK_H + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include + +namespace qpid { +namespace sys { + +/** + * Fork the process. Call parent() in parent and child() in child. + */ +class Fork { + public: + Fork(); + virtual ~Fork(); + + /** + * Fork the process. + * Calls parent() in the parent process, child() in the child. + */ + virtual void fork(); + + protected: + + /** Called in parent process. + *@child pid of child process + */ + virtual void parent(pid_t child) = 0; + + /** Called in child process */ + virtual void child() = 0; +}; + +/** + * Like Fork but also allows the child to send a string message + * or throw an exception to the parent. + */ +class ForkWithMessage : public Fork { + public: + ForkWithMessage(); + void fork(); + + protected: + /** Call from parent(): wait for child to send a value or throw exception. + * @timeout in seconds to wait for response. + * @return value passed by child to ready(). + */ + std::string wait(int timeout); + + /** Call from child(): Send a value to the parent. + *@param value returned by parent call to wait(). + */ + void ready(const std::string& value); + + private: + int pipeFds[2]; +}; + +}} // namespace qpid::sys + + + +#endif /*!QPID_SYS_POSIX_FORK_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Fork.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp Fri Jul 4 12:07:33 2008 @@ -137,7 +137,7 @@ } } -void Socket::connect(const std::string& host, int port) const +void Socket::connect(const std::string& host, uint16_t port) const { std::stringstream namestream; namestream << host << ":" << port; @@ -192,7 +192,7 @@ return received; } -int Socket::listen(int port, int backlog) const +int Socket::listen(uint16_t port, int backlog) const { const int& socket = impl->fd; int yes=1; @@ -202,9 +202,9 @@ name.sin_port = htons(port); name.sin_addr.s_addr = 0; if (::bind(socket, (struct sockaddr*)&name, sizeof(name)) < 0) - throw QPID_POSIX_ERROR(errno); + throw Exception(QPID_MSG("Can't bind to port " << port << ": " << strError(errno))); if (::listen(socket, backlog) < 0) - throw QPID_POSIX_ERROR(errno); + throw Exception(QPID_MSG("Can't listen on port " << port << ": " << strError(errno))); socklen_t namelen = sizeof(name); if (::getsockname(socket, (struct sockaddr*)&name, &namelen) < 0) Modified: incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpidd.cpp Fri Jul 4 12:07:33 2008 @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Broker.h" +#include "qpid/broker/SignalHandler.h" #include "qpid/sys/posix/check.h" #include "qpid/broker/Daemon.h" #include "qpid/log/Statement.h" @@ -131,12 +132,6 @@ shared_ptr brokerPtr; auto_ptr options; -void shutdownHandler(int /*signal*/){ - // Note: do not call any async-signal unsafe functions here. - // Do any extra shutdown actions in main() after broker->run() - brokerPtr->shutdown(); -} - struct QpiddDaemon : public Daemon { QpiddDaemon(std::string pidDir) : Daemon(pidDir) {} @@ -153,7 +148,6 @@ uint16_t port=brokerPtr->getPort(); ready(port); // Notify parent. brokerPtr->run(); - brokerPtr.reset(); } }; @@ -240,17 +234,7 @@ } // Starting the broker. - - // Signal handling - signal(SIGINT,shutdownHandler); - signal(SIGTERM,shutdownHandler); - signal(SIGHUP,SIG_IGN); // TODO aconway 2007-07-18: reload config. - - signal(SIGCHLD,SIG_IGN); - signal(SIGTSTP,SIG_IGN); - signal(SIGTTOU,SIG_IGN); - signal(SIGTTIN,SIG_IGN); - + broker::SignalHandler::setBroker(brokerPtr); // Set up signal handling. if (options->daemon.daemon) { // For daemon mode replace default stderr with syslog. if (options->log.outputs.size() == 1 && options->log.outputs[0] == "stderr") { Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp Fri Jul 4 12:07:33 2008 @@ -192,3 +192,10 @@ fun:_ZN4qpid7Options5parseEiPPcRKSsb } +{ + CPG related errors - seem benign but should invesgitate. + Memcheck:Param + socketcall.sendmsg(msg.msg_iov[i]) + fun:sendmsg + obj:/usr/lib/openais/libcpg.so.2.0.0 +} Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Fri Jul 4 12:07:33 2008 @@ -86,16 +86,13 @@ /** Convenience class to create and open a connection and session * and some related useful objects. */ -template +template struct ClientT { ConnectionType connection; SessionType session; qpid::client::SubscriptionManager subs; qpid::client::LocalQueue lq; - ClientT(uint16_t port) : connection(port), - session(connection.newSession("Client")), - subs(session) - {} + ClientT(uint16_t port) : connection(port), session(connection.newSession()), subs(session) {} ~ClientT() { connection.close(); } }; Added: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=674107&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Fri Jul 4 12:07:33 2008 @@ -0,0 +1,91 @@ +#ifndef TESTS_FORKEDBROKER_H +#define TESTS_FORKEDBROKER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Fork.h" +#include "qpid/log/Logger.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/SignalHandler.h" + +#include + +#include + +#include +#include + +/** + * Class to fork a broker child process. + * + * For most tests a BrokerFixture may be more convenient as it starts + * a broker in the same process which allows you to easily debug into + * the broker. + * + * This useful for tests that need to start multiple brokers where + * those brokers can't coexist in the same process (e.g. for cluster + * tests where CPG doesn't allow multiple group members in a single + * process.) + * + */ +class ForkedBroker : public qpid::sys::ForkWithMessage { + pid_t childPid; + uint16_t port; + qpid::broker::Broker::Options opts; + std::string prefix; + + public: + ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string()) + : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } + + ~ForkedBroker() { stop(); } + + void stop() { + if (childPid > 0) { + ::kill(childPid, SIGINT); + //FIXME aconway 2008-07-04: ::waitpid(childPid, 0, 0); + } + } + + void parent(pid_t pid) { + childPid = pid; + qpid::log::Logger::instance().setPrefix("parent"); + std::string portStr = wait(2); + port = boost::lexical_cast(portStr); + } + + void child() { + prefix += boost::lexical_cast(long(getpid())); + qpid::log::Logger::instance().setPrefix(prefix); + opts.port = 0; + boost::shared_ptr broker(new qpid::broker::Broker(opts)); + qpid::broker::SignalHandler::setBroker(broker); + QPID_LOG(info, "ForkedBroker started on " << broker->getPort()); + ready(boost::lexical_cast(broker->getPort())); // Notify parent. + broker->run(); + QPID_LOG(notice, "ForkedBroker exiting."); + } + + uint16_t getPort() { return port; } +}; + +#endif /*!TESTS_FORKEDBROKER_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jul 4 12:07:33 2008 @@ -62,7 +62,8 @@ TxBufferTest.cpp \ TxPublishTest.cpp \ MessageBuilderTest.cpp \ - ConnectionOptions.h + ConnectionOptions.h \ + ForkedBroker.h if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Jul 4 12:07:33 2008 @@ -19,10 +19,14 @@ #include "test_tools.h" #include "unit_test.h" +#include "ForkedBroker.h" #include "BrokerFixture.h" #include "qpid/cluster/Cpg.h" #include "qpid/framing/AMQBody.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Session.h" +#include "qpid/framing/Uuid.h" #include #include @@ -33,15 +37,41 @@ #include #include +#include + QPID_AUTO_TEST_SUITE(CpgTestSuite) using namespace std; +using namespace qpid; using namespace qpid::cluster; using namespace qpid::framing; using namespace qpid::client; +using qpid::broker::Broker; using boost::ptr_vector; +struct ClusterFixture : public ptr_vector { + string name; + + ClusterFixture(size_t n=0) : name(Uuid(true).str()) { add(n); } + void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } + void add(); +}; + +void ClusterFixture::add() { + broker::Broker::Options opts; + Plugin::Factory::addOptions(opts); // For cluster options. + const char* argv[] = { + "", "--cluster-name", name.c_str(), "--auth=no", "--no-data-dir" + }; + opts.parse(sizeof(argv)/sizeof(*argv), const_cast(argv)); + ostringstream prefix; + prefix << "b" << size() << "-"; + QPID_LOG(info, "ClusterFixture adding broker " << prefix.str()); + push_back(new ForkedBroker(opts, prefix.str())); + QPID_LOG(info, "ClusterFixture added broker " << prefix.str()); +} + // For debugging: op << for CPG types. ostream& operator<<(ostream& o, const cpg_name* n) { @@ -117,56 +147,8 @@ } -QPID_AUTO_TEST_CASE(CpgMulti) { - // Verify using multiple handles in one process. - // - Cpg::Name group("CpgMulti"); - Callback cb1(group.str()); - Cpg cpg1(cb1); - - Callback cb2(group.str()); - Cpg cpg2(cb2); - - cpg1.join(group); - cpg2.join(group); - iovec iov1 = { (void*)"Hello1", 6 }; - iovec iov2 = { (void*)"Hello2", 6 }; - cpg1.mcast(group, &iov1, 1); - cpg2.mcast(group, &iov2, 1); - cpg1.leave(group); - cpg2.leave(group); - - cpg1.dispatchSome(); - BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size()); - BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]); - BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]); - - cpg2.dispatchSome(); - BOOST_REQUIRE_EQUAL(2u, cb1.delivered.size()); - BOOST_CHECK_EQUAL("Hello1", cb1.delivered[0]); - BOOST_CHECK_EQUAL("Hello2", cb1.delivered[1]); -} - -// Test cluster of BrokerFixtures. -struct ClusterFixture : public ptr_vector { - ClusterFixture(size_t n=0) { add(n); } - void add(size_t n) { for (size_t i=0; i < n; ++i) add(); } - void add(); -}; - -void ClusterFixture::add() { - qpid::broker::Broker::Options opts; - // Assumes the cluster plugin is loaded. - qpid::Plugin::Factory::addOptions(opts); - const char* argv[] = { "--cluster-name", ::getenv("USERNAME") }; - // FIXME aconway 2008-06-26: fix parse() signature, should not need cast. - opts.parse(sizeof(argv)/sizeof(*argv), const_cast(argv)); - push_back(new BrokerFixture(opts)); -} - -#if 0 QPID_AUTO_TEST_CASE(testWiringReplication) { - ClusterFixture cluster(3); + ClusterFixture cluster(2); // FIXME aconway 2008-07-02: 3 brokers Client c0(cluster[0].getPort()); BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty()); BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty()); @@ -187,16 +169,17 @@ ClusterFixture cluster(2); Client c0(cluster[0].getPort()); c0.session.queueDeclare("q"); - c0.session.messageTransfer(arg::content=TransferContent("data", "q")); + c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); + c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); c0.session.close(); Client c1(cluster[1].getPort()); Message msg; BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); - BOOST_CHECK_EQUAL(string("data"), msg.getData()); + BOOST_CHECK_EQUAL(string("foo"), msg.getData()); + BOOST_CHECK(c1.subs.get(msg, "q", qpid::sys::TIME_SEC)); + BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } -// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover. - -#endif +// TODO aconway 2008-06-25: dequeue replication, failover. QPID_AUTO_TEST_SUITE_END() Modified: incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp?rev=674107&r1=674106&r2=674107&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp Fri Jul 4 12:07:33 2008 @@ -374,8 +374,8 @@ QPID_LOG(critical, str); ifstream log("logging.tmp"); string line; - getline(log, line); - string expect="critical null\\00tab\\09space newline\\0Aret\\0D\\80\\99\\FF\\00"; + getline(log, line, '\0'); + string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n"; BOOST_CHECK_EQUAL(expect, line); log.close(); unlink("logging.tmp");