Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 32611 invoked from network); 10 Sep 2007 08:41:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Sep 2007 08:41:40 -0000 Received: (qmail 98696 invoked by uid 500); 10 Sep 2007 08:41:34 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 98660 invoked by uid 500); 10 Sep 2007 08:41:34 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 98651 invoked by uid 99); 10 Sep 2007 08:41:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Sep 2007 01:41:33 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Sep 2007 08:41:31 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A63C61A9832; Mon, 10 Sep 2007 01:41:10 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r574176 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/client/ qpid/framing/ tests/ Date: Mon, 10 Sep 2007 08:41:06 -0000 To: qpid-commits@incubator.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20070910084110.A63C61A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Mon Sep 10 01:41:05 2007 New Revision: 574176 URL: http://svn.apache.org/viewvc?rev=574176&view=rev Log: Client side support for message and delivery properties in header segments. Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (with props) Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 10 01:41:05 2007 @@ -132,6 +132,7 @@ qpid/framing/Blob.cpp \ qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \ qpid/framing/MethodHolderMaxSize.h \ + qpid/framing/TransferContent.cpp \ qpid/Exception.cpp \ qpid/Plugin.h \ qpid/Plugin.cpp \ @@ -178,6 +179,7 @@ qpid/broker/FanOutExchange.cpp \ qpid/broker/HeadersExchange.cpp \ qpid/broker/Message.cpp \ + qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageDelivery.cpp \ qpid/broker/MessageHandlerImpl.cpp \ @@ -344,6 +346,7 @@ qpid/framing/SequenceNumberSet.h \ qpid/framing/SerializeHandler.h \ qpid/framing/StructHelper.h \ + qpid/framing/TransferContent.h \ qpid/framing/TypeFilter.h \ qpid/framing/Value.h \ qpid/framing/Visitor.h \ Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Mon Sep 10 01:41:05 2007 @@ -68,7 +68,7 @@ if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); - alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders()); } } else { @@ -358,7 +358,7 @@ while(!messages.empty()){ DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), - &(msg.getMessage().getApplicationHeaders())); + msg.getMessage().getApplicationHeaders()); pop(); } alternateExchange->decAlternateUsers(); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Sep 10 01:41:05 2007 @@ -95,7 +95,7 @@ Exchange::shared_ptr alternate = queue->getAlternateExchange(); if (alternate) { DeliverableMessage delivery(msg.payload); - alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders())); + alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders()); QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to " << alternate->getName()); } else { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Sep 10 01:41:05 2007 @@ -38,12 +38,12 @@ Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {} -const std::string& Message::getRoutingKey() const +std::string Message::getRoutingKey() const { return getAdapter().getRoutingKey(frames); } -const std::string& Message::getExchangeName() const +std::string Message::getExchangeName() const { return getAdapter().getExchange(frames); } @@ -61,7 +61,7 @@ return getAdapter().isImmediate(frames); } -const FieldTable& Message::getApplicationHeaders() const +const FieldTable* Message::getApplicationHeaders() const { return getAdapter().getApplicationHeaders(frames); } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Sep 10 01:41:05 2007 @@ -59,11 +59,11 @@ uint64_t contentSize() const; - const std::string& getRoutingKey() const; + std::string getRoutingKey() const; const boost::shared_ptr getExchange(ExchangeRegistry&) const; - const std::string& getExchangeName() const; + std::string getExchangeName() const; bool isImmediate() const; - const framing::FieldTable& getApplicationHeaders() const; + const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); framing::FrameSet& getFrames() { return frames; } Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp?rev=574176&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp Mon Sep 10 01:41:05 2007 @@ -0,0 +1,87 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "MessageAdapter.h" + +namespace { + const std::string empty; +} + +namespace qpid { +namespace broker{ + + std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f) + { + return f.as()->getRoutingKey(); + } + + std::string PublishAdapter::getExchange(const framing::FrameSet& f) + { + return f.as()->getExchange(); + } + + bool PublishAdapter::isImmediate(const framing::FrameSet& f) + { + return f.as()->getImmediate(); + } + + const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f) + { + const framing::BasicHeaderProperties* p = f.getHeaders()->get(); + return p ? &(p->getHeaders()) : 0; + } + + bool PublishAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::BasicHeaderProperties* p = f.getHeaders()->get(); + return p && p->getDeliveryMode() == 2; + } + + std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get(); + return p ? p->getRoutingKey() : empty; + } + + std::string TransferAdapter::getExchange(const framing::FrameSet& f) + { + return f.as()->getDestination(); + } + + bool TransferAdapter::isImmediate(const framing::FrameSet&) + { + //TODO: we seem to have lost the immediate flag + return false; + } + + const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f) + { + const framing::MessageProperties* p = f.getHeaders()->get(); + return p ? &(p->getApplicationHeaders()) : 0; + } + + bool TransferAdapter::isPersistent(const framing::FrameSet& f) + { + const framing::DeliveryProperties* p = f.getHeaders()->get(); + return p && p->getDeliveryMode() == 2; + } + +}} Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h Mon Sep 10 01:41:05 2007 @@ -38,68 +38,29 @@ { virtual ~MessageAdapter() {} - virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0; - virtual const std::string& getExchange(const framing::FrameSet& f) = 0; + virtual std::string getRoutingKey(const framing::FrameSet& f) = 0; + virtual std::string getExchange(const framing::FrameSet& f) = 0; virtual bool isImmediate(const framing::FrameSet& f) = 0; - virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0; + virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0; virtual bool isPersistent(const framing::FrameSet& f) = 0; }; struct PublishAdapter : MessageAdapter { - const std::string& getRoutingKey(const framing::FrameSet& f) - { - return f.as()->getRoutingKey(); - } - - const std::string& getExchange(const framing::FrameSet& f) - { - return f.as()->getExchange(); - } - - bool isImmediate(const framing::FrameSet& f) - { - return f.as()->getImmediate(); - } - - const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) - { - return f.getHeaders()->get()->getHeaders(); - } - - bool isPersistent(const framing::FrameSet& f) - { - return f.getHeaders()->get()->getDeliveryMode() == 2; - } + std::string getRoutingKey(const framing::FrameSet& f); + std::string getExchange(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet& f); + const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; struct TransferAdapter : MessageAdapter { - const std::string& getRoutingKey(const framing::FrameSet& f) - { - return f.getHeaders()->get()->getRoutingKey(); - } - - const std::string& getExchange(const framing::FrameSet& f) - { - return f.as()->getDestination(); - } - - bool isImmediate(const framing::FrameSet&) - { - //TODO: we seem to have lost the immediate flag - return false; - } - - const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) - { - return f.getHeaders()->get()->getApplicationHeaders(); - } - - bool isPersistent(const framing::FrameSet& f) - { - return f.getHeaders()->get()->getDeliveryMode() == 2; - } + std::string getRoutingKey(const framing::FrameSet& f); + std::string getExchange(const framing::FrameSet& f); + bool isImmediate(const framing::FrameSet&); + const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f); + bool isPersistent(const framing::FrameSet& f); }; }} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Sep 10 01:41:05 2007 @@ -22,8 +22,8 @@ #include "Message.h" #include "MessageStore.h" -#include "qpid/Exception.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; using namespace qpid::framing; @@ -46,7 +46,7 @@ checkType(CONTENT_BODY, frame.getBody()->type()); break; default: - throw ConnectionException(504, "Invalid frame sequence for message."); + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); } if (staging) { store->appendContent(*message, frame.castBody()->getData()); @@ -61,13 +61,6 @@ } } -void MessageBuilder::checkType(uint8_t expected, uint8_t actual) -{ - if (expected != actual) { - throw ConnectionException(504, "Invalid frame sequence for message."); - } -} - void MessageBuilder::end() { message.reset(); @@ -80,4 +73,33 @@ message = Message::shared_ptr(new Message(id)); state = METHOD; staging = false; +} + +namespace { + +const std::string HEADER_BODY_S = "HEADER"; +const std::string METHOD_BODY_S = "METHOD"; +const std::string CONTENT_BODY_S = "CONTENT"; +const std::string HEARTBEAT_BODY_S = "HEARTBEAT"; +const std::string UNKNOWN = "unknown"; + +std::string type_str(uint8_t type) +{ + switch(type) { + case METHOD_BODY: return METHOD_BODY_S; + case HEADER_BODY: return HEADER_BODY_S; + case CONTENT_BODY: return CONTENT_BODY_S; + case HEARTBEAT_BODY: return HEARTBEAT_BODY_S; + } + return UNKNOWN; +} + +} + +void MessageBuilder::checkType(uint8_t expected, uint8_t actual) +{ + if (expected != actual) { + throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected " + << type_str(expected) << " got " << type_str(actual) << ")")); + } } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Mon Sep 10 01:41:05 2007 @@ -336,13 +336,13 @@ cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName); } - cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); if (!strategy.delivered) { //TODO:if reject-unroutable, then reject //else route to alternate exchange if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); } } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Mon Sep 10 01:41:05 2007 @@ -58,8 +58,14 @@ bool isRedelivered() const { return redelivered; } void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - const HeaderProperties& getMethodHeaders() const { return *this; } - + framing::AMQHeaderBody getHeader() const + { + framing::AMQHeaderBody header; + BasicHeaderProperties* properties = header.get(true); + BasicHeaderProperties::copy(*properties, *this); + properties->setContentLength(data.size()); + return header; + } //TODO: move this elsewhere (GRS 24/08/2007) void populate(framing::FrameSet& frameset) Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Sep 10 01:41:05 2007 @@ -24,6 +24,7 @@ using namespace qpid::client; using namespace qpid::framing; +using namespace qpid::sys; ConnectionImpl::ConnectionImpl(boost::shared_ptr c) : connector(c) { @@ -38,6 +39,7 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session) { + Mutex::ScopedLock l(lock); if (sessions.find(session->getId()) != sessions.end()) { throw Exception("Id already in use."); } @@ -46,6 +48,7 @@ void ConnectionImpl::released(SessionCore::shared_ptr session) { + Mutex::ScopedLock l(lock); SessionMap::iterator i = sessions.find(session->getId()); if (i != sessions.end()) { sessions.erase(i); @@ -59,12 +62,7 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) { - uint16_t id = frame.getChannel(); - SessionMap::iterator i = sessions.find(id); - if (i == sessions.end()) { - throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); - } - i->second->handle(frame); + find(frame.getChannel())->handle(frame); } void ConnectionImpl::open(const std::string& host, int port, @@ -93,10 +91,7 @@ void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text) { - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - i->second->closed(code, text); - } - sessions.clear(); + signalClose(code, text); connector->close(); } @@ -114,8 +109,25 @@ void ConnectionImpl::shutdown() { //this indicates that the socket to the server has closed + signalClose(0, "Unexpected socket closure."); +} + +void ConnectionImpl::signalClose(uint16_t code, const std::string& text) +{ + Mutex::ScopedLock l(lock); for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - i->second->closed(0, "Unexpected socket closure."); + Mutex::ScopedUnlock u(lock); + i->second->closed(code, text); } sessions.clear(); +} + +SessionCore::shared_ptr ConnectionImpl::find(uint16_t id) +{ + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(id); + if (i == sessions.end()) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); + } + return i->second; } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Mon Sep 10 01:41:05 2007 @@ -25,6 +25,7 @@ #include #include #include "qpid/framing/FrameHandler.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" #include "ConnectionHandler.h" @@ -44,6 +45,7 @@ ConnectionHandler handler; boost::shared_ptr connector; framing::ProtocolVersion version; + sys::Mutex lock; void incoming(framing::AMQFrame& frame); void closed(); @@ -51,6 +53,9 @@ void idleOut(); void idleIn(); void shutdown(); + void signalClose(uint16_t, const std::string&); + SessionCore::shared_ptr find(uint16_t); + public: typedef boost::shared_ptr shared_ptr; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Sep 10 01:41:05 2007 @@ -181,31 +181,28 @@ CompletionTracker::ResultListener l) { SequenceNumber id = send(command, l); - sendContent(dynamic_cast(content.getMethodHeaders()), content.getData()); + sendContent(content); return id; } -void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data) +void ExecutionHandler::sendContent(const MethodContent& content) { - AMQHeaderBody header; - BasicHeaderProperties::copy(*header.get(true), headers); - header.get(true)->setContentLength(data.size()); - AMQFrame h(0, header); - out(h); + AMQFrame header(0, content.getHeader()); + out(header); - u_int64_t data_length = data.length(); + u_int64_t data_length = content.getData().length(); if(data_length > 0){ //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(0, AMQContentBody(data)); + AMQFrame frame(0, AMQContentBody(content.getData())); out(frame); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); + string frag(content.getData().substr(offset, length)); AMQFrame frame(0, AMQContentBody(frag)); out(frame); offset += length; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Sep 10 01:41:05 2007 @@ -59,7 +59,7 @@ void sendCompletion(); - void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data); + void sendContent(const framing::MethodContent&); public: typedef CompletionTracker::ResultListener ResultListener; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Mon Sep 10 01:41:05 2007 @@ -41,6 +41,6 @@ { out << "content (" << size() << " bytes)"; #ifndef NDEBUG - out << data.substr(0,10); + out << " " << data.substr(0,10); #endif } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Mon Sep 10 01:41:05 2007 @@ -56,17 +56,17 @@ const AMQMethodBody* FrameSet::getMethod() const { - return parts.empty() ? 0 : dynamic_cast(parts[0].getBody()); + return parts.empty() ? 0 : parts[0].getMethod(); } const AMQHeaderBody* FrameSet::getHeaders() const { - return parts.size() < 2 ? 0 : dynamic_cast(parts[1].getBody()); + return parts.size() < 2 ? 0 : parts[1].castBody(); } AMQHeaderBody* FrameSet::getHeaders() { - return parts.size() < 2 ? 0 : dynamic_cast(parts[1].getBody()); + return parts.size() < 2 ? 0 : parts[1].castBody(); } uint64_t FrameSet::getContentSize() const @@ -80,4 +80,11 @@ { AccumulateContent accumulator(out); map_if(accumulator, TypeFilter(CONTENT_BODY)); +} + +std::string FrameSet::getContent() const +{ + std::string out; + getContent(out); + return out; } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Sep 10 01:41:05 2007 @@ -48,6 +48,7 @@ uint64_t getContentSize() const; void getContent(std::string&) const; + std::string getContent() const; const AMQMethodBody* getMethod() const; const AMQHeaderBody* getHeaders() const; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h Mon Sep 10 01:41:05 2007 @@ -21,7 +21,8 @@ #ifndef _MethodContent_ #define _MethodContent_ -#include "HeaderProperties.h" +#include +#include "AMQHeaderBody.h" namespace qpid { namespace framing { @@ -31,7 +32,7 @@ public: virtual ~MethodContent() {} //TODO: rethink this interface - virtual const HeaderProperties& getMethodHeaders() const = 0; + virtual AMQHeaderBody getHeader() const = 0; virtual const std::string& getData() const = 0; }; Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=574176&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Mon Sep 10 01:41:05 2007 @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TransferContent.h" + +namespace qpid { +namespace framing { + +TransferContent::TransferContent(const std::string& _data) +{ + setData(_data); +} + +AMQHeaderBody TransferContent::getHeader() const +{ + return header; +} + +const std::string& TransferContent::getData() const +{ + return data; +} + +void TransferContent::setData(const std::string& _data) +{ + data = _data; + header.get(true)->setContentLength(data.size()); +} + +void TransferContent::appendData(const std::string& _data) +{ + data += _data; + header.get(true)->setContentLength(data.size()); +} + +MessageProperties& TransferContent::getMessageProperties() +{ + return *header.get(true); +} + +DeliveryProperties& TransferContent::getDeliveryProperties() +{ + return *header.get(true); +} + +}} Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=574176&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Mon Sep 10 01:41:05 2007 @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _TransferContent_ +#define _TransferContent_ + +#include "MethodContent.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/DeliveryProperties.h" + +namespace qpid { +namespace framing { + +class TransferContent : public MethodContent +{ + AMQHeaderBody header; + std::string data; +public: + TransferContent(const std::string& data); + AMQHeaderBody getHeader() const; + void setData(const std::string&); + void appendData(const std::string&); + const std::string& getData() const; + MessageProperties& getMessageProperties(); + DeliveryProperties& getDeliveryProperties(); +}; + +}} +#endif Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=574176&r1=574175&r2=574176&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Sep 10 01:41:05 2007 @@ -22,6 +22,7 @@ #include "qpid_test_plugin.h" #include "InProcessBroker.h" #include "qpid/client/Session.h" +#include "qpid/framing/TransferContent.h" using namespace qpid::client; using namespace qpid::framing; @@ -29,7 +30,8 @@ class ClientSessionTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ClientSessionTest); - CPPUNIT_TEST(testQueueQuery);; + CPPUNIT_TEST(testQueueQuery); + CPPUNIT_TEST(testTransfer); CPPUNIT_TEST_SUITE_END(); boost::shared_ptr broker; @@ -55,14 +57,24 @@ CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange()); } - void testCompletion() + void testTransfer() { std::string queue("my-queue"); std::string dest("my-dest"); + std::string data("my message"); session.queueDeclare(0, queue, "", false, false, true, true, FieldTable()); - //subcribe to the queue with confirm_mode = 1 + //subcribe to the queue with confirm_mode = 1: session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable()); - //publish some messages + //publish a message: + TransferContent content(data); + content.getDeliveryProperties().setRoutingKey("my-queue"); + session.messageTransfer(0, "", 0, 0, content); + //get & test the message: + FrameSet::shared_ptr msg = session.get(); + CPPUNIT_ASSERT(msg->isA()); + CPPUNIT_ASSERT_EQUAL(data, msg->getContent()); + //confirm receipt: + session.execution().completed(msg->getId(), true, true); } };