qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1694094 - in /qpid/trunk/qpid/cpp/src/qpid: broker/MessageBuilder.cpp broker/MessageBuilder.h broker/SessionState.h ha/QueueReplicator.cpp
Date Tue, 04 Aug 2015 18:38:38 GMT
Author: aconway
Date: Tue Aug  4 18:38:38 2015
New Revision: 1694094

URL: http://svn.apache.org/r1694094
Log:
QPID-6577: HA - backup broker messages are larger than primary messages.

Under the 0-10 protocol (used by HA), brokers add an "exchange" property to each
message for the exchange the message arrived on. This is different (and
sometimes longer) on the backup brokers from the primary, since on the backups
the message arrives on a special replication exchange.

This change stops backup brokers from modifying the exchange property on messages.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Tue Aug  4 18:38:38 2015
@@ -37,7 +37,7 @@ namespace
     const std::string QPID_MANAGEMENT("qpid.management");
 }
 
-MessageBuilder::MessageBuilder() : state(DORMANT) {}
+MessageBuilder::MessageBuilder() : state(DORMANT), copyExchange(true) {}
 
 void MessageBuilder::handle(AMQFrame& frame)
 {
@@ -60,7 +60,10 @@ void MessageBuilder::handle(AMQFrame& fr
             header.setEof(false);
             message->getFrames().append(header);
         } else if (type == HEADER_BODY) {
-            frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->setExchange(exchange);
+            if (copyExchange) {
+                frame.castBody<AMQHeaderBody>()->get<DeliveryProperties>(true)->
+                    setExchange(exchange);
+            }
         } else {
             throw CommandInvalidException(
                 QPID_MSG("Invalid frame sequence for message, expected header or content got "

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Tue Aug  4 18:38:38 2015
@@ -43,11 +43,14 @@ namespace qpid {
             boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> getMessage();
             QPID_BROKER_EXTERN void start(const framing::SequenceNumber& id);
             void end();
+            void setCopyExchange(bool value) { copyExchange = value; }
+
         private:
             enum State {DORMANT, METHOD, HEADER, CONTENT};
             State state;
             boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> message;
             std::string exchange;
+            bool copyExchange;
 
             void checkType(uint8_t expected, uint8_t actual);
         };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Aug  4 18:38:38 2015
@@ -147,6 +147,9 @@ class SessionState : public qpid::Sessio
     /** Send result and completion for a given command to the client. */
     void completeCommand(SequenceNumber id, bool requiresAccept, bool requiresSync,
                          const std::string& result);
+
+    MessageBuilder& getMessageBuilder() { return msgBuilder; }
+
   private:
     void handleCommand(framing::AMQMethodBody* method);
     void handleContent(framing::AMQFrame& frame);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1694094&r1=1694093&r2=1694094&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Aug  4 18:38:38 2015
@@ -35,6 +35,7 @@
 #include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/FieldValue.h"
 #include "qpid/log/Statement.h"
@@ -248,6 +249,10 @@ void QueueReplicator::initializeBridge(B
     Mutex::ScopedLock l(lock);
     if (!queue) return;         // Already destroyed
     sessionHandler = &sessionHandler_;
+    if (sessionHandler->getSession()) {
+        // Don't overwrite the exchange property set on the primary.
+        sessionHandler->getSession()->getMessageBuilder().setCopyExchange(false);
+    }
     AMQP_ServerProxy peer(sessionHandler->out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     FieldTable arguments;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message