qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r480087 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/ test/unit/qpid/broker/
Date Tue, 28 Nov 2006 15:25:36 GMT
Author: gsim
Date: Tue Nov 28 07:25:35 2006
New Revision: 480087

URL: http://svn.apache.org/viewvc?view=rev&rev=480087
Log:
Modifications to allow loading of message data in chunks, refragmentation of messages, plus some related refactoring and tests.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h   (with props)
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h Tue Nov 28 07:25:35 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Content_
+#define _Content_
+
+#include <qpid/framing/AMQContentBody.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/OutputHandler.h>
+
+namespace qpid {
+    namespace broker {
+        class Content{
+        public:
+            virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
+            virtual u_int32_t size() = 0;
+            virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+            virtual void encode(qpid::framing::Buffer& buffer) = 0;
+            virtual ~Content(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp Tue Nov 28 07:25:35 2006
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/InMemoryContent.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using boost::static_pointer_cast;
+
+void InMemoryContent::add(AMQContentBody::shared_ptr data)
+{
+    content.push_back(data);
+}
+
+u_int32_t InMemoryContent::size()
+{
+    int sum(0);
+    for (content_iterator i = content.begin(); i != content.end(); i++) {
+        sum += (*i)->size() + 8;//8 extra bytes for the frame
+        //TODO: have to get rid of the frame stuff from encoded data
+    }
+    return sum;
+}
+
+void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+{
+    for (content_iterator i = content.begin(); i != content.end(); i++) {
+        if ((*i)->size() > framesize) {
+            u_int32_t offset = 0;
+            for (int chunk = (*i)->size() / framesize; chunk > 0; chunk--) {
+                string data = (*i)->getData().substr(offset, framesize);
+                out->send(new AMQFrame(channel, new AMQContentBody(data)));                
+                offset += framesize;
+            }
+            u_int32_t remainder = (*i)->size() % framesize;
+            if (remainder) {
+                string data = (*i)->getData().substr(offset, remainder);
+                out->send(new AMQFrame(channel, new AMQContentBody(data)));                
+            }
+        } else {
+            AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+            out->send(new AMQFrame(channel, contentBody));
+        }
+    }
+}
+
+void InMemoryContent::encode(Buffer& buffer)
+{
+    for (content_iterator i = content.begin(); i != content.end(); i++) {
+        (*i)->encode(buffer);
+    }        
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h Tue Nov 28 07:25:35 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _InMemoryContent_
+#define _InMemoryContent_
+
+#include <qpid/broker/Content.h>
+#include <vector>
+
+namespace qpid {
+    namespace broker {
+        class InMemoryContent : public Content{
+            typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+            typedef content_list::iterator content_iterator;
+
+            content_list content;
+        public:
+            void add(qpid::framing::AMQContentBody::shared_ptr data);
+            u_int32_t size();
+            void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            void encode(qpid::framing::Buffer& buffer);
+            ~InMemoryContent(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp Tue Nov 28 07:25:35 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/LazyLoadedContent.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize) : 
+    store(_store), msgId(_msgId), expectedSize(_expectedSize) {}
+
+void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+{
+    store->appendContent(msgId, data->getData());
+}
+
+u_int32_t LazyLoadedContent::size()
+{
+    return 0;//all content is written as soon as it is added
+}
+
+void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+{
+    if (expectedSize > framesize) {        
+        for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) {            
+            u_int64_t remaining = expectedSize - offset;
+            string data;
+            store->loadContent(msgId, data, offset, remaining > framesize ? framesize : remaining);              
+            out->send(new AMQFrame(channel, new AMQContentBody(data)));
+        }
+    } else {
+        string data;
+        store->loadContent(msgId, data, 0, expectedSize);  
+        out->send(new AMQFrame(channel, new AMQContentBody(data)));
+    }
+}
+
+void LazyLoadedContent::encode(Buffer&)
+{
+    //do nothing as all content is written as soon as it is added 
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h Tue Nov 28 07:25:35 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LazyLoadedContent_
+#define _LazyLoadedContent_
+
+#include <qpid/broker/Content.h>
+#include <qpid/broker/MessageStore.h>
+
+namespace qpid {
+    namespace broker {
+        class LazyLoadedContent : public Content{
+            MessageStore* const store;
+            const u_int64_t msgId;
+            const u_int64_t expectedSize;
+        public:
+            LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize);
+            void add(qpid::framing::AMQContentBody::shared_ptr data);
+            u_int32_t size();
+            void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            void encode(qpid::framing::Buffer& buffer);
+            ~LazyLoadedContent(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Nov 28 07:25:35 2006
@@ -20,6 +20,10 @@
  */
 #include <qpid/broker/Message.h>
 #include <iostream>
+
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/MessageStore.h>
 // AMQP version change - kpvdr 2006-11-17
 #include <qpid/framing/ProtocolVersion.h>
 #include <qpid/framing/BasicDeliverBody.h>
@@ -40,8 +44,10 @@
                                                      size(0),
                                                      persistenceId(0) {}
 
-Message::Message(Buffer& buffer) : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
-    decode(buffer);
+Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize) : 
+    publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){
+
+    decode(buffer, headersOnly, contentChunkSize);
 }
 
 Message::Message() : publisher(0), mandatory(false), immediate(false), redelivered(false), size(0), persistenceId(0){}
@@ -53,7 +59,10 @@
 }
 
 void Message::addContent(AMQContentBody::shared_ptr data){
-    content.push_back(data);
+    if (!content.get()) {
+        content = std::auto_ptr<Content>(new InMemoryContent());
+    }
+    content->add(data);
     size += data->size();    
 }
 
@@ -68,8 +77,9 @@
 void Message::deliver(OutputHandler* out, int channel, 
                       const string& consumerTag, u_int64_t deliveryTag, 
                       u_int32_t framesize){
-	// AMQP version change - kpvdr 2006-11-17
-	// TODO: Make this class version-aware and link these hard-wired numbers to that version
+
+    // AMQP version change - kpvdr 2006-11-17
+    // TODO: Make this class version-aware and link these hard-wired numbers to that version
     out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, exchange, routingKey)));
     sendContent(out, channel, framesize);
 }
@@ -80,8 +90,8 @@
          u_int64_t deliveryTag, 
          u_int32_t framesize){
 
-	// AMQP version change - kpvdr 2006-11-17
-	// TODO: Make this class version-aware and link these hard-wired numbers to that version
+    // AMQP version change - kpvdr 2006-11-17
+    // TODO: Make this class version-aware and link these hard-wired numbers to that version
     out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0), deliveryTag, redelivered, exchange, routingKey, messageCount)));
     sendContent(out, channel, framesize);
 }
@@ -89,15 +99,8 @@
 void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
     AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
     out->send(new AMQFrame(channel, headerBody));
-    for(content_iterator i = content.begin(); i != content.end(); i++){
-        if((*i)->size() > framesize){
-            //TODO: need to split it
-            std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
-        }else{
-            AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
-            out->send(new AMQFrame(channel, contentBody));
-        }
-    }
+
+    if (content.get()) content->send(out, channel, framesize);
 }
 
 BasicHeaderProperties* Message::getHeaderProperties(){
@@ -115,10 +118,10 @@
     return props && props->getDeliveryMode() == PERSISTENT;
 }
 
-void Message::decode(Buffer& buffer)
+void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
 {
     decodeHeader(buffer);
-    decodeContent(buffer);
+    if (!headersOnly) decodeContent(buffer, contentChunkSize);
 }
 
 void Message::decodeHeader(Buffer& buffer)
@@ -132,15 +135,25 @@
     setHeader(headerBody);
 }
 
-void Message::decodeContent(Buffer& buffer)
+void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
 {    
-    AMQContentBody::shared_ptr contentBody;
-    while (buffer.available()) {
-        AMQFrame contentFrame;
-        contentFrame.decode(buffer);
-        contentBody = dynamic_pointer_cast<AMQContentBody, AMQBody>(contentFrame.getBody());
+    u_int64_t expected = expectedContentSize();
+    if (expected != buffer.available()) {
+        std::cout << "WARN: Expected " << expectedContentSize() << " bytes, got " << buffer.available() << std::endl;
+    }
+
+    if (!chunkSize || chunkSize > expected) {
+        chunkSize = expected;
+    }
+
+    u_int64_t total = 0;
+    while (total < expectedContentSize()) {
+        u_int64_t remaining =  expected - total;
+        AMQContentBody::shared_ptr contentBody(new AMQContentBody());        
+        contentBody->decode(buffer, remaining < chunkSize ? remaining : chunkSize);
         addContent(contentBody);
-    }        
+        total += chunkSize;
+    }
 }
 
 void Message::encode(Buffer& buffer)
@@ -159,15 +172,7 @@
 
 void Message::encodeContent(Buffer& buffer)
 {
-    //Use a frame around each content block. Not really required but
-    //gives some error checking at little expense. Could change in the
-    //future...
-    AMQBody::shared_ptr body;
-    for (content_iterator i = content.begin(); i != content.end(); i++) {
-        body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
-        AMQFrame contentFrame(0, body);
-        contentFrame.encode(buffer);
-    }    
+    if (content.get()) content->encode(buffer);
 }
 
 u_int32_t Message::encodedSize()
@@ -177,11 +182,7 @@
 
 u_int32_t Message::encodedContentSize()
 {
-    int encodedContentSize(0);
-    for (content_iterator i = content.begin(); i != content.end(); i++) {
-        encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
-    }
-    return encodedContentSize;
+    return content.get() ? content->size() : 0;
 }
 
 u_int32_t Message::encodedHeaderSize()
@@ -196,7 +197,15 @@
     return header.get() ? header->getContentSize() : 0;
 }
 
-void Message::releaseContent()
+void Message::releaseContent(MessageStore* store)
+{
+    if (!content.get() || content->size() > 0) {
+        //set content to lazy loading mode (but only if there is stored content):
+        content = std::auto_ptr<Content>(new LazyLoadedContent(store, getPersistenceId(), expectedContentSize()));
+    }
+}
+
+void Message::setContent(std::auto_ptr<Content>& _content)
 {
-    content.clear();
+    content = _content;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Nov 28 07:25:35 2006
@@ -21,8 +21,10 @@
 #ifndef _Message_
 #define _Message_
 
+#include <memory>
 #include <boost/shared_ptr.hpp>
 #include <qpid/broker/ConnectionToken.h>
+#include <qpid/broker/Content.h>
 #include <qpid/broker/TxBuffer.h>
 #include <qpid/framing/AMQContentBody.h>
 #include <qpid/framing/AMQHeaderBody.h>
@@ -32,6 +34,7 @@
 namespace qpid {
     namespace broker {
 
+        class MessageStore;
         using qpid::framing::string;
         /**
          * Represents an AMQP message, i.e. a header body, a list of
@@ -39,9 +42,6 @@
          * request.
          */
         class Message{
-            typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
-            typedef content_list::iterator content_iterator;
-
             const ConnectionToken* const publisher;
             string exchange;
             string routingKey;
@@ -49,7 +49,7 @@
             const bool immediate;
             bool redelivered;
             qpid::framing::AMQHeaderBody::shared_ptr header;
-            content_list content;
+            std::auto_ptr<Content> content;
             u_int64_t size;
             u_int64_t persistenceId;
 
@@ -62,7 +62,7 @@
             Message(const ConnectionToken* const publisher, 
                     const string& exchange, const string& routingKey, 
                     bool mandatory, bool immediate);
-            Message(qpid::framing::Buffer& buffer);
+            Message(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
             Message();
             ~Message();
             void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
@@ -90,9 +90,9 @@
             u_int64_t getPersistenceId() const { return persistenceId; }
             void setPersistenceId(u_int64_t _persistenceId) { persistenceId = _persistenceId; }
 
-            void decode(qpid::framing::Buffer& buffer);
+            void decode(qpid::framing::Buffer& buffer, bool headersOnly = false, u_int32_t contentChunkSize = 0);
             void decodeHeader(qpid::framing::Buffer& buffer);
-            void decodeContent(qpid::framing::Buffer& buffer);
+            void decodeContent(qpid::framing::Buffer& buffer, u_int32_t contentChunkSize = 0);
 
             void encode(qpid::framing::Buffer& buffer);
             void encodeHeader(qpid::framing::Buffer& buffer);
@@ -114,14 +114,22 @@
              */
             u_int32_t encodedContentSize();
             /**
-             * Releases the in-memory content data held by this message.
+             * Releases the in-memory content data held by this
+             * message. Must pass in a store from which the data can
+             * be reloaded.
              */
-            void releaseContent();
+            void releaseContent(MessageStore* store);
             /**
              * If headers have been received, returns the expected
              * content size else returns 0.
              */
             u_int64_t expectedContentSize();
+            /**
+             * Sets the 'content' implementation of this message (the
+             * message controls the lifecycle of the content instance
+             * it uses).
+             */
+            void setContent(std::auto_ptr<Content>& content);
         };
 
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Tue Nov 28 07:25:35 2006
@@ -20,25 +20,23 @@
  */
 #include <qpid/broker/MessageBuilder.h>
 
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+
 using namespace qpid::broker;
 using namespace qpid::framing;
+using std::auto_ptr;
 
 MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* const _store, u_int64_t _stagingThreshold) : 
     handler(_handler),
     store(_store),
-    stagingThreshold(_stagingThreshold),
-    staging(false)
+    stagingThreshold(_stagingThreshold)
 {}
 
 void MessageBuilder::route(){
-    if (staging && store) {
-        store->stage(message);
-        message->releaseContent();
-    }
     if (message->isComplete()) {
         if (handler) handler->complete(message);
         message.reset();
-        staging = false;
     }
 }
 
@@ -54,7 +52,14 @@
         THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
     }
     message->setHeader(header);
-    staging = stagingThreshold && header->getContentSize() >= stagingThreshold;
+    if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
+        store->stage(message);
+        auto_ptr<Content> content(new LazyLoadedContent(store, message->getPersistenceId(), message->expectedContentSize()));
+        message->setContent(content);
+    } else {
+        auto_ptr<Content> content(new InMemoryContent());
+        message->setContent(content);
+    }
     route();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Tue Nov 28 07:25:35 2006
@@ -21,6 +21,7 @@
 #ifndef _MessageBuilder_
 #define _MessageBuilder_
 
+#include <memory>
 #include <qpid/QpidError.h>
 #include <qpid/broker/Exchange.h>
 #include <qpid/broker/Message.h>
@@ -47,7 +48,6 @@
             CompletionHandler* handler;
             MessageStore* const store;
             const u_int64_t stagingThreshold;
-            bool staging;
 
             void route();
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Tue Nov 28 07:25:35 2006
@@ -51,9 +51,9 @@
              * (enqueueing automatically stores the message so this is
              * only required if storage is required prior to that
              * point). If the message has not yet been stored it will
-             * store the headers and any available content. If the
-             * message has already been stored it will append any
-             * currently held content.
+             * store the headers as well as any content passed in. A
+             * persistence id will be set on the message; this id can
+             * be used to load the content or to append to it.
              */
             virtual void stage(Message::shared_ptr& msg) = 0;
             
@@ -64,6 +64,21 @@
              * is dequeued from all queues it was enqueued onto).
              */
             virtual void destroy(Message::shared_ptr& msg) = 0;
+
+            /**
+             * Appends content to a previously staged message
+             */
+            virtual void appendContent(u_int64_t msgId, const std::string& data) = 0;
+
+            /**
+             * Loads (a section of) the content data for the specified
+             * message id (previously set on the message through a
+             * call to stage or enqueue) into data. The offset refers
+             * to the content only (i.e. an offset of 0 implies that
+             * the start of the content should be loaded, not the
+             * headers or related meta-data).
+             */
+            virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length) = 0;
 
             /**
              * Enqueues a message, storing the message if it has not

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Tue Nov 28 07:25:35 2006
@@ -53,6 +53,16 @@
     store->destroy(msg);
 }
 
+void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& data)
+{
+    store->appendContent(msgId, data);
+}
+
+void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t offset, u_int32_t length)
+{
+    store->loadContent(msgId, data, offset, length);
+}
+
 void MessageStoreModule::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid)
 {
     store->enqueue(ctxt, msg, queue, xid);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Tue Nov 28 07:25:35 2006
@@ -41,6 +41,8 @@
             void recover(RecoveryManager& queues);
             void stage(Message::shared_ptr& msg);
             void destroy(Message::shared_ptr& msg);
+            void appendContent(u_int64_t msgId, const std::string& data);
+            void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
             void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
             void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
             void committed(const string * const xid);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Nov 28 07:25:35 2006
@@ -34,45 +34,66 @@
 {
     if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
+
 void NullMessageStore::destroy(const Queue& queue)
 {
     if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
+
 void NullMessageStore::recover(RecoveryManager&)
 {
     if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl;
 }
+
 void NullMessageStore::stage(Message::shared_ptr&)
 {
     if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl;
 }
+
 void NullMessageStore::destroy(Message::shared_ptr&)
 {
     if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl;
 }
+
+void NullMessageStore::appendContent(u_int64_t, const string&) // Both parameters are unnamed and unused: nothing is stored.
+{
+    if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; // The only effect is this message, printed when 'warn' is true.
+}
+
+void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t) // All parameters are unnamed and unused: the caller's string is left unmodified.
+{
+    if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; // The only effect is this message, printed when 'warn' is true.
+}
+
 void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
 {
     if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
+
 void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, const Queue& queue, const string * const)
 {
     if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl;
 }
+
 void NullMessageStore::committed(const string * const)
 {
     if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
 }
+
 void NullMessageStore::aborted(const string * const)
 {
     if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
 }
+
 std::auto_ptr<TransactionContext> NullMessageStore::begin()
 {
     return std::auto_ptr<TransactionContext>();
 }
+
 void NullMessageStore::commit(TransactionContext*)
 {
 }
+
 void NullMessageStore::abort(TransactionContext*)
 {
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Nov 28 07:25:35 2006
@@ -35,18 +35,20 @@
             const bool warn;
         public:
             NullMessageStore(bool warn = true);
-            void virtual create(const Queue& queue);
-            void virtual destroy(const Queue& queue);
-            void virtual recover(RecoveryManager& queues);
-            void virtual stage(Message::shared_ptr& msg);
-            void virtual destroy(Message::shared_ptr& msg);
-            void virtual enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
-            void virtual dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
-            void virtual committed(const string * const xid);
-            void virtual aborted(const string * const xid);
+            virtual void create(const Queue& queue);
+            virtual void destroy(const Queue& queue);
+            virtual void recover(RecoveryManager& queues);
+            virtual void stage(Message::shared_ptr& msg);
+            virtual void destroy(Message::shared_ptr& msg);
+            virtual void appendContent(u_int64_t msgId, const std::string& data);
+            virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
+            virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+            virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+            virtual void committed(const string * const xid);
+            virtual void aborted(const string * const xid);
             virtual std::auto_ptr<TransactionContext> begin();
-            void virtual commit(TransactionContext* ctxt);
-            void virtual abort(TransactionContext* ctxt);
+            virtual void commit(TransactionContext* ctxt);
+            virtual void abort(TransactionContext* ctxt);
             ~NullMessageStore(){}
         };
     }

Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp Tue Nov 28 07:25:35 2006
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+
+using std::list;
+using std::string;
+using boost::dynamic_pointer_cast;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct DummyHandler : OutputHandler{ // Test double: keeps every frame pointer passed to send() so the tests can inspect them.
+    std::vector<AMQFrame*> frames; // Frames in the order send() received them. NOTE(review): these raw pointers are never deleted here - confirm who owns them.
+
+    virtual void send(AMQFrame* frame){ // Records the pointer; does nothing else with the frame.
+        frames.push_back(frame);
+    }
+};
+
+class InMemoryContentTest : public CppUnit::TestCase  // Checks that InMemoryContent::send() splits previously added content bodies into the expected frames.
+{
+        CPPUNIT_TEST_SUITE(InMemoryContentTest);
+        CPPUNIT_TEST(testRefragmentation);
+        CPPUNIT_TEST_SUITE_END();
+
+public:
+    void testRefragmentation() // Runs two scenarios through refragment() with its default frame size of 5.
+    {
+        {//no remainder: two 10-character inputs are expected back as four 5-character frames
+            string out[] = {"abcde", "fghij", "klmno", "pqrst"};
+            string in[] = {out[0] + out[1], out[2] + out[3]};        // each input body is two expected frames joined together
+            refragment(2, in, 4, out);
+        }
+        {//remainder for last frame: the second input is 13 characters, so a final 3-character frame is expected
+            string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"};
+            string in[] = {out[0] + out[1], out[2] + out[3] + out[4]};        // last input carries the extra "uvw" tail
+            refragment(2, in, 5, out);
+        }
+    }
+
+
+    void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5) // Adds 'inCount' bodies from 'in', sends them, then expects 'outCount' frames matching 'out'.
+    {
+        InMemoryContent content;
+        DummyHandler handler;
+        u_int16_t channel = 3; // arbitrary channel; check() verifies every sent frame reports this same value
+
+        addframes(content, inCount, in);
+        content.send(&handler, channel, framesize);         // send to the recording handler on this channel with the given frame size
+        check(handler, channel, outCount, out);
+    }
+
+    void addframes(InMemoryContent& content, size_t frameCount, string* frameData) // Wraps each of the first 'frameCount' strings in an AMQContentBody and adds it to 'content'.
+    {
+        for (unsigned int i = 0; i < frameCount; i++) {
+            AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i]));
+            content.add(frame);
+        }
+    }
+
+    void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) // Asserts the recorded frames match the expected chunks in count, payload and channel.
+    {
+        CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); // count is checked first, so the indexing below stays within handler.frames
+
+        for (unsigned int i = 0; i < expectedChunkCount; i++) {
+            AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); // null result means the frame body is not an AMQContentBody
+            CPPUNIT_ASSERT(chunk);
+            CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
+            CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+        }
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp Tue Nov 28 07:25:35 2006
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/NullMessageStore.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <sstream>
+
+using std::list;
+using std::string;
+using boost::dynamic_pointer_cast;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct DummyHandler : OutputHandler{ // Test double: keeps every frame pointer passed to send() so the tests can inspect them.
+    std::vector<AMQFrame*> frames; // Frames in the order send() received them. NOTE(review): these raw pointers are never deleted here - confirm who owns them.
+
+    virtual void send(AMQFrame* frame){ // Records the pointer; does nothing else with the frame.
+        frames.push_back(frame);
+    }
+};
+
+
+class LazyLoadedContentTest : public CppUnit::TestCase  // Checks the frames LazyLoadedContent::send() produces when its data comes from a message store.
+{
+        CPPUNIT_TEST_SUITE(LazyLoadedContentTest);
+        CPPUNIT_TEST(testFragmented);
+        CPPUNIT_TEST(testWhole);
+        CPPUNIT_TEST(testHalved);
+        CPPUNIT_TEST_SUITE_END();
+
+    class TestMessageStore : public NullMessageStore // Store stub that serves slices of one fixed string from loadContent().
+    {
+        const string content; // the complete content every loadContent() call reads from
+        
+    public:
+        TestMessageStore(const string& _content) : content(_content) {}
+
+        void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t length) // First (unnamed) parameter is ignored; the same content is served whatever its value.
+        {
+            if (offset + length <= content.size()) { // requested segment lies entirely inside the stored content
+                data = content.substr(offset, length);
+            } else{ // segment runs past the end: throw, so a bad request fails the test rather than returning short data
+                std::stringstream error;
+                error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size();
+                throw qpid::Exception(error.str());
+            }
+        }
+    };
+
+
+public:
+    void testFragmented() // 26 characters with frame size 5: five full chunks plus a final 1-character chunk.
+    {
+        string data = "abcdefghijklmnopqrstuvwxyz";
+        u_int32_t framesize = 5;
+        string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"};
+        load(data, 6, out, framesize);
+    }
+
+    void testWhole() // Frame size 50 exceeds the 26-character content: a single chunk holding everything is expected.
+    {
+        string data = "abcdefghijklmnopqrstuvwxyz";
+        u_int32_t framesize = 50;
+        string out[] = {data};
+        load(data, 1, out, framesize);
+    }
+
+    void testHalved() // Frame size 13 divides the 26-character content exactly: two equal chunks are expected.
+    {
+        string data = "abcdefghijklmnopqrstuvwxyz";
+        u_int32_t framesize = 13;
+        string out[] = {"abcdefghijklm", "nopqrstuvwxyz"};
+        load(data, 2, out, framesize);
+    }
+
+    void load(string& in, size_t outCount, string* out, u_int32_t framesize) // Sends content backed by a store holding 'in' and expects 'outCount' frames matching 'out'.
+    {
+        TestMessageStore store(in);
+        LazyLoadedContent content(&store, 1, in.size()); // presumably (store, message id, expected content size) - TODO confirm against LazyLoadedContent's constructor
+        DummyHandler handler;
+        u_int16_t channel = 3; // arbitrary channel; check() verifies every sent frame reports this same value
+        content.send(&handler, channel, framesize);         // send to the recording handler on this channel with the given frame size
+        check(handler, channel, outCount, out);
+    }
+
+    void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks) // Asserts the recorded frames match the expected chunks in count, payload and channel.
+    {
+        CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size()); // count is checked first, so the indexing below stays within handler.frames
+
+        for (unsigned int i = 0; i < expectedChunkCount; i++) {
+            AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody())); // null result means the frame body is not an AMQContentBody
+            CPPUNIT_ASSERT(chunk);
+            CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
+            CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+        }
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp Tue Nov 28 07:25:35 2006
@@ -56,13 +56,19 @@
                 header = new Buffer(msg->encodedHeaderSize());
                 msg->encodeHeader(*header);                
                 content = new Buffer(contentBufferSize);
-                msg->encodeContent(*content);
-            } else if (!header || !content) {
-                throw qpid::Exception("Buffers not initialised!");
+                msg->setPersistenceId(1);
             } else {
-                msg->encodeContent(*content);
+                throw qpid::Exception("Message already staged!");
+            }
+        }
+
+        void appendContent(u_int64_t msgId, const string& data)
+        {
+            if (msgId == 1) {
+                content->putRawData(data);
+            } else {
+                throw qpid::Exception("Invalid message id!");
             }
-            msg->setPersistenceId(1);
         }
 
         Message::shared_ptr getRestoredMessage()
@@ -159,7 +165,7 @@
 
     void testStaging(){
         DummyHandler handler;
-        TestMessageStore store(50);//more than enough for two frames of 14 bytes
+        TestMessageStore store(14);
         MessageBuilder builder(&handler, &store, 5);
 
         string data1("abcdefg");

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp Tue Nov 28 07:25:35 2006
@@ -77,13 +77,10 @@
 
         DummyHandler handler;
         msg->deliver(&handler, 0, "ignore", 0, 100); 
-        CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
-        AMQContentBody::shared_ptr contentBody1(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
-        AMQContentBody::shared_ptr contentBody2(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3]->getBody()));
-        CPPUNIT_ASSERT(contentBody1);
-        CPPUNIT_ASSERT(contentBody2);
-        CPPUNIT_ASSERT_EQUAL(data1, contentBody1->getData());
-        CPPUNIT_ASSERT_EQUAL(data2, contentBody2->getData());
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+        CPPUNIT_ASSERT(contentBody);
+        CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
     }
 };
 



Mime
View raw message