qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1597121 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/broker/amqp/ qpid/broker/amqp_0_10/ qpid/management/ tests/
Date Fri, 23 May 2014 16:10:28 GMT
Author: gsim
Date: Fri May 23 16:10:27 2014
New Revision: 1597121

URL: http://svn.apache.org/r1597121
Log:
QPID-5783: Share immutable state between copies of a message. Avoid using memory for annotations
unless actually required.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
    qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri May 23 16:10:27 2014
@@ -46,14 +46,11 @@ using std::string;
 namespace qpid {
 namespace broker {
 
-Message::Message() : deliveryCount(-1), alreadyAcquired(false), publisher(0), expiration(FAR_FUTURE), timestamp(0),
-                     isManagementMessage(false), replicationId(0)
+Message::Message() : deliveryCount(-1), alreadyAcquired(false), replicationId(0)
 {}
 
-Message::Message(boost::intrusive_ptr<Encoding> e, boost::intrusive_ptr<PersistableMessage> p)
-    : encoding(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), publisher(0),
-      expiration(FAR_FUTURE), timestamp(0), isManagementMessage(false),
-      replicationId(0)
+Message::Message(boost::intrusive_ptr<SharedState> e, boost::intrusive_ptr<PersistableMessage> p)
+    : sharedState(e), persistentContext(p), deliveryCount(-1), alreadyAcquired(false), replicationId(0)
 {
     if (persistentContext) persistentContext->setIngressCompletion(e);
 }
@@ -78,7 +75,7 @@ uint64_t Message::getMessageSize() const
 
 boost::intrusive_ptr<AsyncCompletion> Message::getIngressCompletion() const
 {
-    return encoding;
+    return sharedState;
 }
 
 namespace
@@ -106,35 +103,29 @@ void Message::addTraceId(const std::stri
 {
     std::string trace = getEncoding().getAnnotationAsString(X_QPID_TRACE);
     if (trace.empty()) {
-        annotations[X_QPID_TRACE] = id;
+        addAnnotation(X_QPID_TRACE, id);
     } else if (trace.find(id) == std::string::npos) {
         trace += ",";
         trace += id;
-        annotations[X_QPID_TRACE] = trace;
+        addAnnotation(X_QPID_TRACE, trace);
     }
-    annotationsChanged();
 }
 
 void Message::clearTrace()
 {
-    annotations[X_QPID_TRACE] = std::string();
-    annotationsChanged();
+    addAnnotation(X_QPID_TRACE, std::string());
 }
 
 uint64_t Message::getTimestamp() const
 {
-    return encoding ? encoding->getTimestamp() : 0;
+    return sharedState ? sharedState->getTimestamp() : 0;
 }
 
 uint64_t Message::getTtl() const
 {
     uint64_t ttl;
-    if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
-        sys::AbsTime current(
-            expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
-        sys::Duration ttl(current, getExpiration());
-        // convert from ns to ms; set to 1 if expired
-        return (int64_t(ttl) >= 1000000 ? int64_t(ttl)/1000000 : 1);
+    if (getTtl(ttl, 1)/*set to 1 if expired*/) {
+        return ttl;
     } else {
         return 0;
     }
@@ -142,35 +133,24 @@ uint64_t Message::getTtl() const
 
 bool Message::getTtl(uint64_t& ttl) const
 {
-    if (encoding->getTtl(ttl) && expiration < FAR_FUTURE) {
-        sys::Duration remaining(sys::AbsTime::now(), getExpiration());
-        // convert from ns to ms; set to 0 if expired
-        ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : 0);
-        return true;
-    } else {
-        return false;
-    }
+    return getTtl(ttl, 0); //set to 0 if expired
 }
 
-void Message::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+bool Message::getTtl(uint64_t& ttl, uint64_t expiredValue) const
 {
-    //TODO: this is still quite 0-10 specific...
-    uint64_t ttl;
-    if (getEncoding().getTtl(ttl)) {
-        if (e) {
-            // Use higher resolution time for the internal expiry calculation.
-            // Prevent overflow as a signed int64_t
-            Duration duration(std::min(ttl * TIME_MSEC,
-                                       (uint64_t) std::numeric_limits<int64_t>::max()));
-            expiration = AbsTime(e->getCurrentTime(), duration);
-            setExpiryPolicy(e);
-        }
+    if (sharedState->getTtl(ttl) && sharedState->getExpiration() < FAR_FUTURE) {
+        sys::Duration remaining = sharedState->getTimeToExpiration();
+        // convert from ns to ms
+        ttl = (int64_t(remaining) >= 1000000 ? int64_t(remaining)/1000000 : expiredValue);
+        return true;
+    } else {
+        return false;
     }
 }
 
 void Message::addAnnotation(const std::string& key, const qpid::types::Variant& value)
 {
-    annotations[key] = value;
+    annotations.get()[key] = value;
     annotationsChanged();
 }
 
@@ -178,19 +158,15 @@ void Message::annotationsChanged()
 {
     if (persistentContext) {
         uint64_t id = persistentContext->getPersistenceId();
-        persistentContext = persistentContext->merge(annotations);
-        persistentContext->setIngressCompletion(encoding);
+        persistentContext = persistentContext->merge(getAnnotations());
+        persistentContext->setIngressCompletion(sharedState);
         persistentContext->setPersistenceId(id);
     }
 }
 
-void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
-    expiryPolicy = e;
-}
-
 bool Message::hasExpired() const
 {
-    return expiryPolicy && expiryPolicy->hasExpired(*this);
+    return sharedState->hasExpired(*this);
 }
 
 uint8_t Message::getPriority() const
@@ -198,12 +174,12 @@ uint8_t Message::getPriority() const
     return getEncoding().getPriority();
 }
 
-bool Message::getIsManagementMessage() const { return isManagementMessage; }
-void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
+bool Message::getIsManagementMessage() const { return sharedState->getIsManagementMessage(); }
 
-const Connection* Message::getPublisher() const { return publisher; }
-void Message::setPublisher(const Connection& p) { publisher = &p; }
-bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher); }
+const Connection* Message::getPublisher() const { return sharedState->getPublisher(); }
+bool Message::isLocalTo(const OwnershipToken* token) const {
+    return token && sharedState->getPublisher() && token->isLocal(sharedState->getPublisher());
+}
 
 
 qpid::framing::SequenceNumber Message::getSequence() const
@@ -223,16 +199,20 @@ void Message::setState(MessageState s)
 {
     state = s;
 }
+namespace {
+const qpid::types::Variant::Map EMPTY_MAP;
+}
 
 const qpid::types::Variant::Map& Message::getAnnotations() const
 {
-    return annotations;
+    return annotations ? *annotations : EMPTY_MAP;
 }
 
 qpid::types::Variant Message::getAnnotation(const std::string& key) const
 {
-    qpid::types::Variant::Map::const_iterator i = annotations.find(key);
-    if (i != annotations.end()) return i->second;
+    const qpid::types::Variant::Map& a = getAnnotations();
+    qpid::types::Variant::Map::const_iterator i = a.find(key);
+    if (i != a.end()) return i->second;
     //FIXME: modify Encoding interface to allow retrieval of
     //annotations of different types from the message data as received
     //off the wire
@@ -241,30 +221,30 @@ qpid::types::Variant Message::getAnnotat
 
 std::string Message::getUserId() const
 {
-    return encoding->getUserId();
+    return sharedState->getUserId();
 }
 
-Message::Encoding& Message::getEncoding()
+Message::SharedState& Message::getSharedState()
 {
-    return *encoding;
+    return *sharedState;
 }
 const Message::Encoding& Message::getEncoding() const
 {
-    return *encoding;
+    return *sharedState;
 }
 Message::operator bool() const
 {
-    return !!encoding;
+    return !!sharedState;
 }
 
 std::string Message::getContent() const
 {
-    return encoding->getContent();
+    return sharedState->getContent();
 }
 
 std::string Message::getPropertyAsString(const std::string& key) const
 {
-    return encoding->getPropertyAsString(key);
+    return sharedState->getPropertyAsString(key);
 }
 namespace {
 class PropertyRetriever : public MapHandler
@@ -308,7 +288,7 @@ class PropertyRetriever : public MapHand
 qpid::types::Variant Message::getProperty(const std::string& key) const
 {
     PropertyRetriever r(key);
-    encoding->processProperties(r);
+    sharedState->processProperties(r);
     return r.getResult();
 }
 
@@ -319,12 +299,79 @@ boost::intrusive_ptr<PersistableMessage>
 
 void Message::processProperties(MapHandler& handler) const
 {
-    encoding->processProperties(handler);
+    sharedState->processProperties(handler);
 }
 
 uint64_t Message::getReplicationId() const { return replicationId; }
 
 void Message::setReplicationId(framing::SequenceNumber id) { replicationId = id; }
 
+sys::AbsTime Message::getExpiration() const
+{
+    return sharedState->getExpiration();
+}
+
+Message::SharedStateImpl::SharedStateImpl() : publisher(0), expiration(qpid::sys::FAR_FUTURE), isManagementMessage(false) {}
+
+const Connection* Message::SharedStateImpl::getPublisher() const
+{
+    return publisher;
+}
+
+void Message::SharedStateImpl::setPublisher(const Connection* p)
+{
+    publisher = p;
+}
+
+sys::AbsTime Message::SharedStateImpl::getExpiration() const
+{
+    return expiration;
+}
+
+void Message::SharedStateImpl::setExpiration(sys::AbsTime e)
+{
+    expiration = e;
+}
+
+sys::Duration Message::SharedStateImpl::getTimeToExpiration() const
+{
+    sys::AbsTime current(expiryPolicy ? expiryPolicy->getCurrentTime() : sys::AbsTime::now());
+    return sys::Duration(current, expiration);
+}
+
+void Message::SharedStateImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+    //TODO: this is still quite 0-10 specific...
+    uint64_t ttl;
+    if (getTtl(ttl)) {
+        if (e) {
+            // Use higher resolution time for the internal expiry calculation.
+            // Prevent overflow as a signed int64_t
+            Duration duration(std::min(ttl * TIME_MSEC,
+                                       (uint64_t) std::numeric_limits<int64_t>::max()));
+            expiration = AbsTime(e->getCurrentTime(), duration);
+            expiryPolicy = e;
+        }
+    }
+}
+
+bool Message::SharedStateImpl::hasExpired(const Message& m) const
+{
+    return expiryPolicy && expiryPolicy->hasExpired(m);
+}
+
+void Message::SharedStateImpl::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e)
+{
+    expiryPolicy = e;
+}
+
+bool Message::SharedStateImpl::getIsManagementMessage() const
+{
+    return isManagementMessage;
+}
+void Message::SharedStateImpl::setIsManagementMessage(bool b)
+{
+    isManagementMessage = b;
+}
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri May 23 16:10:27 2014
@@ -35,6 +35,7 @@
 #include <string>
 #include <vector>
 #include <boost/intrusive_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
 
 namespace qpid {
 namespace amqp {
@@ -80,7 +81,46 @@ public:
         virtual uint64_t getTimestamp() const = 0;
     };
 
-    QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
+    class SharedState : public Encoding
+    {
+      public:
+        virtual ~SharedState() {}
+        virtual const Connection* getPublisher() const = 0;
+        virtual void setPublisher(const Connection* p) = 0;
+
+        virtual void setExpiration(sys::AbsTime e) = 0;
+        virtual sys::AbsTime getExpiration() const = 0;
+        virtual sys::Duration getTimeToExpiration() const = 0;
+        virtual void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>&) = 0;
+        virtual bool hasExpired(const Message& m) const = 0;
+        virtual void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) = 0;
+
+        virtual bool getIsManagementMessage() const = 0;
+        virtual void setIsManagementMessage(bool b) = 0;
+    };
+
+    class SharedStateImpl : public SharedState
+    {
+        const Connection* publisher;
+        qpid::sys::AbsTime expiration;
+        boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+        bool isManagementMessage;
+      public:
+        SharedStateImpl();
+        virtual ~SharedStateImpl() {}
+        QPID_BROKER_EXTERN const Connection* getPublisher() const;
+        QPID_BROKER_EXTERN void setPublisher(const Connection* p);
+        QPID_BROKER_EXTERN void setExpiration(sys::AbsTime e);
+        QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
+        QPID_BROKER_EXTERN sys::Duration getTimeToExpiration() const;
+        QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
+        QPID_BROKER_EXTERN bool hasExpired(const Message& m) const;
+        QPID_BROKER_EXTERN void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+        QPID_BROKER_EXTERN bool getIsManagementMessage() const;
+        QPID_BROKER_EXTERN void setIsManagementMessage(bool b);
+    };
+
+    QPID_BROKER_EXTERN Message(boost::intrusive_ptr<SharedState>, boost::intrusive_ptr<PersistableMessage>);
     QPID_BROKER_EXTERN Message();
     QPID_BROKER_EXTERN ~Message();
 
@@ -91,20 +131,14 @@ public:
     int getDeliveryCount() const { return deliveryCount; }
     void resetDeliveryCount() { deliveryCount = -1; alreadyAcquired = false; }
 
-    QPID_BROKER_EXTERN void setPublisher(const Connection& p);
     const Connection* getPublisher() const;
     bool isLocalTo(const OwnershipToken*) const;
 
     QPID_BROKER_EXTERN std::string getRoutingKey() const;
     QPID_BROKER_EXTERN bool isPersistent() const;
 
-    /** determine msg expiration time using the TTL value if present */
-    QPID_BROKER_EXTERN void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& e);
-    void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
-
     bool hasExpired() const;
-    sys::AbsTime getExpiration() const { return expiration; }
-    void setExpiration(sys::AbsTime exp) { expiration = exp; }
+    QPID_BROKER_EXTERN sys::AbsTime getExpiration() const;
     uint64_t getTtl() const;
     QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
 
@@ -121,12 +155,11 @@ public:
 
     QPID_BROKER_EXTERN uint64_t getMessageSize() const;
 
-    QPID_BROKER_EXTERN Encoding& getEncoding();
     QPID_BROKER_EXTERN const Encoding& getEncoding() const;
     QPID_BROKER_EXTERN operator bool() const;
+    QPID_BROKER_EXTERN SharedState& getSharedState();
 
     bool getIsManagementMessage() const;
-    void setIsManagementMessage(bool b);
 
     QPID_BROKER_EXTERN qpid::framing::SequenceNumber getSequence() const;
     QPID_BROKER_EXTERN void setSequence(const qpid::framing::SequenceNumber&);
@@ -146,21 +179,51 @@ public:
     QPID_BROKER_EXTERN void setReplicationId(framing::SequenceNumber id);
 
   private:
-    boost::intrusive_ptr<Encoding> encoding;
+    /**
+     * Template for optional members that are only constructed when
+     * if/when needed, to conserve memory. (Boost::optional doesn't
+     * help here).
+     */
+    template <typename T> class Optional
+    {
+        boost::scoped_ptr<T> value;
+      public:
+        Optional() : value(0) {}
+        Optional(const Optional<T>& o) : value(o.value ? new T(*o.value) : 0) {}
+        T& get()
+        {
+            if (!value) value.reset(new T);
+            return *value;
+        }
+        const T& operator*() const
+        {
+            return *value;
+        }
+        Optional<T>& operator=(const Optional<T>& o)
+        {
+            if (o.value) {
+                if (!value) value.reset(new T(*o.value));
+            }
+            return *this;
+        }
+        operator bool() const
+        {
+            return value;
+        }
+    };
+
+
+    boost::intrusive_ptr<SharedState> sharedState;
     boost::intrusive_ptr<PersistableMessage> persistentContext;
     int deliveryCount;
     bool alreadyAcquired;
-    const Connection* publisher;
-    qpid::sys::AbsTime expiration;
-    boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    uint64_t timestamp;
-    qpid::types::Variant::Map annotations;
-    bool isManagementMessage;
+    Optional<qpid::types::Variant::Map> annotations;
     MessageState state;
     qpid::framing::SequenceNumber sequence;
     framing::SequenceNumber replicationId;
 
     void annotationsChanged();
+    bool getTtl(uint64_t&, uint64_t expiredValue) const;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/PagedQueue.cpp Fri May 23 16:10:27 2014
@@ -78,8 +78,8 @@ size_t decode(ProtocolRegistry& protocol
     msg.getPersistentContext()->setPersistenceId(persistenceId);
     if (t) {
         sys::AbsTime expiration(EPOCH, t);
-        msg.setExpiryPolicy(expiryPolicy);
-        msg.setExpiration(expiration);
+        msg.getSharedState().setExpiryPolicy(expiryPolicy);
+        msg.getSharedState().setExpiration(expiration);
     }
     return encoded + metadata.getPosition();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Fri May 23 16:10:27 2014
@@ -185,7 +185,7 @@ void RecoverableMessageImpl::setRedelive
 
 void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep)
 {
-    msg.computeExpiration(ep);
+    msg.getSharedState().computeExpiration(ep);
 }
 
 Message RecoverableMessageImpl::getMessage()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri May 23 16:10:27 2014
@@ -482,8 +482,6 @@ TxBuffer* SemanticState::getTxBuffer()
 }
 
 void SemanticState::route(Message& msg, Deliverable& strategy) {
-    msg.computeExpiration(getSession().getBroker().getExpiryPolicy());
-
     std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName();
     if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed())
         cacheExchange = session.getBroker().getExchanges().get(exchangeName);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri May 23 16:10:27 2014
@@ -219,7 +219,8 @@ void SessionState::handleContent(AMQFram
         DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
         if (broker.isTimestamping())
             msg->setTimestamp();
-        deliverable.getMessage().setPublisher(getConnection());
+        msg->setPublisher(&(getConnection()));
+        msg->computeExpiration(getBroker().getExpiryPolicy());
 
 
         IncompleteIngressMsgXfer xfer(this, msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp Fri May 23 16:10:27 2014
@@ -134,11 +134,11 @@ void DecodingIncoming::readable(pn_deliv
 
         received->scan();
         pn_link_advance(link);
+        received->setPublisher(&session->getParent());
+        received->computeExpiration(expiryPolicy);
 
         qpid::broker::Message message(received, received);
-        message.setPublisher(session->getParent());
         userid.verify(message.getUserId());
-        message.computeExpiration(expiryPolicy);
         handle(message);
         --window;
         received->begin();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Fri May 23 16:10:27 2014
@@ -37,7 +37,7 @@ namespace amqp {
 /**
  * Represents an AMQP 1.0 format message
  */
-class Message : public qpid::broker::Message::Encoding, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage
+class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amqp::MessageReader, public qpid::broker::PersistableMessage
 {
   public:
     //Encoding interface:

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri May 23 16:10:27 2014
@@ -35,7 +35,7 @@ namespace amqp_0_10 {
 /**
  *
  */
-class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::broker::PersistableMessage
+class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qpid::broker::PersistableMessage
 {
   public:
     QPID_BROKER_EXTERN MessageTransfer();
@@ -116,7 +116,7 @@ class MessageTransfer : public qpid::bro
 
     static bool isImmediateDeliveryRequired(const qpid::broker::Message& message);
     static MessageTransfer& get(qpid::broker::Message& message) {
-        return *dynamic_cast<MessageTransfer*>(&message.getEncoding());
+        return *dynamic_cast<MessageTransfer*>(&message.getSharedState());
     }
     static const MessageTransfer& get(const qpid::broker::Message& message) {
         return *dynamic_cast<const MessageTransfer*>(&message.getEncoding());

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Fri May 23 16:10:27 2014
@@ -565,8 +565,8 @@ void ManagementAgent::sendBuffer(Buffer&
     dp->setRoutingKey(routingKey);
 
     transfer->getFrames().append(content);
+    transfer->setIsManagementMessage(true);
     Message msg(transfer, transfer);
-    msg.setIsManagementMessage(true);
     sendQueue->push(make_pair(exchange, msg));
     buf.reset();
 }
@@ -632,9 +632,9 @@ void ManagementAgent::sendBuffer(const s
     }
     transfer->getFrames().append(content);
     transfer->computeRequiredCredit();
+    transfer->setIsManagementMessage(true);
+    transfer->computeExpiration(broker->getExpiryPolicy());
     Message msg(transfer, transfer);
-    msg.setIsManagementMessage(true);
-    msg.computeExpiration(broker->getExpiryPolicy());
 
     sendQueue->push(make_pair(exchange, msg));
 }

Modified: qpid/trunk/qpid/cpp/src/tests/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessageUtils.h?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessageUtils.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessageUtils.h Fri May 23 16:10:27 2014
@@ -107,6 +107,7 @@ struct MessageUtils
             AMQFrame data((AMQContentBody(content)));
             msg->getFrames().append(data);
         }
+        if (ttl) msg->computeExpiration(new broker::ExpiryPolicy);
         return Message(msg, msg);
     }
 };

Modified: qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=1597121&r1=1597120&r2=1597121&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri May 23 16:10:27 2014
@@ -188,7 +188,6 @@ void addMessagesToQueue(uint count, Queu
 {
     for (uint i = 0; i < count; i++) {
         Message m = MessageUtils::createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
-        m.computeExpiration(new broker::ExpiryPolicy);
         queue.deliver(m);
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message