qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r577027 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/framing/
Date Tue, 18 Sep 2007 19:43:34 GMT
Author: aconway
Date: Tue Sep 18 12:43:29 2007
New Revision: 577027

URL: http://svn.apache.org/viewvc?rev=577027&view=rev
Log:
Refactor HandlerImpl to use Session rather than CoreRefs.
Remove most uses of ChannelAdapter in broker code.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Sep 18 12:43:29 2007
@@ -247,7 +247,6 @@
   qpid/broker/DtxWorkRecord.h \
   qpid/broker/ExchangeRegistry.h \
   qpid/broker/FanOutExchange.h \
-  qpid/broker/HandlerImpl.h \
   qpid/broker/Message.h \
   qpid/broker/MessageAdapter.h \
   qpid/broker/MessageBuilder.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Sep 18 12:43:29 2007
@@ -38,42 +38,35 @@
 // by the handlers responsible for those classes.
 //
 
-BrokerAdapter::BrokerAdapter(Session& s, ChannelAdapter& a) :
-    CoreRefs(s,
-             s.getAdapter()->getConnection(),
-             s.getAdapter()->getConnection().broker,
-             a),
-    basicHandler(*this),
-    exchangeHandler(*this),
-    bindingHandler(*this),
-    messageHandler(*this),
-    queueHandler(*this),
-    txHandler(*this),
-    dtxHandler(*this)
+BrokerAdapter::BrokerAdapter(Session& s) :
+    HandlerImpl(s),
+    basicHandler(s),
+    exchangeHandler(s),
+    bindingHandler(s),
+    messageHandler(s),
+    queueHandler(s),
+    txHandler(s),
+    dtxHandler(s)
 {}
 
 
-ProtocolVersion BrokerAdapter::getVersion() const {
-    return connection.getVersion();
-}
-
 void BrokerAdapter::ExchangeHandlerImpl::declare(uint16_t /*ticket*/, const string& exchange, const string& type, 
                                                  const string& alternateExchange, 
                                                  bool passive, bool durable, bool /*autoDelete*/, const FieldTable& args){
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
-        alternate = broker.getExchanges().get(alternateExchange);
+        alternate = getBroker().getExchanges().get(alternateExchange);
     }
     if(passive){
-        Exchange::shared_ptr actual(broker.getExchanges().get(exchange));
+        Exchange::shared_ptr actual(getBroker().getExchanges().get(exchange));
         checkType(actual, type);
         checkAlternate(actual, alternate);
     }else{        
         try{
-            std::pair<Exchange::shared_ptr, bool> response = broker.getExchanges().declare(exchange, type, durable, args);
+            std::pair<Exchange::shared_ptr, bool> response = getBroker().getExchanges().declare(exchange, type, durable, args);
             if (response.second) {
                 if (durable) {
-                    broker.getStore().create(*response.first);
+                    getBroker().getStore().create(*response.first);
                 }
                 if (alternate) {
                     response.first->setAlternate(alternate);
@@ -109,17 +102,17 @@
                 
 void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
     //TODO: implement unused
-    Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+    Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
     if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
-    if (exchange->isDurable()) broker.getStore().destroy(*exchange);
+    if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
-    broker.getExchanges().destroy(name);
+    getBroker().getExchanges().destroy(name);
 } 
 
 ExchangeQueryResult BrokerAdapter::ExchangeHandlerImpl::query(u_int16_t /*ticket*/, const string& name)
 {
     try {
-        Exchange::shared_ptr exchange(broker.getExchanges().get(name));
+        Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
         return ExchangeQueryResult(exchange->getType(), exchange->isDurable(), false, exchange->getArgs());
     } catch (const ChannelException& e) {
         return ExchangeQueryResult("", false, true, FieldTable());        
@@ -134,12 +127,12 @@
 {
     Exchange::shared_ptr exchange;
     try {
-        exchange = broker.getExchanges().get(exchangeName);
+        exchange = getBroker().getExchanges().get(exchangeName);
     } catch (const ChannelException&) {}
 
     Queue::shared_ptr queue;
     if (!queueName.empty()) {
-        queue = broker.getQueues().find(queueName);
+        queue = getBroker().getQueues().find(queueName);
     }
 
     if (!exchange) {
@@ -160,7 +153,7 @@
 
 QueueQueryResult BrokerAdapter::QueueHandlerImpl::query(const string& name)
 {
-    Queue::shared_ptr queue = session.getQueue(name);
+    Queue::shared_ptr queue = getSession().getQueue(name);
     Exchange::shared_ptr alternateExchange = queue->getAlternateExchange();
 
     return QueueQueryResult(queue->getName(), 
@@ -179,22 +172,22 @@
  
     Exchange::shared_ptr alternate;
     if (!alternateExchange.empty()) {
-        alternate = broker.getExchanges().get(alternateExchange);
+        alternate = getBroker().getExchanges().get(alternateExchange);
     }
     Queue::shared_ptr queue;
     if (passive && !name.empty()) {
-	queue = session.getQueue(name);
+	queue = getSession().getQueue(name);
         //TODO: check alternate-exchange is as expected
     } else {
 	std::pair<Queue::shared_ptr, bool> queue_created =  
-            broker.getQueues().declare(
+            getBroker().getQueues().declare(
                 name, durable,
                 autoDelete && !exclusive,
-                exclusive ? &connection : 0);
+                exclusive ? &getConnection() : 0);
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
-	    session.setDefaultQueue(queue);
+	    getSession().setDefaultQueue(queue);
             if (alternate) {
                 queue->setAlternateExchange(alternate);
                 alternate->incAlternateUsers();
@@ -204,16 +197,16 @@
             queue_created.first->create(arguments);
 
 	    //add default binding:
-	    broker.getExchanges().getDefault()->bind(queue, name, 0);
-            queue->bound(broker.getExchanges().getDefault()->getName(), name, arguments);
+	    getBroker().getExchanges().getDefault()->bind(queue, name, 0);
+            queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
 
             //handle automatic cleanup:
 	    if (exclusive) {
-		connection.exclusiveQueues.push_back(queue);
+		getConnection().exclusiveQueues.push_back(queue);
 	    }
 	}
     }
-    if (exclusive && !queue->isExclusiveOwner(&connection)) 
+    if (exclusive && !queue->isExclusiveOwner(&getConnection())) 
 	throw ResourceLockedException(
             QPID_MSG("Cannot grant exclusive access to queue "
                      << queue->getName()));
@@ -223,14 +216,14 @@
                                            const string& exchangeName, const string& routingKey, 
                                            const FieldTable& arguments){
 
-    Queue::shared_ptr queue = session.getQueue(queueName);
-    Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+    Queue::shared_ptr queue = getSession().getQueue(queueName);
+    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
     if(exchange){
         string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey;
         if (exchange->bind(queue, exchangeRoutingKey, &arguments)) {
             queue->bound(exchangeName, routingKey, arguments);
             if (exchange->isDurable() && queue->isDurable()) {
-                broker.getStore().bind(*exchange, *queue, routingKey, arguments);
+                getBroker().getStore().bind(*exchange, *queue, routingKey, arguments);
             }
         }
     }else{
@@ -246,38 +239,38 @@
                                         const string& routingKey,
                                         const qpid::framing::FieldTable& arguments )
 {
-    Queue::shared_ptr queue = session.getQueue(queueName);
+    Queue::shared_ptr queue = getSession().getQueue(queueName);
     if (!queue.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
 
-    Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName);
+    Exchange::shared_ptr exchange = getBroker().getExchanges().get(exchangeName);
     if (!exchange.get()) throw NotFoundException("Unbind failed. No such exchange: " + exchangeName);
 
     if (exchange->unbind(queue, routingKey, &arguments) && exchange->isDurable() && queue->isDurable()) {
-        broker.getStore().unbind(*exchange, *queue, routingKey, arguments);
+        getBroker().getStore().unbind(*exchange, *queue, routingKey, arguments);
     }
 
 }
         
 void BrokerAdapter::QueueHandlerImpl::purge(uint16_t /*ticket*/, const string& queue){
-    session.getQueue(queue)->purge();
+    getSession().getQueue(queue)->purge();
 } 
         
 void BrokerAdapter::QueueHandlerImpl::delete_(uint16_t /*ticket*/, const string& queue, bool ifUnused, bool ifEmpty){
     ChannelException error(0, "");
-    Queue::shared_ptr q = session.getQueue(queue);
+    Queue::shared_ptr q = getSession().getQueue(queue);
     if(ifEmpty && q->getMessageCount() > 0){
         throw PreconditionFailedException("Queue not empty.");
     }else if(ifUnused && q->getConsumerCount() > 0){
         throw PreconditionFailedException("Queue in use.");
     }else{
         //remove the queue from the list of exclusive queues if necessary
-        if(q->isExclusiveOwner(&connection)){
-            QueueVector::iterator i = find(connection.exclusiveQueues.begin(), connection.exclusiveQueues.end(), q);
-            if(i < connection.exclusiveQueues.end()) connection.exclusiveQueues.erase(i);
+        if(q->isExclusiveOwner(&getConnection())){
+            QueueVector::iterator i = find(getConnection().exclusiveQueues.begin(), getConnection().exclusiveQueues.end(), q);
+            if(i < getConnection().exclusiveQueues.end()) getConnection().exclusiveQueues.erase(i);
         }
         q->destroy();
-        broker.getQueues().destroy(queue);
-        q->unbind(broker.getExchanges(), q);
+        getBroker().getQueues().destroy(queue);
+        q->unbind(getBroker().getExchanges(), q);
     }
 } 
               
@@ -286,8 +279,8 @@
 
 void BrokerAdapter::BasicHandlerImpl::qos(uint32_t prefetchSize, uint16_t prefetchCount, bool /*global*/){
     //TODO: handle global
-    session.setPrefetchSize(prefetchSize);
-    session.setPrefetchCount(prefetchCount);
+    getSession().setPrefetchSize(prefetchSize);
+    getSession().setPrefetchCount(prefetchCount);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, 
@@ -296,8 +289,8 @@
                                               bool nowait, const FieldTable& fields)
 {
     
-    Queue::shared_ptr queue = session.getQueue(queueName);    
-    if(!consumerTag.empty() && session.exists(consumerTag)){
+    Queue::shared_ptr queue = getSession().getQueue(queueName);    
+    if(!consumerTag.empty() && getSession().exists(consumerTag)){
         throw ConnectionException(530, "Consumer tags must be unique");
     }
     string newTag = consumerTag;
@@ -305,33 +298,34 @@
     //also version specific behaviour now)
     if (newTag.empty()) newTag = tagGenerator.generate();
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken(newTag));
-    session.consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
+    getSession().consume(token, newTag, queue, noLocal, !noAck, true, exclusive, &fields);
 
-    if(!nowait) client.consumeOk(newTag);
+    if(!nowait)
+        getProxy().getBasic().consumeOk(newTag);
 
     //allow messages to be dispatched if required as there is now a consumer:
     queue->requestDispatch();
 } 
         
 void BrokerAdapter::BasicHandlerImpl::cancel(const string& consumerTag){
-    session.cancel(consumerTag);
+    getSession().cancel(consumerTag);
 } 
         
 void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& queueName, bool noAck){
-    Queue::shared_ptr queue = session.getQueue(queueName);    
+    Queue::shared_ptr queue = getSession().getQueue(queueName);    
     DeliveryToken::shared_ptr token(MessageDelivery::getBasicGetToken(queue));
-    if(!session.get(token, queue, !noAck)){
+    if(!getSession().get(token, queue, !noAck)){
         string clusterId;//not used, part of an imatix hack
 
-        client.getEmpty(clusterId);
+        getProxy().getBasic().getEmpty(clusterId);
     }
 } 
         
 void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){
     if (multiple) {
-        session.ackCumulative(deliveryTag);
+        getSession().ackCumulative(deliveryTag);
     } else {
-        session.ackRange(deliveryTag, deliveryTag);
+        getSession().ackRange(deliveryTag, deliveryTag);
     }
 } 
         
@@ -339,23 +333,23 @@
         
 void BrokerAdapter::BasicHandlerImpl::recover(bool requeue)
 {
-    session.recover(requeue);
+    getSession().recover(requeue);
 } 
 
 void BrokerAdapter::TxHandlerImpl::select()
 {
-    session.startTx();
+    getSession().startTx();
 }
 
 void BrokerAdapter::TxHandlerImpl::commit()
 {
-    session.commit(&broker.getStore());
+    getSession().commit(&getBroker().getStore());
 }
 
 void BrokerAdapter::TxHandlerImpl::rollback()
 {    
-    session.rollback();
-    session.recover(false);    
+    getSession().rollback();
+    getSession().recover(false);    
 }
               
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Tue Sep 18 12:43:29 2007
@@ -19,7 +19,6 @@
  *
  */
 #include "DtxHandlerImpl.h"
-#include "HandlerImpl.h"
 #include "MessageHandlerImpl.h"
 #include "NameGenerator.h"
 #include "qpid/Exception.h"
@@ -55,18 +54,28 @@
  * peer.
  * 
  */
-class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations
+
+// TODO aconway 2007-09-18: BrokerAdapter is no longer an appropriate way
+// to group methods as seen by the BADHANDLERs below.
+// Handlers should be grouped by layer, the BrokerAdapter stuff
+// belongs on the SemanticHandler.
+// 
+class BrokerAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
 {
   public:
-    BrokerAdapter(Session& session, framing::ChannelAdapter& a);
+    BrokerAdapter(Session& session);
 
-    framing::ProtocolVersion getVersion() const;
     BasicHandler* getBasicHandler() { return &basicHandler; }
     ExchangeHandler* getExchangeHandler() { return &exchangeHandler; }
     BindingHandler* getBindingHandler() { return &bindingHandler; }
     QueueHandler* getQueueHandler() { return &queueHandler; }
     TxHandler* getTxHandler() { return &txHandler;  }
     MessageHandler* getMessageHandler() { return &messageHandler;  }
+    DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
+    DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+    framing::ProtocolVersion getVersion() const { return getConnection().getVersion(); }
+
+
     AccessHandler* getAccessHandler() {
         throw framing::NotImplementedException("Access class not implemented");  }
     FileHandler* getFileHandler() {
@@ -75,26 +84,22 @@
         throw framing::NotImplementedException("Stream class not implemented");  }
     TunnelHandler* getTunnelHandler() {
         throw framing::NotImplementedException("Tunnel class not implemented"); }
-    DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
-    DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
-    ExecutionHandler* getExecutionHandler() { throw ConnectionException(531, "Wrong adapter for execution layer method!"); }
 
     // Handlers no longer implemented in BrokerAdapter:
 #define BADHANDLER() assert(0); throw framing::InternalErrorException()
+    ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
     ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
     SessionHandler* getSessionHandler() { BADHANDLER(); }
     ChannelHandler* getChannelHandler() { BADHANDLER(); }
 #undef BADHANDLER
 
-    framing::AMQP_ClientProxy& getProxy() { return proxy; }
-
   private:
     class ExchangeHandlerImpl :
         public ExchangeHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Exchange>
+        public HandlerImpl
     {
       public:
-        ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+        ExchangeHandlerImpl(Session& session) : HandlerImpl(session) {}
         
         void declare(uint16_t ticket,
                      const std::string& exchange, const std::string& type,
@@ -111,10 +116,10 @@
 
     class BindingHandlerImpl : 
         public BindingHandler,
-            public HandlerImpl<framing::AMQP_ClientProxy::Binding>
+            public HandlerImpl
     {
     public:
-        BindingHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+        BindingHandlerImpl(Session& session) : HandlerImpl(session) {}
 
         framing::BindingQueryResult query(u_int16_t ticket,
                                           const std::string& exchange,
@@ -125,10 +130,10 @@
 
     class QueueHandlerImpl :
         public QueueHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Queue>
+        public HandlerImpl
     {
       public:
-        QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+        QueueHandlerImpl(Session& session) : HandlerImpl(session) {}
         
         void declare(uint16_t ticket, const std::string& queue,
                      const std::string& alternateExchange, 
@@ -151,12 +156,12 @@
 
     class BasicHandlerImpl :
         public BasicHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Basic>
+        public HandlerImpl
     {
         NameGenerator tagGenerator;
 
       public:
-        BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent), tagGenerator("sgen") {}
+        BasicHandlerImpl(Session& session) : HandlerImpl(session), tagGenerator("sgen") {}
 
         void qos(uint32_t prefetchSize,
                  uint16_t prefetchCount, bool global); 
@@ -173,10 +178,10 @@
 
     class TxHandlerImpl :
         public TxHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Tx>
+        public HandlerImpl
     {
       public:
-        TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {}
+        TxHandlerImpl(Session& session) : HandlerImpl(session) {}
         
         void select();
         void commit();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Sep 18 12:43:29 2007
@@ -68,6 +68,7 @@
     void setHeartbeat(uint16_t hb) { heartbeat = hb; }
     void setStagingThreshold(uint64_t st) { stagingThreshold = st; }
     
+    Broker& getBroker() { return broker; }
 
     Broker& broker;
     std::vector<Queue::shared_ptr> exclusiveQueues;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Sep 18 12:43:29 2007
@@ -21,6 +21,7 @@
 #include "DeliveryRecord.h"
 #include "DeliverableMessage.h"
 #include "Session.h"
+#include "BrokerExchange.h"
 #include "qpid/log/Statement.h"
 
 using namespace qpid::broker;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Tue Sep 18 12:43:29 2007
@@ -26,14 +26,14 @@
 using namespace qpid::framing;
 using std::string;
 
-DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
+DtxHandlerImpl::DtxHandlerImpl(Session& s) : HandlerImpl(s) {}
 
 // DtxDemarcationHandler:
 
 
 void DtxHandlerImpl::select()
 {
-    session.selectDtx();
+    getSession().selectDtx();
 }
 
 DtxDemarcationEndResult DtxHandlerImpl::end(u_int16_t /*ticket*/,
@@ -43,7 +43,7 @@
 {
     try {
         if (fail) {
-            session.endDtx(xid, true);
+            getSession().endDtx(xid, true);
             if (suspend) {
                 throw ConnectionException(503, "End and suspend cannot both be set.");
             } else {
@@ -51,9 +51,9 @@
             }
         } else {
             if (suspend) {
-                session.suspendDtx(xid);
+                getSession().suspendDtx(xid);
             } else {
-                session.endDtx(xid, false);
+                getSession().endDtx(xid, false);
             }
             return DtxDemarcationEndResult(XA_OK);
         }
@@ -72,9 +72,9 @@
     }
     try {
         if (resume) {
-            session.resumeDtx(xid);
+            getSession().resumeDtx(xid);
         } else {
-            session.startDtx(xid, broker.getDtxManager(), join);
+            getSession().startDtx(xid, getBroker().getDtxManager(), join);
         }
         return DtxDemarcationStartResult(XA_OK);
     } catch (const DtxTimeoutException& e) {
@@ -88,7 +88,7 @@
                              const string& xid)
 {
     try {
-        bool ok = broker.getDtxManager().prepare(xid);
+        bool ok = getBroker().getDtxManager().prepare(xid);
         return DtxCoordinationPrepareResult(ok ? XA_OK : XA_RBROLLBACK);
     } catch (const DtxTimeoutException& e) {
         return DtxCoordinationPrepareResult(XA_RBTIMEOUT);        
@@ -100,7 +100,7 @@
                             bool onePhase)
 {
     try {
-        bool ok = broker.getDtxManager().commit(xid, onePhase);
+        bool ok = getBroker().getDtxManager().commit(xid, onePhase);
         return DtxCoordinationCommitResult(ok ? XA_OK : XA_RBROLLBACK);
     } catch (const DtxTimeoutException& e) {
         return DtxCoordinationCommitResult(XA_RBTIMEOUT);        
@@ -112,7 +112,7 @@
                               const string& xid )
 {
     try {
-        broker.getDtxManager().rollback(xid);
+        getBroker().getDtxManager().rollback(xid);
         return DtxCoordinationRollbackResult(XA_OK);
     } catch (const DtxTimeoutException& e) {
         return DtxCoordinationRollbackResult(XA_RBTIMEOUT);        
@@ -136,7 +136,7 @@
     // note that this restricts the length of the xids more than is
     // strictly 'legal', but that is ok for testing
     std::set<std::string> xids;
-    broker.getStore().collectPreparedXids(xids);        
+    getBroker().getStore().collectPreparedXids(xids);        
     uint size(0);
     for (std::set<std::string>::iterator i = xids.begin(); i != xids.end(); i++) {
         size += i->size() + 1/*shortstr size*/;        
@@ -167,7 +167,7 @@
 
 DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)
 {
-    uint32_t timeout = broker.getDtxManager().getTimeout(xid);
+    uint32_t timeout = getBroker().getDtxManager().getTimeout(xid);
     return DtxCoordinationGetTimeoutResult(timeout);    
 }
 
@@ -176,7 +176,7 @@
                                 const string& xid,
                                 u_int32_t timeout)
 {
-    broker.getDtxManager().setTimeout(xid, timeout);
+    getBroker().getDtxManager().setTimeout(xid, timeout);
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Tue Sep 18 12:43:29 2007
@@ -27,12 +27,12 @@
 namespace broker {
 
 class DtxHandlerImpl 
-    : public CoreRefs,
+    : public HandlerImpl,
       public framing::AMQP_ServerOperations::DtxCoordinationHandler,
       public framing::AMQP_ServerOperations::DtxDemarcationHandler
 {    
 public:
-    DtxHandlerImpl(CoreRefs& parent);
+    DtxHandlerImpl(Session&);
 
     // DtxCoordinationHandler:
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HandlerImpl.h Tue Sep 18 12:43:29 2007
@@ -19,49 +19,47 @@
  *
  */
 
-#include "Broker.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include "Session.h"
+#include "SessionHandler.h"
+#include "Connection.h"
 
 namespace qpid {
 namespace broker {
 
-class Connection;
-class Session;
+class Broker;
 
 /**
- * A collection of references to the core objects required by an adapter,
- * and a client proxy.
+ * Base template for protocol handler implementations.
+ * Provides convenience methods for getting common session objects.
  */
-struct CoreRefs
-{
-    CoreRefs(Session& ch, Connection& c, Broker& b, framing::ChannelAdapter& a)
-        : session(ch), connection(c), broker(b), adapter(a), proxy(a.getHandlers().out) {}
+class HandlerImpl {
+  protected:
+    HandlerImpl(Session& s) : session(s) {}
+
+    Session& getSession() { return session; }
+    const Session& getSession() const { return session; }
+    
+    SessionHandler* getSessionHandler() { return session.getHandler(); }
+    const SessionHandler* getSessionHandler() const { return session.getHandler(); }
+
+    // Remaining functions may only be called if getSessionHandler() != 0
+    framing::AMQP_ClientProxy& getProxy() { return getSessionHandler()->getProxy(); }
+    const framing::AMQP_ClientProxy& getProxy() const { return getSessionHandler()->getProxy(); }
+
+    Connection& getConnection() { return getSessionHandler()->getConnection(); }
+    const Connection& getConnection() const { return getSessionHandler()->getConnection(); }
+    
+    Broker& getBroker() { return getConnection().broker; }
+    const Broker& getBroker() const { return getConnection().broker; }
 
+  private:
     Session& session;
-    Connection& connection;
-    Broker& broker;
-    framing::ChannelAdapter& adapter;
-    framing::AMQP_ClientProxy proxy;
 };
 
-
-/**
- * Base template for protocol handler implementations.
- * Provides the core references and appropriate AMQP class proxy.
- */
-template <class ProxyType>
-struct HandlerImpl : public CoreRefs {
-    typedef HandlerImpl<ProxyType> HandlerImplType;
-    HandlerImpl(CoreRefs& parent)
-        : CoreRefs(parent), client(ProxyType::get(proxy)) {}
-    ProxyType client;
-};
-
-
-
 }} // namespace qpid::broker
 
 
 
 #endif  /*!_broker_HandlerImpl_h*/
+
+

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Sep 18 12:43:29 2007
@@ -139,7 +139,7 @@
     frames.remove(TypeFilter(CONTENT_BODY));
 }
 
-void Message::sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize)
+void Message::sendContent(framing::FrameHandler& out, uint16_t maxFrameSize)
 {
     if (isContentReleased()) {
         //load content from store in chunks of maxContentSize
@@ -148,7 +148,7 @@
         for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
         {            
             uint64_t remaining = expectedSize - offset;
-            AMQFrame frame(channel, AMQContentBody());
+            AMQFrame frame(0, AMQContentBody());
             string& data = frame.castBody<AMQContentBody>()->getData();
 
             store->loadContent(*this, data, offset,
@@ -168,15 +168,14 @@
         Count c;
         frames.map_if(c, TypeFilter(CONTENT_BODY));
 
-        SendContent f(out, channel, maxFrameSize, c.getCount());
+        SendContent f(out, maxFrameSize, c.getCount());
         frames.map_if(f, TypeFilter(CONTENT_BODY));
     }
 }
 
-void Message::sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t /*maxFrameSize*/)
+void Message::sendHeader(framing::FrameHandler& out, uint16_t /*maxFrameSize*/)
 {
-    Relay f(out, channel);
-    frames.map_if(f, TypeFilter(HEADER_BODY));    
+    frames.map_if(out, TypeFilter(HEADER_BODY));    
 }
 
 MessageAdapter& Message::getAdapter() const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Sep 18 12:43:29 2007
@@ -114,8 +114,8 @@
      */
     void releaseContent(MessageStore* store);
 
-    void sendContent(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
-    void sendHeader(framing::FrameHandler& out, uint16_t channel, uint16_t maxFrameSize);
+    void sendContent(framing::FrameHandler& out, uint16_t maxFrameSize);
+    void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize);
 
     bool isContentLoaded() const;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.cpp Tue Sep 18 12:43:29 2007
@@ -23,7 +23,7 @@
 #include "DeliveryToken.h"
 #include "Message.h"
 #include "BrokerQueue.h"
-#include "qpid/framing/ChannelAdapter.h"
+#include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/BasicDeliverBody.h"
 #include "qpid/framing/BasicGetOkBody.h"
 #include "qpid/framing/MessageTransferBody.h"
@@ -114,7 +114,7 @@
 }
 
 void MessageDelivery::deliver(Message::shared_ptr msg, 
-                              framing::ChannelAdapter& channel, 
+                              framing::FrameHandler& handler, 
                               DeliveryId id, 
                               DeliveryToken::shared_ptr token, 
                               uint16_t framesize)
@@ -123,15 +123,10 @@
     //another may well have the wrong headers; however we will only
     //have one content class for 0-10 proper
 
-    FrameHandler& handler = channel.getHandlers().out;
-
-    //send method
     boost::shared_ptr<BaseToken> t = dynamic_pointer_cast<BaseToken>(token);
     AMQFrame method = t->sendMethod(msg, id);
     method.setEof(false);
-    method.setChannel(channel.getId());
     handler.handle(method);
-
-    msg->sendHeader(handler, channel.getId(), framesize);
-    msg->sendContent(handler, channel.getId(), framesize);
+    msg->sendHeader(handler, framesize);
+    msg->sendContent(handler, framesize);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageDelivery.h Tue Sep 18 12:43:29 2007
@@ -23,15 +23,9 @@
  */
 #include <boost/shared_ptr.hpp>
 #include "DeliveryId.h"
+#include "qpid/framing/FrameHandler.h"
 
 namespace qpid {
-
-namespace framing {
-
-class ChannelAdapter;
-
-}
-
 namespace broker {
 
 class DeliveryToken;
@@ -49,7 +43,7 @@
                                                                     u_int8_t confirmMode, 
                                                                     u_int8_t acquireMode);
 
-    static void deliver(boost::shared_ptr<Message> msg, framing::ChannelAdapter& channel, 
+    static void deliver(boost::shared_ptr<Message> msg, framing::FrameHandler& out, 
                         DeliveryId deliveryTag, boost::shared_ptr<DeliveryToken> token, uint16_t framesize);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Tue Sep 18 12:43:29 2007
@@ -36,8 +36,8 @@
 
 using namespace framing;
 
-MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent)
-    : HandlerImplType(parent) {}
+MessageHandlerImpl::MessageHandlerImpl(Session& session)
+    : HandlerImpl(session) {}
 
 //
 // Message class method handlers
@@ -46,7 +46,7 @@
 void
 MessageHandlerImpl::cancel(const string& destination )
 {
-    session.cancel(destination);
+    getSession().cancel(destination);
 }
 
 void
@@ -97,14 +97,14 @@
                             bool exclusive,
                             const framing::FieldTable& filter )
 {
-    Queue::shared_ptr queue = session.getQueue(queueName);
-    if(!destination.empty() && session.exists(destination))
+    Queue::shared_ptr queue = getSession().getQueue(queueName);
+    if(!destination.empty() && getSession().exists(destination))
         throw ConnectionException(530, "Consumer tags must be unique");
 
     string tag = destination;
     //NB: am assuming pre-acquired = 0 as discussed on SIG list as is
     //the previously expected behaviour
-    session.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
+    getSession().consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
                     tag, queue, noLocal, confirmMode == 1, acquireMode == 0, exclusive, &filter);
     // Dispatch messages as there is now a consumer.
     queue->requestDispatch();
@@ -117,9 +117,9 @@
                          const string& destination,
                          bool noAck )
 {
-    Queue::shared_ptr queue = session.getQueue(queueName);
+    Queue::shared_ptr queue = getSession().getQueue(queueName);
     
-    if (session.get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
+    if (getSession().get(MessageDelivery::getMessageDeliveryToken(destination, noAck ? 0 : 1, 0), queue, !noAck)){
         //don't send any response... rely on execution completion
     } else {
         //temporarily disabled:
@@ -148,14 +148,14 @@
                         bool /*global*/ )
 {
     //TODO: handle global
-    session.setPrefetchSize(prefetchSize);
-    session.setPrefetchCount(prefetchCount);
+    getSession().setPrefetchSize(prefetchSize);
+    getSession().setPrefetchCount(prefetchCount);
 }
 
 void
 MessageHandlerImpl::recover(bool requeue)
 {
-    session.recover(requeue);
+    getSession().recover(requeue);
 }
 
 void
@@ -166,7 +166,7 @@
     }
     
     for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
-        session.reject(i->getValue(), (++i)->getValue());
+        getSession().reject(i->getValue(), (++i)->getValue());
     }
 }
 
@@ -175,10 +175,10 @@
     
     if (unit == 0) {
         //message
-        session.addMessageCredit(destination, value);
+        getSession().addMessageCredit(destination, value);
     } else if (unit == 1) {
         //bytes
-        session.addByteCredit(destination, value);
+        getSession().addByteCredit(destination, value);
     } else {
         //unknown
         throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
@@ -190,10 +190,10 @@
 {
     if (mode == 0) {
         //credit
-        session.setCreditMode(destination);
+        getSession().setCreditMode(destination);
     } else if (mode == 1) {
         //window
-        session.setWindowMode(destination);
+        getSession().setWindowMode(destination);
     } else{
         throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);        
     }
@@ -201,12 +201,12 @@
     
 void MessageHandlerImpl::flush(const std::string& destination)
 {
-    session.flush(destination);        
+    getSession().flush(destination);        
 }
 
 void MessageHandlerImpl::stop(const std::string& destination)
 {
-    session.stop(destination);        
+    getSession().stop(destination);        
 }
 
 void MessageHandlerImpl::acquire(const SequenceNumberSet& transfers, u_int8_t /*mode*/)
@@ -218,11 +218,11 @@
     }
     
     for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
-        session.acquire(i->getValue(), (++i)->getValue(), results);
+        getSession().acquire(i->getValue(), (++i)->getValue(), results);
     }
 
     results = results.condense();
-    client.acquired(results);
+    getProxy().getMessage().acquired(results);
 }
 
 void MessageHandlerImpl::release(const SequenceNumberSet& transfers)
@@ -232,7 +232,7 @@
     }
     
     for (SequenceNumberSet::const_iterator i = transfers.begin(); i != transfers.end(); i++) {
-        session.release(i->getValue(), (++i)->getValue());
+        getSession().release(i->getValue(), (++i)->getValue());
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.h Tue Sep 18 12:43:29 2007
@@ -34,10 +34,10 @@
 
 class MessageHandlerImpl :
         public framing::AMQP_ServerOperations::MessageHandler,
-        public HandlerImpl<framing::AMQP_ClientProxy::Message>
+        public HandlerImpl
 {
   public:
-    MessageHandlerImpl(CoreRefs& parent);
+    MessageHandlerImpl(Session&);
 
     void append(const std::string& reference, const std::string& bytes);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Tue Sep 18 12:43:29 2007
@@ -37,13 +37,7 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-SemanticHandler::SemanticHandler(Session& s) :
-    session(s),
-    connection(s.getAdapter()->getConnection()),
-    adapter(s, static_cast<ChannelAdapter&>(*this))
-{
-    init(s.getAdapter()->getChannel(), s.out, 0);
-}
+SemanticHandler::SemanticHandler(Session& s) : HandlerImpl(s) {}
 
 void SemanticHandler::handle(framing::AMQFrame& frame) 
 {    
@@ -86,24 +80,24 @@
     if (outgoing.lwm < mark) {
         outgoing.lwm = mark;
         //ack messages:
-        session.ackCumulative(mark.getValue());
+        getSession().ackCumulative(mark.getValue());
     }
     if (range.size() % 2) { //must be even number        
         throw ConnectionException(530, "Received odd number of elements in ranged mark");
     } else {
         for (SequenceNumberSet::const_iterator i = range.begin(); i != range.end(); i++) {
-            session.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
+            getSession().ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue());
         }
     }
 }
 
 void SemanticHandler::sendCompletion()
 {
-    if (isOpen()) {
+    if (getSessionHandler()) {
         SequenceNumber mark = incoming.getMark();
         SequenceNumberSet range = incoming.getRange();
         Mutex::ScopedLock l(outLock);
-        ChannelAdapter::send(ExecutionCompleteBody(getVersion(), mark.getValue(), range));
+        getProxy().getExecution().complete(mark.getValue(), range);
     }
 }
 void SemanticHandler::flush()
@@ -129,7 +123,8 @@
 
 void SemanticHandler::handleCommand(framing::AMQMethodBody* method)
 {
-    SequenceNumber id = incoming.next();                        
+    SequenceNumber id = incoming.next();
+    BrokerAdapter adapter(getSession());
     InvocationVisitor v(&adapter);
     method->accept(v);
     incoming.complete(id);                                    
@@ -137,7 +132,7 @@
     if (!v.wasHandled()) {
         throw ConnectionException(540, "Not implemented");
     } else if (v.hasResult()) {
-        ChannelAdapter::send(ExecutionResultBody(getVersion(), id.getValue(), v.getResult()));
+        getProxy().getExecution().result(id.getValue(), v.getResult());
     }
     //TODO: if (method->isSync()) { incoming.synch(id); sendCompletion(); }
     //TODO: if window gets too large send unsolicited completion
@@ -159,45 +154,24 @@
     }
     msgBuilder.handle(frame);
     if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
-        msg->setPublisher(&connection);
-        session.handle(msg);        
+        msg->setPublisher(&getConnection());
+        getSession().handle(msg);        
         msgBuilder.end();
         incoming.track(msg);
         //TODO: if (msg.getMethod().isSync()) { incoming.synch(msg.getCommandId()); sendCompletion(); }
     }
 }
 
-bool SemanticHandler::isOpen() const {
-    // FIXME aconway 2007-08-30: remove.
-    return true;
-}
-
 DeliveryId SemanticHandler::deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
 {
     Mutex::ScopedLock l(outLock);
-    MessageDelivery::deliver(msg, *this, ++outgoing.hwm, token, connection.getFrameMax());
+    MessageDelivery::deliver(msg, getSessionHandler()->out, ++outgoing.hwm, token, getConnection().getFrameMax());
     return outgoing.hwm;
 }
 
 void SemanticHandler::redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag)
 {
-    MessageDelivery::deliver(msg, *this, tag, token, connection.getFrameMax());
-}
-
-void SemanticHandler::send(const AMQBody& body)
-{
-    Mutex::ScopedLock l(outLock);
-    // FIXME aconway 2007-08-31: SessionHandler should not send
-    // channel/session commands  via the semantic handler, it should shortcut
-    // directly to its own output handler. That will make the CLASS_ID
-    // part of the test unnecessary.
-    // 
-    if (body.getMethod() &&
-        body.getMethod()->amqpClassId() != ChannelOpenBody::CLASS_ID)
-    {
-        ++outgoing.hwm;
-    }
-    ChannelAdapter::send(body);
+    MessageDelivery::deliver(msg, getSessionHandler()->out, tag, token, getConnection().getFrameMax());
 }
 
 SemanticHandler::TrackId SemanticHandler::getTrack(const AMQFrame& frame)
@@ -225,11 +199,3 @@
     throw Exception("Could not determine track");
 }
 
-//ChannelAdapter virtual methods, no longer used:
-void SemanticHandler::handleMethod(framing::AMQMethodBody*){}
-
-void SemanticHandler::handleHeader(qpid::framing::AMQHeaderBody*) {}
-
-void SemanticHandler::handleContent(qpid::framing::AMQContentBody*) {}
-
-void SemanticHandler::handleHeartbeat(qpid::framing::AMQHeartbeatBody*) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.h Tue Sep 18 12:43:29 2007
@@ -26,12 +26,12 @@
 #include "DeliveryAdapter.h"
 #include "MessageBuilder.h"
 #include "IncomingExecutionContext.h"
+#include "HandlerImpl.h"
 
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/SequenceNumber.h"
-#include "qpid/framing/ChannelAdapter.h"
 
 namespace qpid {
 
@@ -49,11 +49,8 @@
 class SemanticHandler : public DeliveryAdapter,
                         public framing::FrameHandler, 
                         public framing::AMQP_ServerOperations::ExecutionHandler,
-                        private framing::ChannelAdapter
+                        private HandlerImpl
 {
-    Session& session;
-    Connection& connection;
-    BrokerAdapter adapter;
     IncomingExecutionContext incoming;
     framing::Window outgoing;
     sys::Mutex outLock;
@@ -68,17 +65,6 @@
 
     void sendCompletion();
 
-    //ChannelAdapter virtual methods:
-    void handleMethod(framing::AMQMethodBody* method);
-    
-    bool isOpen() const;
-    void handleHeader(framing::AMQHeaderBody*);
-    void handleContent(framing::AMQContentBody*);
-    void handleHeartbeat(framing::AMQHeartbeatBody*);
-
-    void send(const framing::AMQBody& body);
-
-
     //delivery adapter methods:
     DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token);
     void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId tag);
@@ -88,9 +74,6 @@
 
     //frame handler:
     void handle(framing::AMQFrame& frame);
-
-    // FIXME aconway 2007-08-31: Move proxy to Session.
-    framing::AMQP_ClientProxy& getProxy() { return adapter.getProxy(); }
 
     //execution class method handlers:
     void complete(uint32_t cumulativeExecutionMark, const framing::SequenceNumberSet& range);    

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Tue Sep 18 12:43:29 2007
@@ -68,11 +68,9 @@
       flowActive(true)
 {
     outstanding.reset();
-    // FIXME aconway 2007-08-29: handler to get Session, not connection.
     std::auto_ptr<SemanticHandler> semantic(new SemanticHandler(*this));
+    // FIXME aconway 2007-08-29:  move deliveryAdapter to SemanticHandlerState. 
     deliveryAdapter=semantic.get();
-    // FIXME aconway 2007-08-31: Remove, workaround.
-    semanticHandler=semantic.get();
     handlers.push_back(semantic.release());
     in = &handlers[0];
     out = &adapter->out;
@@ -256,7 +254,7 @@
 
 bool Session::ConsumerImpl::deliver(QueuedMessage& msg)
 {
-    if (nolocal && &parent->getAdapter()->getConnection() == msg.payload->getPublisher()) {
+    if (nolocal && &parent->getHandler()->getConnection() == msg.payload->getPublisher()) {
         return false;
     } else {
         if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
@@ -306,7 +304,7 @@
     if(queue) {
         queue->cancel(this);
         if (queue->canAutoDelete()) {            
-            parent->getAdapter()->getConnection().broker.getQueues().destroyIf(queue->getName(), 
+            parent->getHandler()->getConnection().broker.getQueues().destroyIf(queue->getName(), 
                                                                                boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
         }
     }
@@ -333,7 +331,7 @@
 void Session::route(Message::shared_ptr msg, Deliverable& strategy) {
     std::string exchangeName = msg->getExchangeName();      
     if (!cacheExchange || cacheExchange->getName() != exchangeName){
-        cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
+        cacheExchange = getHandler()->getConnection().broker.getExchanges().get(exchangeName);
     }
 
     cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.h Tue Sep 18 12:43:29 2007
@@ -32,7 +32,6 @@
 #include "NameGenerator.h"
 #include "Prefetch.h"
 #include "TxBuffer.h"
-#include "SemanticHandler.h"  // FIXME aconway 2007-08-31: remove
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/AccumulatedAck.h"
 #include "qpid/shared_ptr.h"
@@ -43,11 +42,6 @@
 #include <vector>
 
 namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
 namespace broker {
 
 class SessionHandler;
@@ -129,20 +123,15 @@
     ConsumerImpl& find(const std::string& destination);
     void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
     void acknowledged(const DeliveryRecord&);
-
-    // FIXME aconway 2007-08-31: remove, temporary hack.
-    SemanticHandler* semanticHandler;
-
     AckRange findRange(DeliveryId first, DeliveryId last);
-    
 
   public:
     Session(SessionHandler&, uint32_t timeout);
     ~Session();
 
     /** Returns 0 if this session is not currently attached */
-    SessionHandler* getAdapter() { return adapter; }
-    const SessionHandler* getAdapter() const { return adapter; }
+    SessionHandler* getHandler() { return adapter; }
+    const SessionHandler* getHandler() const { return adapter; }
 
     Broker& getBroker() const { return broker; }
     
@@ -198,13 +187,7 @@
     void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
     void release(DeliveryId first, DeliveryId last);
     void reject(DeliveryId first, DeliveryId last);
-
     void handle(Message::shared_ptr msg);
-
-    framing::AMQP_ClientProxy& getProxy() {
-        // FIXME aconway 2007-08-31: Move proxy to Session.
-        return semanticHandler->getProxy();
-    }    
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Sep 18 12:43:29 2007
@@ -29,18 +29,10 @@
 namespace broker {
 using namespace framing;
 
-// FIXME aconway 2007-08-31: the SessionHandler should create its
-// private proxy directly on the connections out handler.
-// Session/channel methods should not go thru the other layers.
-// Need to get rid of ChannelAdapter and allow proxies to be created
-// directly on output handlers.
-// 
-framing::AMQP_ClientProxy& SessionHandler::getProxy() {
-    return session->getProxy();
-}
-
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
-    : InOutHandler(0, &c.getOutput()), connection(c), channel(ch), ignoring(false), channelHandler(*this) {}
+    : InOutHandler(0, &c.getOutput()),
+      connection(c), channel(ch), proxy(out),
+      ignoring(false), channelHandler(*this) {}
 
 SessionHandler::~SessionHandler() {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Sep 18 12:43:29 2007
@@ -24,14 +24,10 @@
 
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
 
 namespace qpid {
-
-namespace framing {
-class AMQP_ClientProxy;
-}
-
 namespace broker {
 
 class Connection;
@@ -51,12 +47,17 @@
     ~SessionHandler();
 
     /** Returns 0 if not attached to a session */
-    Session* getSession() const { return session.get(); }
+    Session* getSession() { return session.get(); }
+    const Session* getSession() const { return session.get(); }
 
     framing::ChannelId getChannel() const { return channel; }
+    
     Connection& getConnection() { return connection; }
     const Connection& getConnection() const { return connection; }
 
+    framing::AMQP_ClientProxy& getProxy() { return proxy; }
+    const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+
   protected:
     void handleIn(framing::AMQFrame&);
     void handleOut(framing::AMQFrame&);
@@ -84,10 +85,9 @@
     void assertOpen(const char* method);
     void assertClosed(const char* method);
 
-    framing::AMQP_ClientProxy& getProxy();
-    
     Connection& connection;
     const framing::ChannelId channel;
+    framing::AMQP_ClientProxy proxy;
     shared_ptr<Session> session;
     bool ignoring;
     ChannelMethods channelHandler;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.cpp Tue Sep 18 12:43:29 2007
@@ -21,7 +21,7 @@
 
 #include "SendContent.h"
 
-qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c), 
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t mfs, uint efc) : handler(h), 
                                                                                               maxFrameSize(mfs),
                                                                                                expectedFrameCount(efc), frameCount(0) {}
 
@@ -45,14 +45,13 @@
     } else {
         AMQFrame copy(f);
         setFlags(copy, first, last);
-        copy.setChannel(channel);
         handler.handle(copy);
     }        
 }
 
 void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const
 {
-    AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+    AMQFrame fragment(0, AMQContentBody(body.getData().substr(offset, size)));
     setFlags(fragment, first, last);
     handler.handle(fragment);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SendContent.h Tue Sep 18 12:43:29 2007
@@ -37,7 +37,6 @@
 class SendContent
 {
     mutable FrameHandler& handler;
-    const uint16_t channel;
     const uint16_t maxFrameSize;
     uint expectedFrameCount;
     uint frameCount;
@@ -45,7 +44,7 @@
     void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const;
     void setFlags(AMQFrame& f, bool first, bool last) const;
 public:
-    SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount);
+    SendContent(FrameHandler& _handler, uint16_t _maxFrameSize, uint frameCount);
     void operator()(const AMQFrame& f);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h?rev=577027&r1=577026&r2=577027&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/frame_functors.h Tue Sep 18 12:43:29 2007
@@ -82,22 +82,6 @@
     void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); }
 };
 
-class Relay
-{
-    FrameHandler& handler;
-    const uint16_t channel;
-
-public:
-    Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {}
-
-    void operator()(AMQFrame& f)
-    {
-        AMQFrame copy(f);
-        copy.setChannel(channel);
-        handler.handle(copy);
-    }
-};
-
 class Print
 {
     std::ostream& out;



Mime
View raw message