qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1537889 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/broker/amqp/ qpid/broker/amqp_0_10/ tests/legacystore/
Date Fri, 01 Nov 2013 10:33:31 GMT
Author: gsim
Date: Fri Nov  1 10:33:30 2013
New Revision: 1537889

URL: http://svn.apache.org/r1537889
Log:
QPID-5284: ensure timestamp is added to the data that is persisted

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
    qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Nov  1 10:33:30 2013
@@ -121,14 +121,9 @@ void Message::clearTrace()
     annotationsChanged();
 }
 
-void Message::setTimestamp()
-{
-    timestamp = ::time(0);   // AMQP-0.10: posix time_t - secs since Epoch
-}
-
 uint64_t Message::getTimestamp() const
 {
-    return timestamp;
+    return encoding ? encoding->getTimestamp() : 0;
 }
 
 uint64_t Message::getTtl() const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Nov  1 10:33:30 2013
@@ -77,6 +77,7 @@ public:
         virtual std::string getContent() const = 0;
         virtual void processProperties(qpid::amqp::MapHandler&) const = 0;
         virtual std::string getUserId() const = 0;
+        virtual uint64_t getTimestamp() const = 0;
     };
 
     QPID_BROKER_EXTERN Message(boost::intrusive_ptr<Encoding>, boost::intrusive_ptr<PersistableMessage>);
@@ -106,8 +107,6 @@ public:
     uint64_t getTtl() const;
     QPID_BROKER_EXTERN bool getTtl(uint64_t&) const;
 
-    /** set the timestamp delivery property to the current time-of-day */
-    QPID_BROKER_EXTERN void setTimestamp();
     QPID_BROKER_EXTERN uint64_t getTimestamp() const;
 
     QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Nov  1 10:33:30 2013
@@ -388,7 +388,7 @@ bool SemanticStateConsumerImpl::deliver(
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
     if (sync) deliveryCount = 0;//reset
 
-    record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(),
+    record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(),
                                          ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE,
                                          acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED,
                                          msg.getAnnotations(),

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Nov  1 10:33:30 2013
@@ -218,7 +218,7 @@ void SessionState::handleContent(AMQFram
         }
         DeliverableMessage deliverable(Message(msg, msg), semanticState.getTxBuffer());
         if (broker.isTimestamping())
-            deliverable.getMessage().setTimestamp();
+            msg->setTimestamp();
         deliverable.getMessage().setPublisher(getConnection());
 
 
@@ -296,7 +296,7 @@ void SessionState::handleOut(AMQFrame& f
 }
 
 DeliveryId SessionState::deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
-                                 const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+                                 const std::string& destination, bool isRedelivered, uint64_t ttl,
                                  qpid::framing::message::AcceptMode acceptMode, qpid::framing::message::AcquireMode acquireMode,
                                  const qpid::types::Variant::Map& annotations, bool sync)
 {
@@ -307,7 +307,7 @@ DeliveryId SessionState::deliver(const q
     framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), destination, acceptMode, acquireMode)));
     method.setEof(false);
     getProxy().getHandler().handle(method);
-    message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, timestamp, annotations);
+    message.sendHeader(getProxy().getHandler(), maxFrameSize, isRedelivered, ttl, annotations);
     message.sendContent(getProxy().getHandler(), maxFrameSize);
 
     assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Nov  1 10:33:30 2013
@@ -101,7 +101,7 @@ class SessionState : public qpid::Sessio
     void sendCompletion();
 
     DeliveryId deliver(const qpid::broker::amqp_0_10::MessageTransfer& message,
-                       const std::string& destination, bool isRedelivered, uint64_t ttl, uint64_t timestamp,
+                       const std::string& destination, bool isRedelivered, uint64_t ttl,
                        qpid::framing::message::AcceptMode, qpid::framing::message::AcquireMode,
                        const qpid::types::Variant::Map& annotations, bool sync);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp Fri Nov  1 10:33:30 2013
@@ -60,6 +60,13 @@ std::string Message::getUserId() const
     return v;
 }
 
+uint64_t Message::getTimestamp() const
+{
+    //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field
+    //TODO: define an annotation for that
+    return 0;
+}
+
 bool Message::isPersistent() const
 {
     return durable && durable.get();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Fri Nov  1 10:33:30 2013
@@ -51,6 +51,7 @@ class Message : public qpid::broker::Mes
     std::string getContent() const;
     void processProperties(qpid::amqp::MapHandler&) const;
     std::string getUserId() const;
+    uint64_t getTimestamp() const;
     qpid::amqp::MessageId getMessageId() const;
     qpid::amqp::MessageId getCorrelationId() const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Fri Nov  1 10:33:30 2013
@@ -130,6 +130,19 @@ std::string MessageTransfer::getExchange
     return getFrames().as<framing::MessageTransferBody>()->getDestination();
 }
 
+void MessageTransfer::setTimestamp()
+{
+    DeliveryProperties* props = getFrames().getHeaders()->get<DeliveryProperties>(true);
+    time_t now = ::time(0);
+    props->setTimestamp(now);
+}
+
+uint64_t MessageTransfer::getTimestamp() const
+{
+    const DeliveryProperties* props = getProperties<DeliveryProperties>();
+    return props ? props->getTimestamp() : 0;
+}
+
 bool MessageTransfer::requiresAccept() const
 {
     const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();
@@ -174,11 +187,11 @@ void MessageTransfer::sendContent(framin
 class SendHeader
 {
   public:
-    SendHeader(FrameHandler& h, bool r, uint64_t t, uint64_t ts, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), timestamp(ts), annotations(a) {}
+    SendHeader(FrameHandler& h, bool r, uint64_t t, const qpid::types::Variant::Map& a) : handler(h), redelivered(r), ttl(t), annotations(a) {}
     void operator()(const AMQFrame& f)
     {
         AMQFrame copy = f;
-        if (redelivered || ttl || timestamp || annotations.size()) {
+        if (redelivered || ttl || annotations.size()) {
             copy.cloneBody();
             if (annotations.size()) {
                 MessageProperties* props =
@@ -188,12 +201,11 @@ class SendHeader
                     props->getApplicationHeaders().set(i->first, qpid::amqp_0_10::translate(i->second));
                 }
             }
-            if (redelivered || ttl || timestamp) {
+            if (redelivered || ttl) {
                 DeliveryProperties* dp =
                     copy.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true);
                 if (ttl) dp->setTtl(ttl);
                 if (redelivered) dp->setRedelivered(redelivered);
-                if (timestamp) dp->setTimestamp(timestamp);
             }
         }
         handler.handle(copy);
@@ -202,15 +214,14 @@ class SendHeader
     FrameHandler& handler;
     bool redelivered;
     uint64_t ttl;
-    uint64_t timestamp;
     const qpid::types::Variant::Map& annotations;
 };
 
 void MessageTransfer::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/,
-                                 bool redelivered, uint64_t ttl, uint64_t timestamp,
+                                 bool redelivered, uint64_t ttl,
                                  const qpid::types::Variant::Map& annotations) const
 {
-    SendHeader f(out, redelivered, ttl, timestamp, annotations);
+    SendHeader f(out, redelivered, ttl, annotations);
     frames.map_if(f, TypeFilter<HEADER_BODY>());
 }
 bool MessageTransfer::isImmediateDeliveryRequired(const qpid::broker::Message& /*message*/)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri Nov  1 10:33:30 2013
@@ -55,6 +55,8 @@ class MessageTransfer : public qpid::bro
     std::string getExchangeName() const;
     void processProperties(qpid::amqp::MapHandler&) const;
     std::string getUserId() const;
+    void setTimestamp();
+    uint64_t getTimestamp() const;
 
     bool requiresAccept() const;
     const qpid::framing::SequenceNumber& getCommandId() const;
@@ -92,7 +94,7 @@ class MessageTransfer : public qpid::bro
 
     void clearApplicationHeadersFlag();
     void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize) const;
-    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, uint64_t timestamp, const qpid::types::Variant::Map& annotations) const;
+    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize, bool redelivered, uint64_t ttl, const qpid::types::Variant::Map& annotations) const;
 
     void decodeHeader(framing::Buffer& buffer);
     void decodeContent(framing::Buffer& buffer);

Modified: qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h?rev=1537889&r1=1537888&r2=1537889&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h Fri Nov  1 10:33:30 2013
@@ -98,7 +98,7 @@ struct MessageUtils
 
     static void deliver(Message& msg, FrameHandler& h, uint16_t framesize)
     {
-        qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, 0, qpid::types::Variant::Map());
+        qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, qpid::types::Variant::Map());
         qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize);
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message