qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r447994 [3/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concurr...
Date Tue, 19 Sep 2006 22:07:25 GMT
Added: incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "FanOutExchange.h"
+#include "ExchangeBinding.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+FanOutExchange::FanOutExchange(const string& _name) : name(_name) {}
+
+void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+    Locker locker(lock);
+    // Add if not already present.
+    Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+    if (i == bindings.end()) {
+        bindings.push_back(queue);
+        queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+    }
+}
+
+void FanOutExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+    Locker locker(lock);
+    Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
+    if (i != bindings.end()) {
+        bindings.erase(i);
+        // TODO aconway 2006-09-14: What about the ExchangeBinding object? Don't we have to verify routingKey/args match?
+    }
+}
+
+void FanOutExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+    Locker locker(lock);
+    for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
+        (*i)->deliver(msg);
+    }
+}
+
+FanOutExchange::~FanOutExchange() {}
+
+const std::string FanOutExchange::typeName("fanout");

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/FanOutExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,97 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "MonitorImpl.h"
+#include "Message.h"
+#include "ExchangeRegistry.h"
+#include <iostream>
+
+using namespace std::tr1;//for *_pointer_cast methods
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+
+Message::Message(const ConnectionToken* const _publisher, 
+                 const string& _exchange, const string& _routingKey, 
+                 bool _mandatory, bool _immediate) : publisher(_publisher),
+                                                     exchange(_exchange),
+                                                     routingKey(_routingKey), 
+                                                     mandatory(_mandatory),
+                                                     immediate(_immediate){
+
+}
+
+Message::~Message(){
+}
+
+void Message::setHeader(AMQHeaderBody::shared_ptr header){
+    this->header = header;
+}
+
+void Message::addContent(AMQContentBody::shared_ptr data){
+    content.push_back(data);
+}
+
+bool Message::isComplete(){
+    return header.get() && (header->getContentSize() == contentSize());
+}
+
+void Message::deliver(OutputHandler* out, int channel, 
+                      string& consumerTag, u_int64_t deliveryTag, 
+                      u_int32_t framesize){
+
+    out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey)));
+    AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+    out->send(new AMQFrame(channel, headerBody));
+    for(content_iterator i = content.begin(); i != content.end(); i++){
+        if((*i)->size() > framesize){
+            //TODO: need to split it
+            std::cout << "WARNING: Dropped message. Re-fragmentation not yet implemented." << std::endl;
+        }else{
+            AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+            out->send(new AMQFrame(channel, contentBody));
+        }
+    }
+}
+
+BasicHeaderProperties* Message::getHeaderProperties(){
+    return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+}
+
+u_int64_t Message::contentSize(){
+    u_int64_t size(0);
+    for(content_iterator i = content.begin(); i != content.end(); i++){
+        size += (*i)->size();
+    }
+    return size;
+}
+
+const ConnectionToken* const Message::getPublisher(){
+    return publisher;
+}
+
+bool qpid::broker::route(Message::shared_ptr& msg, ExchangeRegistry* registry){
+    Exchange* exchange = registry->get(msg->exchange);
+    if(exchange){
+        exchange->route(msg, msg->routingKey, &(msg->getHeaderProperties()->getHeaders()));
+        return true;
+    }else{
+        return false;
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/Message.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/NameGenerator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/NameGenerator.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/NameGenerator.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/NameGenerator.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "NameGenerator.h"
+#include <sstream>
+
+using namespace qpid::broker;
+
+NameGenerator::NameGenerator(const std::string& _base) : base(_base), counter(1) {}
+
+std::string NameGenerator::generate(){
+    std::stringstream ss;
+    ss << base << counter++;
+    return ss.str();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/NameGenerator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+#include "MonitorImpl.h"
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+Queue::Queue(const string& _name, bool _durable, u_int32_t _autodelete, const ConnectionToken* const _owner) : name(_name), 
+                                                                                                               durable(_durable), 
+                                                                                                               autodelete(_autodelete),
+                                                                                                               owner(_owner), 
+                                                                                                               queueing(false),
+                                                                                                               dispatching(false),
+                                                                                                               next(0),
+                                                                                                               lastUsed(0),
+                                                                                                               exclusive(0){
+
+    if(autodelete) lastUsed = apr_time_as_msec(apr_time_now());
+}
+
+Queue::~Queue(){
+    for(Binding* b = bindings.front(); !bindings.empty(); b = bindings.front()){
+        b->cancel();
+        bindings.pop();
+    }
+}
+
+void Queue::bound(Binding* b){
+    bindings.push(b);
+}
+
+void Queue::deliver(Message::shared_ptr& msg){
+    Locker locker(lock);
+    if(queueing || !dispatch(msg)){
+        queueing = true;
+        messages.push(msg);
+    }
+}
+
+bool Queue::dispatch(Message::shared_ptr& msg){
+    if(consumers.empty()){
+        return false;
+    }else if(exclusive){
+        if(!exclusive->deliver(msg)){
+            std::cout << "WARNING: Dropping undeliverable message from queue with exclusive consumer." << std::endl;
+        }
+        return true;
+    }else{
+        //deliver to next consumer
+        next = next % consumers.size();
+        Consumer* c = consumers[next];
+        int start = next;
+        while(c){
+            next++;
+            if(c->deliver(msg)) return true;            
+
+            next = next % consumers.size();
+            c = next == start ? 0 : consumers[next];            
+        }
+        return false;
+    }
+}
+
+bool Queue::startDispatching(){
+    Locker locker(lock);
+    if(queueing && !dispatching){
+        dispatching = true;
+        return true;
+    }else{
+        return false;
+    }
+}
+
+void Queue::dispatch(){
+    bool proceed = startDispatching();
+    while(proceed){
+        Locker locker(lock);
+        if(!messages.empty() && dispatch(messages.front())){
+            messages.pop();
+        }else{
+            dispatching = false;
+            proceed = false;
+            queueing = !messages.empty();
+        }
+    }
+}
+
+void Queue::consume(Consumer* c, bool requestExclusive){
+    Locker locker(lock);
+    if(exclusive) throw ExclusiveAccessException();
+    if(requestExclusive){
+        if(!consumers.empty()) throw ExclusiveAccessException();
+        exclusive = c;
+    }
+
+    if(autodelete && consumers.empty()) lastUsed = 0;
+    consumers.push_back(c);
+}
+
+void Queue::cancel(Consumer* c){
+    Locker locker(lock);
+    consumers.erase(find(consumers.begin(), consumers.end(), c));
+    if(autodelete && consumers.empty()) lastUsed = apr_time_as_msec(apr_time_now());
+    if(exclusive == c) exclusive = 0;
+}
+
+Message::shared_ptr Queue::dequeue(){
+
+}
+
+u_int32_t Queue::purge(){
+    Locker locker(lock);
+    int count = messages.size();
+    while(!messages.empty()) messages.pop();
+    return count;
+}
+
+u_int32_t Queue::getMessageCount() const{
+    Locker locker(lock);
+    return messages.size();
+}
+
+u_int32_t Queue::getConsumerCount() const{
+    Locker locker(lock);
+    return consumers.size();
+}
+
+bool Queue::canAutoDelete() const{
+    Locker locker(lock);
+    return lastUsed && ((apr_time_as_msec(apr_time_now()) - lastUsed) > autodelete);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/Queue.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/QueueRegistry.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/QueueRegistry.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/QueueRegistry.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "QueueRegistry.h"
+#include "MonitorImpl.h"
+#include "SessionHandlerImpl.h"
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+QueueRegistry::QueueRegistry() : counter(1){}
+
+QueueRegistry::~QueueRegistry(){}
+
+std::pair<Queue::shared_ptr, bool>
+QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner)
+{
+    Locker locker(lock);
+    string name = declareName.empty() ? generateName() : declareName;
+    assert(!name.empty());
+    QueueMap::iterator i =  queues.find(name);
+    if (i == queues.end()) {
+	Queue::shared_ptr queue(new Queue(name, durable, autoDelete, owner));
+	queues[name] = queue;
+	return std::pair<Queue::shared_ptr, bool>(queue, true);
+    } else {
+	return std::pair<Queue::shared_ptr, bool>(i->second, false);
+    }
+}
+
+void QueueRegistry::destroy(const string& name){
+    Locker locker(lock);
+    queues.erase(name);
+}
+
+Queue::shared_ptr QueueRegistry::find(const string& name){
+    Locker locker(lock);
+    QueueMap::iterator i = queues.find(name);
+    if (i == queues.end()) {
+	return Queue::shared_ptr();
+    } else {
+	return i->second;
+    }
+}
+
+string QueueRegistry::generateName(){
+    string name;
+    do {
+	std::stringstream ss;
+	ss << "tmp_" << counter++;
+	name = ss.str();
+	// Thread safety: Private function, only called with lock held
+	// so this is OK.
+    } while(queues.find(name) != queues.end());
+    return name;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/QueueRegistry.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "SessionHandlerFactoryImpl.h"
+#include "SessionHandlerImpl.h"
+#include "FanOutExchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
+    exchanges.declare(new DirectExchange("amq.direct"));
+    exchanges.declare(new TopicExchange("amq.topic"));
+    exchanges.declare(new FanOutExchange("amq.fanout"));
+    cleaner.start();
+}
+
+SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt){
+    return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, timeout);
+}
+
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl(){
+    cleaner.stop();
+    exchanges.destroy("amq.direct");
+    exchanges.destroy("amq.topic");    
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,378 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "SessionHandlerImpl.h"
+#include "FanOutExchange.h"
+#include "assert.h"
+
+using namespace std::tr1;
+using namespace qpid::broker;
+using namespace qpid::io;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, 
+                                       QueueRegistry* _queues, 
+                                       ExchangeRegistry* _exchanges, 
+                                       AutoDelete* _cleaner,
+                                       const u_int32_t _timeout) : context(_context), 
+                                                                   queues(_queues), 
+                                                                   exchanges(_exchanges),
+                                                                   cleaner(_cleaner),
+                                                                   timeout(_timeout),
+                                                                   channelHandler(new ChannelHandlerImpl(this)),
+                                                                   connectionHandler(new ConnectionHandlerImpl(this)),
+                                                                   basicHandler(new BasicHandlerImpl(this)),
+                                                                   exchangeHandler(new ExchangeHandlerImpl(this)),
+                                                                   queueHandler(new QueueHandlerImpl(this)),
+                                                                   framemax(65536), 
+                                                                   heartbeat(0){
+
+}
+
+SessionHandlerImpl::~SessionHandlerImpl(){
+    // TODO aconway 2006-09-07: Should be auto_ptr or plain members.
+    delete channelHandler;
+    delete connectionHandler;
+    delete basicHandler;
+    delete exchangeHandler;
+    delete queueHandler;
+}
+
+Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t channel){
+    Queue::shared_ptr queue;
+    if (name.empty()) {
+        queue = channels[channel]->getDefaultQueue();
+        if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
+    } else {
+        queue = queues->find(name);
+        if (queue == 0) {
+            throw ChannelException( 404, "Queue not found: " + name);
+        }
+    }
+    return queue;
+}
+
+
+Exchange* SessionHandlerImpl::findExchange(const string& name){
+    exchanges->getLock()->acquire();
+    Exchange* exchange(exchanges->get(name));
+    exchanges->getLock()->release();
+    return exchange;
+}
+
+void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
+    u_int16_t channel = frame->getChannel();
+    AMQBody::shared_ptr body = frame->getBody();
+    AMQMethodBody::shared_ptr method;
+
+    switch(body->type())
+    {
+    case METHOD_BODY:
+        method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
+        try{
+            method->invoke(*this, channel);
+        }catch(ChannelException& e){
+            channels[channel]->close();
+            channels.erase(channel);
+            context->send(new AMQFrame(channel, new ChannelCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+        }catch(ConnectionException& e){
+            context->send(new AMQFrame(0, new ConnectionCloseBody(e.code, e.text, method->amqpClassId(), method->amqpMethodId())));
+        }
+	break;
+
+    case HEADER_BODY:
+	this->handleHeader(channel, dynamic_pointer_cast<AMQHeaderBody, AMQBody>(body));
+	break;
+
+    case CONTENT_BODY:
+	this->handleContent(channel, dynamic_pointer_cast<AMQContentBody, AMQBody>(body));
+	break;
+
+    case HEARTBEAT_BODY:
+        //channel must be 0
+	this->handleHeartbeat(dynamic_pointer_cast<AMQHeartbeatBody, AMQBody>(body));
+	break;
+    }
+}
+
+void SessionHandlerImpl::initiated(qpid::framing::ProtocolInitiation* header){
+    //send connection start
+    FieldTable properties;
+    string mechanisms("PLAIN");
+    string locales("en_US");
+    context->send(new AMQFrame(0, new ConnectionStartBody(8, 0, properties, mechanisms, locales)));
+}
+
+void SessionHandlerImpl::idleOut(){
+
+}
+
+void SessionHandlerImpl::idleIn(){
+
+}
+
+void SessionHandlerImpl::closed(){
+    for(channel_iterator i = channels.begin(); i != channels.end(); i = channels.begin()){
+        Channel* c = i->second;
+        channels.erase(i);
+        c->close();
+        delete c;
+    }
+    for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
+        string name = (*i)->getName();
+        queues->destroy(name);
+        exclusiveQueues.erase(i);
+    }
+}
+
+void SessionHandlerImpl::handleHeader(u_int16_t channel, AMQHeaderBody::shared_ptr body){
+    channels[channel]->handleHeader(body, exchanges);
+}
+
+void SessionHandlerImpl::handleContent(u_int16_t channel, AMQContentBody::shared_ptr body){
+    channels[channel]->handleContent(body, exchanges);
+}
+
+void SessionHandlerImpl::handleHeartbeat(AMQHeartbeatBody::shared_ptr body){
+    std::cout << "SessionHandlerImpl::handleHeartbeat()" << std::endl;
+}
+        
+void SessionHandlerImpl::ConnectionHandlerImpl::startOk(u_int16_t channel, FieldTable& clientProperties, string& mechanism, 
+                                    string& response, string& locale){
+
+    parent->context->send(new AMQFrame(0, new ConnectionTuneBody(100, parent->framemax, parent->heartbeat)));
+}
+        
+void SessionHandlerImpl::ConnectionHandlerImpl::secureOk(u_int16_t channel, string& response){}
+        
+void SessionHandlerImpl::ConnectionHandlerImpl::tuneOk(u_int16_t channel, u_int16_t channelmax, u_int32_t framemax, u_int16_t heartbeat){
+    parent->framemax = framemax;
+    parent->heartbeat = heartbeat;
+}
+        
+void SessionHandlerImpl::ConnectionHandlerImpl::open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist){
+    string knownhosts;
+    parent->context->send(new AMQFrame(0, new ConnectionOpenOkBody(knownhosts)));
+}
+        
+void SessionHandlerImpl::ConnectionHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, 
+                                                      u_int16_t classId, u_int16_t methodId){
+
+    parent->context->send(new AMQFrame(0, new ConnectionCloseOkBody()));
+    parent->context->close();
+} 
+        
+void SessionHandlerImpl::ConnectionHandlerImpl::closeOk(u_int16_t channel){
+    parent->context->close();
+} 
+              
+
+
+void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, string& outOfBand){
+    parent->channels[channel] = new Channel(parent->context, channel, parent->framemax);
+    parent->context->send(new AMQFrame(channel, new ChannelOpenOkBody()));
+} 
+        
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){}         
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t channel, bool active){} 
+        
+void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t replyCode, string& replyText, 
+                                                   u_int16_t classId, u_int16_t methodId){
+    Channel* c = parent->channels[channel];
+    parent->channels.erase(channel);
+    c->close();
+    delete c;
+    parent->context->send(new AMQFrame(channel, new ChannelCloseOkBody()));
+} 
+        
+void SessionHandlerImpl::ChannelHandlerImpl::closeOk(u_int16_t channel){} 
+              
+
+
+void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, 
+                                                      bool passive, bool durable, bool autoDelete, bool internal, bool nowait, 
+                                                      FieldTable& arguments){
+
+    if(!passive && (
+           type != TopicExchange::typeName &&
+           type != DirectExchange::typeName &&
+           type != FanOutExchange::typeName)
+        )
+    {
+        throw ChannelException(540, "Exchange type not implemented: " + type);
+    }
+    
+    parent->exchanges->getLock()->acquire();
+    if(!parent->exchanges->get(exchange)){
+        if(type == TopicExchange::typeName){
+            parent->exchanges->declare(new TopicExchange(exchange));
+        }else if(type == DirectExchange::typeName){
+            parent->exchanges->declare(new DirectExchange(exchange));
+        }else if(type == FanOutExchange::typeName){
+            parent->exchanges->declare(new DirectExchange(exchange));
+        }
+    }
+    parent->exchanges->getLock()->release();
+    if(!nowait){
+        parent->context->send(new AMQFrame(channel, new ExchangeDeclareOkBody()));
+    }
+} 
+        
+void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait){
+    //TODO: implement unused
+    parent->exchanges->getLock()->acquire();
+    parent->exchanges->destroy(exchange);
+    parent->exchanges->getLock()->release();
+    if(!nowait) parent->context->send(new AMQFrame(channel, new ExchangeDeleteOkBody()));
+} 
+              
+        
+
+
+void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t ticket, string& name, 
+                                                   bool passive, bool durable, bool exclusive, 
+                                                   bool autoDelete, bool nowait, FieldTable& arguments){
+    Queue::shared_ptr queue;
+    if (passive && !name.empty()) {
+	queue = parent->getQueue(name, channel);
+    } else {
+	std::pair<Queue::shared_ptr, bool> queue_created =  parent->queues->declare(name, durable, autoDelete ? parent->timeout : 0, exclusive ? parent : 0);
+	queue = queue_created.first;
+	assert(queue);
+	if (queue_created.second) { // This is a new queue
+	    parent->channels[channel]->setDefaultQueue(queue);
+	    //add default binding:
+	    parent->exchanges->get("amq.direct")->bind(queue, name, 0);
+	    if(exclusive){
+		parent->exclusiveQueues.push_back(queue);
+	    } else if(autoDelete){
+		parent->cleaner->add(queue);
+	    }
+	}
+    }
+    if(exclusive && !queue->isExclusiveOwner(parent)){
+	throw ChannelException(405, "Cannot grant exclusive access to queue");
+    }
+    if(!nowait){
+        name = queue->getName();
+        QueueDeclareOkBody* response = new QueueDeclareOkBody(name, queue->getMessageCount(), queue->getConsumerCount());
+        parent->context->send(new AMQFrame(channel, response));
+    }
+} 
+        
+void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t ticket, string& queueName, 
+                                                string& exchangeName, string& routingKey, bool nowait, 
+                                                FieldTable& arguments){
+
+    Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+    Exchange* exchange = parent->exchanges->get(exchangeName);
+    if(exchange){
+        if(routingKey.size() == 0 && queueName.size() == 0) routingKey = queue->getName();
+        exchange->bind(queue, routingKey, &arguments);
+        if(!nowait) parent->context->send(new AMQFrame(channel, new QueueBindOkBody()));    
+    }else{
+        throw ChannelException(404, "Bind failed. No such exchange: " + exchangeName);
+    }
+} 
+        
+void SessionHandlerImpl::QueueHandlerImpl::purge(u_int16_t channel, u_int16_t ticket, string& queueName, bool nowait){
+
+    Queue::shared_ptr queue = parent->getQueue(queueName, channel);
+    int count = queue->purge();
+    if(!nowait) parent->context->send(new AMQFrame(channel, new QueuePurgeOkBody(count)));
+} 
+        
+void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t ticket, string& queue, 
+                                                   bool ifUnused, bool ifEmpty, bool nowait){
+    ChannelException error(0, "");
+    int count(0);
+    Queue::shared_ptr q = parent->getQueue(queue, channel);
+    if(ifEmpty && q->getMessageCount() > 0){
+        throw ChannelException(406, "Queue not empty.");
+    }else if(ifUnused && q->getConsumerCount() > 0){
+        throw ChannelException(406, "Queue in use.");
+    }else{
+        //remove the queue from the list of exclusive queues if necessary
+        if(q->isExclusiveOwner(parent)){
+            queue_iterator i = find(parent->exclusiveQueues.begin(), parent->exclusiveQueues.end(), q);
+            if(i < parent->exclusiveQueues.end()) parent->exclusiveQueues.erase(i);
+        }
+        count = q->getMessageCount();
+        parent->queues->destroy(queue);
+    }
+    if(!nowait) parent->context->send(new AMQFrame(channel, new QueueDeleteOkBody(count)));
+} 
+              
+        
+
+
+void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){
+    //TODO: handle global
+    //TODO: channel doesn't do anything with these qos parameters yet
+    parent->channels[channel]->setPrefetchSize(prefetchSize);
+    parent->channels[channel]->setPrefetchCount(prefetchCount);
+    parent->context->send(new AMQFrame(channel, new BasicQosOkBody()));
+} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_t ticket, 
+                                                   string& queueName, string& consumerTag, 
+                                                   bool noLocal, bool noAck, bool exclusive, 
+                                                   bool nowait){
+    
+    //TODO: implement nolocal
+    Queue::shared_ptr queue = parent->getQueue(queueName, channelId);    
+    Channel* channel = parent->channels[channelId];
+    if(!consumerTag.empty() && channel->exists(consumerTag)){
+        throw ConnectionException(530, "Consumer tags must be unique");
+    }
+
+    try{
+        channel->consume(consumerTag, queue, !noAck, exclusive, noLocal ? parent : 0);
+        if(!nowait) parent->context->send(new AMQFrame(channelId, new BasicConsumeOkBody(consumerTag)));
+
+        //allow messages to be dispatched if required as there is now a consumer:
+        queue->dispatch();
+    }catch(ExclusiveAccessException& e){
+        if(exclusive) throw ChannelException(403, "Exclusive access cannot be granted");
+        else throw ChannelException(403, "Access would violate previously granted exclusivity");
+    }
+
+} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::cancel(u_int16_t channel, string& consumerTag, bool nowait){
+    parent->channels[channel]->cancel(consumerTag);
+    if(!nowait) parent->context->send(new AMQFrame(channel, new BasicCancelOkBody(consumerTag)));
+} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t ticket, 
+                                                   string& exchange, string& routingKey, 
+                                                   bool mandatory, bool immediate){
+
+    Message* msg = new Message(parent, exchange.length() ? exchange : "amq.direct", routingKey, mandatory, immediate);
+    parent->channels[channel]->handlePublish(msg);
+} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){} 
+        
+void SessionHandlerImpl::BasicHandlerImpl::recover(u_int16_t channel, bool requeue){} 
+              

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "TopicExchange.h"
+#include "ExchangeBinding.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+TopicExchange::TopicExchange(const string& _name) : name(_name) {
+
+}
+
+void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+    lock.acquire();
+    bindings[routingKey].push_back(queue);
+    queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+    lock.release();
+}
+
+void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+    lock.acquire();
+    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+    std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
+    if(i < queues.end()){
+        queues.erase(i);
+        if(queues.empty()){
+            bindings.erase(routingKey);
+        }
+    }
+    lock.release();
+}
+
+void TopicExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+    lock.acquire();
+    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+    for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++){
+        (*i)->deliver(msg);
+    }
+    lock.release();
+}
+
+TopicExchange::~TopicExchange(){
+
+}
+
+const std::string TopicExchange::typeName("topic");

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/TopicExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/test/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/Makefile?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/Makefile Tue Sep 19 15:06:50 2006
@@ -0,0 +1,20 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+QPID_HOME = ../../..
+LDLIBS=-lapr-1 -lcppunit $(COMMON_LIB) $(BROKER_LIB)
+include ${QPID_HOME}/cpp/test_plugins.mk
+

Propchange: incubator/qpid/trunk/qpid/cpp/broker/test/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/test/QueueRegistryTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/QueueRegistryTest.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/QueueRegistryTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/QueueRegistryTest.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,79 @@
+#include "QueueRegistry.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <string>
+
+using namespace qpid::broker;
+
+class QueueRegistryTest : public CppUnit::TestCase 
+{
+    CPPUNIT_TEST_SUITE(QueueRegistryTest);
+    CPPUNIT_TEST(testDeclare);
+    CPPUNIT_TEST(testDeclareTmp);
+    CPPUNIT_TEST(testFind);
+    CPPUNIT_TEST(testDestroy);
+    CPPUNIT_TEST_SUITE_END();
+
+  private:
+    std::string foo, bar;
+    QueueRegistry reg;
+    std::pair<Queue::shared_ptr,  bool> qc;
+    
+  public:
+    void setUp() {
+        foo = "foo";
+        bar = "bar";
+    }
+    
+    void testDeclare() {
+        qc = reg.declare(foo, false, 0, 0);
+        Queue::shared_ptr q = qc.first;
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT(qc.second); // New queue
+        CPPUNIT_ASSERT_EQUAL(foo, q->getName());
+
+        qc = reg.declare(foo, false, 0, 0);
+        CPPUNIT_ASSERT_EQUAL(q, qc.first);
+        CPPUNIT_ASSERT(!qc.second);
+
+        qc = reg.declare(bar, false, 0, 0);
+        q = qc.first;
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT_EQUAL(true, qc.second);
+        CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+    }
+
+    void testDeclareTmp() 
+    {
+        qc = reg.declare(std::string(), false, 0, 0);
+        CPPUNIT_ASSERT(qc.second);
+        CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName());
+    }
+    
+    void testFind() {
+        CPPUNIT_ASSERT(reg.find(foo) == 0);
+
+        reg.declare(foo, false, 0, 0);
+        reg.declare(bar, false, 0, 0);
+        Queue::shared_ptr q = reg.find(bar);
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+    }
+
+    void testDestroy() {
+        qc = reg.declare(foo, false, 0, 0);
+        reg.destroy(foo);
+        // Queue is gone from the registry.
+        CPPUNIT_ASSERT(reg.find(foo) == 0);
+        // Queue is not actually destroyed till we drop our reference.
+        CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName());
+        // We shoud be the only reference.
+        CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count());
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest);

Propchange: incubator/qpid/trunk/qpid/cpp/broker/test/QueueRegistryTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/test/exchange_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/exchange_test.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/exchange_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/exchange_test.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "DirectExchange.h"
+#include "Exchange.h"
+#include "Queue.h"
+#include "TopicExchange.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+class ExchangeTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(ExchangeTest);
+    CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    // TODO aconway 2006-09-12: Need more detailed tests.
+
+    void testMe() 
+    {
+        Queue::shared_ptr queue(new Queue("queue", true, true));
+        Queue::shared_ptr queue2(new Queue("queue2", true, true));
+
+        TopicExchange topic("topic");
+        topic.bind(queue, "abc", 0);
+        topic.bind(queue2, "abc", 0);
+
+        DirectExchange direct("direct");
+        direct.bind(queue, "abc", 0);
+        direct.bind(queue2, "abc", 0);
+
+        queue.reset();
+        queue2.reset();
+
+        Message::shared_ptr msg = Message::shared_ptr(new Message(0, "e", "A", true, true));
+        topic.route(msg, "abc", 0);
+        direct.route(msg, "abc", 0);
+
+        // TODO aconway 2006-09-12: TODO Why no assertions?
+    }
+};
+    
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ExchangeTest);

Propchange: incubator/qpid/trunk/qpid/cpp/broker/test/exchange_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/test/message_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/message_test.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/message_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/message_test.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "APRBase.h"
+#include "Message.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+class MessageTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(MessageTest);
+    CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    // TODO aconway 2006-09-12: Need more detailed tests,
+    // need tests to assert something!
+    // 
+    void testMe() 
+    {
+        APRBase::increment();
+        const int size(10);
+        for(int i = 0; i < size; i++){
+            Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true));
+            msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody()));
+            msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody()));
+            msg.reset();
+        }
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/broker/test/message_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,138 @@
+ /*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Queue.h"
+#include "QueueRegistry.h"
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+
+class TestBinding : public virtual Binding{
+    bool cancelled;
+
+public:
+    TestBinding();
+    virtual void cancel();
+    bool isCancelled();
+};
+
+class TestConsumer : public virtual Consumer{
+public:
+    Message::shared_ptr last;
+
+    virtual bool deliver(Message::shared_ptr& msg);
+};
+
+
+class QueueTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(QueueTest);
+    CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    void testMe() 
+    {
+        Queue::shared_ptr queue(new Queue("my_queue", true, true));
+    
+        //Test adding consumers:
+        TestConsumer c1; 
+        TestConsumer c2; 
+        queue->consume(&c1);
+        queue->consume(&c2);
+
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
+        
+        //Test basic delivery:
+        Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+        Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+        Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+
+        queue->deliver(msg1);
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+
+        queue->deliver(msg2);
+        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+        
+        queue->deliver(msg3);
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());        
+    
+        //Test cancellation:
+        queue->cancel(&c1);
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
+        queue->cancel(&c2);
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+
+        //Test bindings:
+        TestBinding a;
+        TestBinding b;
+        queue->bound(&a);
+        queue->bound(&b);    
+    
+        queue.reset();
+
+        CPPUNIT_ASSERT(a.isCancelled());
+        CPPUNIT_ASSERT(b.isCancelled());
+
+        //Test use of queues in registry:
+        QueueRegistry registry;
+        registry.declare("queue1", true, true);
+        registry.declare("queue2", true, true);
+        registry.declare("queue3", true, true);
+
+        CPPUNIT_ASSERT(registry.find("queue1"));
+        CPPUNIT_ASSERT(registry.find("queue2"));
+        CPPUNIT_ASSERT(registry.find("queue3"));
+        
+        registry.destroy("queue1");
+        registry.destroy("queue2");
+        registry.destroy("queue3");
+
+        CPPUNIT_ASSERT(!registry.find("queue1"));
+        CPPUNIT_ASSERT(!registry.find("queue2"));
+        CPPUNIT_ASSERT(!registry.find("queue3"));
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
+
+//TestBinding
+TestBinding::TestBinding() : cancelled(false) {}
+
+void TestBinding::cancel(){
+    CPPUNIT_ASSERT(!cancelled);
+    cancelled = true;
+}
+
+bool TestBinding::isCancelled(){
+    return cancelled;
+}
+
+//TestConsumer
+bool TestConsumer::deliver(Message::shared_ptr& msg){
+    last = msg;
+    return true;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/broker/test/queue_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/Makefile?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/client/Makefile Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Build client library.
+# 
+
+QPID_HOME = ../..
+include ${QPID_HOME}/cpp/options.mk
+
+SOURCES := $(wildcard src/*.cpp)
+OBJECTS := $(subst .cpp,.o,$(SOURCES))
+CLIENT_LIB=$(LIB_DIR)/libqpid_client.so.1.0
+
+.PHONY: all test clean 
+
+all: $(CLIENT_LIB)
+
+test:
+	@$(MAKE) -C test all
+
+clean:
+	-@rm -f $(CLIENT_LIB) $(OBJECTS)  src/*.d
+	$(MAKE) -C test clean
+
+$(CLIENT_LIB): $(OBJECTS)
+	$(CXX) -shared -o $@ $^ $(LDFLAGS)  $(COMMON_LIB) 
+
+# Dependencies
+-include $(SOURCES:.cpp=.d)

Propchange: incubator/qpid/trunk/qpid/cpp/client/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/Channel.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/Channel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/Channel.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,127 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <map>
+#include <string>
+#include <queue>
+#include "sys/types.h"
+
+#ifndef _Channel_
+#define _Channel_
+
+#include "amqp_framing.h"
+
+#include "ThreadFactory.h"
+
+#include "Connection.h"
+#include "Exchange.h"
+#include "IncomingMessage.h"
+#include "Message.h"
+#include "MessageListener.h"
+#include "Queue.h"
+#include "ResponseHandler.h"
+#include "ReturnedMessageHandler.h"
+
+namespace qpid {
+namespace client {
+    enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3};
+
+    class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{
+        struct Consumer{
+            MessageListener* listener;
+            int ackMode;
+            int count;
+            u_int64_t lastDeliveryTag;
+        };
+        typedef std::map<string,Consumer*>::iterator consumer_iterator; 
+
+	u_int16_t id;
+	Connection* con;
+	qpid::concurrent::ThreadFactory* threadFactory;
+	qpid::concurrent::Thread* dispatcher;
+	qpid::framing::OutputHandler* out;
+	IncomingMessage* incoming;
+	ResponseHandler responses;
+	std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
+	IncomingMessage* retrieved;//holds response to basic.get
+	qpid::concurrent::Monitor* dispatchMonitor;
+	qpid::concurrent::Monitor* retrievalMonitor;
+	std::map<std::string, Consumer*> consumers;
+	ReturnedMessageHandler* returnsHandler;
+	bool closed;
+
+        u_int16_t prefetch;
+        const bool transactional;
+
+	void enqueue();
+	void retrieve(Message& msg);
+	IncomingMessage* dequeue();
+	void dispatch();
+	void stop();
+	void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);            
+        void deliver(Consumer* consumer, Message& msg);
+        void setQos();
+	void cancelAll();
+
+	virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+	virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+	virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+	virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+    public:
+	Channel(bool transactional = false, u_int16_t prefetch = 500);
+	~Channel();
+
+	void declareExchange(Exchange& exchange, bool synch = true);
+	void deleteExchange(Exchange& exchange, bool synch = true);
+	void declareQueue(Queue& queue, bool synch = true);
+	void deleteQueue(Queue& queue, bool ifunused = false, bool ifempty = false, bool synch = true);
+	void bind(const Exchange& exchange, const Queue& queue, const std::string& key, 
+                  const qpid::framing::FieldTable& args, bool synch = true);
+        void consume(Queue& queue, std::string& tag, MessageListener* listener, 
+                     int ackMode = NO_ACK, bool noLocal = false, bool synch = true);
+	void cancel(std::string& tag, bool synch = true);
+        bool get(Message& msg, const Queue& queue, int ackMode = NO_ACK);
+        void publish(Message& msg, const Exchange& exchange, const std::string& routingKey, 
+                     bool mandatory = false, bool immediate = false);
+
+        void commit();
+        void rollback();
+
+        void setPrefetch(u_int16_t prefetch);
+
+	/**
+	 * Start message dispatching on a new thread
+	 */
+	void start();
+	/**
+	 * Do message dispatching on this thread
+	 */
+	void run();
+
+        void close();
+
+	void setReturnedMessageHandler(ReturnedMessageHandler* handler);
+
+        friend class Connection;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/Channel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/Connection.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/Connection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/Connection.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <map>
+#include <string>
+
+#ifndef _Connection_
+#define _Connection_
+
+#include "QpidError.h"
+#include "Connector.h"
+#include "ShutdownHandler.h"
+#include "TimeoutHandler.h"
+
+#include "amqp_framing.h"
+#include "Exchange.h"
+#include "IncomingMessage.h"
+#include "Message.h"
+#include "MessageListener.h"
+#include "Queue.h"
+#include "ResponseHandler.h"
+
+namespace qpid {
+namespace client {
+
+    class Channel;
+
+    class Connection : public virtual qpid::framing::InputHandler, 
+        public virtual qpid::io::TimeoutHandler, 
+        public virtual qpid::io::ShutdownHandler, 
+        private virtual qpid::framing::BodyHandler{
+
+        typedef std::map<int, Channel*>::iterator iterator;
+
+	static u_int16_t channelIdCounter;
+
+	std::string host;
+	int port;
+	const u_int32_t max_frame_size;
+	std::map<int, Channel*> channels; 
+	qpid::io::Connector* connector;
+	qpid::framing::OutputHandler* out;
+	ResponseHandler responses;
+        volatile bool closed;
+
+        void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
+        void error(int code, const string& msg, int classid = 0, int methodid = 0);
+        void closeChannel(Channel* channel, u_int16_t code, string& text, u_int16_t classId = 0, u_int16_t methodId = 0);
+	void sendAndReceive(qpid::framing::AMQFrame* frame, const qpid::framing::AMQMethodBody& body);
+
+	virtual void handleMethod(qpid::framing::AMQMethodBody::shared_ptr body);
+	virtual void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr body);
+	virtual void handleContent(qpid::framing::AMQContentBody::shared_ptr body);
+	virtual void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+    public:
+
+	Connection(bool debug = false, u_int32_t max_frame_size = 65536);
+	~Connection();
+        void open(const std::string& host, int port = 5672, 
+                  const std::string& uid = "guest", const std::string& pwd = "guest", 
+                  const std::string& virtualhost = "/");
+        void close();
+	void openChannel(Channel* channel);
+	/*
+         * Requests that the server close this channel, then removes
+         * the association to the channel from this connection
+         */
+	void closeChannel(Channel* channel);
+	/*
+         * Removes the channel from association with this connection,
+	 * without sending a close request to the server.
+         */
+	void removeChannel(Channel* channel);
+
+	virtual void received(qpid::framing::AMQFrame* frame);
+
+	virtual void idleOut();
+	virtual void idleIn();
+
+	virtual void shutdown();
+
+	inline u_int32_t getMaxFrameSize(){ return max_frame_size; }
+    };
+
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/Connection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/Exchange.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/Exchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/Exchange.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Exchange_
+#define _Exchange_
+
+namespace qpid {
+namespace client {
+
+    class Exchange{
+	const std::string name;
+	const std::string type;
+
+    public:
+
+	static const std::string DIRECT_EXCHANGE;
+	static const std::string TOPIC_EXCHANGE;
+	static const std::string HEADERS_EXCHANGE;
+
+	static const Exchange DEFAULT_DIRECT_EXCHANGE;
+	static const Exchange DEFAULT_TOPIC_EXCHANGE;
+	static const Exchange DEFAULT_HEADERS_EXCHANGE;
+
+	Exchange(std::string name, std::string type = DIRECT_EXCHANGE);
+	const std::string& getName() const;
+	const std::string& getType() const;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/Exchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/IncomingMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/IncomingMessage.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/IncomingMessage.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/IncomingMessage.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include "amqp_framing.h"
+
+#ifndef _IncomingMessage_
+#define _IncomingMessage_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+    class IncomingMessage{
+        //content will be preceded by one of these method frames
+	qpid::framing::BasicDeliverBody::shared_ptr delivered;
+	qpid::framing::BasicReturnBody::shared_ptr returned;
+	qpid::framing::BasicGetOkBody::shared_ptr response;
+	qpid::framing::AMQHeaderBody::shared_ptr header;
+	std::vector<qpid::framing::AMQContentBody::shared_ptr> content;
+
+	long contentSize();
+    public:
+	IncomingMessage(qpid::framing::BasicDeliverBody::shared_ptr intro);
+	IncomingMessage(qpid::framing::BasicReturnBody::shared_ptr intro);
+	IncomingMessage(qpid::framing::BasicGetOkBody::shared_ptr intro);
+        ~IncomingMessage();
+	void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+	void addContent(qpid::framing::AMQContentBody::shared_ptr content);
+	bool isComplete();
+	bool isReturn();
+	bool isDelivery();
+	bool isResponse();
+	string& getConsumerTag();//only relevant if isDelivery()
+	qpid::framing::AMQHeaderBody::shared_ptr& getHeader();
+        u_int64_t getDeliveryTag();
+	void getData(string& data);
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/IncomingMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/Message.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/Message.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/Message.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include "amqp_framing.h"
+
+#ifndef _Message_
+#define _Message_
+
+
+namespace qpid {
+namespace client {
+
+    class Message{
+	qpid::framing::AMQHeaderBody::shared_ptr header;
+	string data;
+	bool redelivered;
+        u_int64_t deliveryTag;
+
+        qpid::framing::BasicHeaderProperties* getHeaderProperties();
+	Message(qpid::framing::AMQHeaderBody::shared_ptr& header);
+    public:
+	Message();
+	~Message();
+	
+	inline std::string getData(){ return data; }
+	inline void setData(const std::string& data){ this->data = data; }
+
+	inline bool isRedelivered(){ return redelivered; }
+	inline void setRedelivered(bool redelivered){ this->redelivered = redelivered; }
+
+        inline u_int64_t getDeliveryTag(){ return deliveryTag; }
+
+        std::string& getContentType();
+        std::string& getContentEncoding();
+        qpid::framing::FieldTable& getHeaders();
+        u_int8_t getDeliveryMode();
+        u_int8_t getPriority();
+        std::string& getCorrelationId();
+        std::string& getReplyTo();
+        std::string& getExpiration();
+        std::string& getMessageId();
+        u_int64_t getTimestamp();
+        std::string& getType();
+        std::string& getUserId();
+        std::string& getAppId();
+        std::string& getClusterId();
+
+	void setContentType(std::string& type);
+	void setContentEncoding(std::string& encoding);
+	void setHeaders(qpid::framing::FieldTable& headers);
+	void setDeliveryMode(u_int8_t mode);
+	void setPriority(u_int8_t priority);
+	void setCorrelationId(std::string& correlationId);
+	void setReplyTo(std::string& replyTo);
+	void setExpiration(std::string&  expiration);
+	void setMessageId(std::string& messageId);
+	void setTimestamp(u_int64_t timestamp);
+	void setType(std::string& type);
+	void setUserId(std::string& userId);
+	void setAppId(std::string& appId);
+	void setClusterId(std::string& clusterId);
+
+
+	friend class Channel;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/Message.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/MessageListener.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/MessageListener.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/MessageListener.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/MessageListener.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _MessageListener_
+#define _MessageListener_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+    class MessageListener{
+    public:
+	virtual void received(Message& msg) = 0;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/MessageListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/Queue.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/Queue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/Queue.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _Queue_
+#define _Queue_
+
+namespace qpid {
+namespace client {
+
+    class Queue{
+	std::string name;
+        const bool autodelete;
+        const bool exclusive;
+
+    public:
+
+	Queue();
+	Queue(std::string name);
+	Queue(std::string name, bool temp);
+	Queue(std::string name, bool autodelete, bool exclusive);
+	const std::string& getName() const;
+	void setName(const std::string&);
+        bool isAutoDelete() const;
+        bool isExclusive() const;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/Queue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/ResponseHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/ResponseHandler.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/ResponseHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/ResponseHandler.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+#include "amqp_framing.h"
+#include "Monitor.h"
+
+#ifndef _ResponseHandler_
+#define _ResponseHandler_
+
+namespace qpid {
+    namespace client {
+
+        class ResponseHandler{
+            bool waiting;
+            qpid::framing::AMQMethodBody::shared_ptr response;
+            qpid::concurrent::Monitor* monitor;
+
+        public:
+            ResponseHandler();
+            ~ResponseHandler();
+            inline bool isWaiting(){ return waiting; }
+            inline qpid::framing::AMQMethodBody::shared_ptr getResponse(){ return response; }
+            bool validate(const qpid::framing::AMQMethodBody& expected);
+            void waitForResponse();
+            void signalResponse(qpid::framing::AMQMethodBody::shared_ptr response);
+            void receive(const qpid::framing::AMQMethodBody& expected);
+            void expect();//must be called before calling receive
+        };
+
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/ResponseHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/client/inc/ReturnedMessageHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/client/inc/ReturnedMessageHandler.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/client/inc/ReturnedMessageHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/client/inc/ReturnedMessageHandler.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <string>
+
+#ifndef _ReturnedMessageHandler_
+#define _ReturnedMessageHandler_
+
+#include "Message.h"
+
+namespace qpid {
+namespace client {
+
+    class ReturnedMessageHandler{
+    public:
+	virtual void returned(Message& msg) = 0;
+    };
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/client/inc/ReturnedMessageHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message