qpid-commits mailing list archives

From acon...@apache.org
Subject svn commit: r588761 [1/3] - in /incubator/qpid/trunk/qpid/cpp: rubygen/templates/ src/ src/qpid/ src/qpid/broker/ src/qpid/client/ src/qpid/framing/ src/qpid/log/ src/qpid/sys/ src/qpid/sys/apr/ src/qpid/sys/posix/ src/tests/
Date Fri, 26 Oct 2007 19:48:37 GMT
Author: aconway
Date: Fri Oct 26 12:48:31 2007
New Revision: 588761

URL: http://svn.apache.org/viewvc?rev=588761&view=rev
Log:
Session resume support in client & broker: Client can resume a session
after voluntary suspend() or network failure. Frames lost in network
failure are automatically re-transmitted for transparent re-connection.
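
The re-transmission described above can be pictured as a replay buffer on the sending side: frames are kept until the peer acknowledges them, and whatever is still unacknowledged is sent again after a resume. This is a minimal sketch under stated assumptions, not the code in this commit. `ReplayBuffer` and all of its member names are hypothetical, and sequence-number wraparound is ignored for brevity.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Hypothetical sender-side buffer; not a class from the Qpid source tree.
class ReplayBuffer {
  public:
    // Record an outgoing frame and return the sequence number it was given.
    std::uint32_t sent(const std::string& frame) {
        unacked.push_back(frame);
        return firstUnacked + static_cast<std::uint32_t>(unacked.size()) - 1;
    }

    // The peer confirmed every frame up to and including seq: forget them.
    void acked(std::uint32_t seq) {
        while (!unacked.empty() && firstUnacked <= seq) {
            unacked.pop_front();
            ++firstUnacked;
        }
    }

    // Frames to send again, in order, once the session is resumed.
    std::vector<std::string> replay() const {
        return std::vector<std::string>(unacked.begin(), unacked.end());
    }

  private:
    std::uint32_t firstUnacked = 0;   // sequence number of unacked.front()
    std::deque<std::string> unacked;  // frames sent but not yet confirmed
};

// Usage: three frames sent, the first one acknowledged, then a reconnect.
inline void replayExample() {
    ReplayBuffer buffer;
    buffer.sent("frame-0");
    buffer.sent("frame-1");
    buffer.sent("frame-2");
    buffer.acked(0);
    assert(buffer.replay().size() == 2);  // frame-1 and frame-2 remain
}
```

Keeping only unacknowledged frames bounds the memory held per session, at the cost of requiring the peer to acknowledge regularly.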

client::Session improvements:
 - Locking to avoid races between network & user threads.
 - Replaced client::StateManager with sys::StateMonitor - avoid heap allocation.

qpid::Exception clean up:
 - use QPID_MSG consistently to format exception messages.
 - throw typed exceptions (in reply_exceptions.h) for AMQP exceptions.
 - re-throw correct typed exception on client for exceptions from broker.
 - Removed QpidError.h
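
The QPID_MSG pattern used for these messages can be sketched as follows. The Msg.h change in this commit qualifies the macro body as `::qpid::Msg()`, which lets the macro expand correctly in code that is outside namespace qpid. The stand-in below is an illustration only: `sketch::Msg`, `SKETCH_MSG` and `exchangeTypeError` are hypothetical names, and the real `qpid::Msg` may differ in detail.

```cpp
#include <cassert>
#include <sstream>
#include <string>

namespace sketch {
// Hypothetical stand-in for qpid::Msg: collects operator<< output.
struct Msg {
    std::ostringstream os;
    template <class T> Msg& operator<<(const T& t) { os << t; return *this; }
    operator std::string() const { return os.str(); }
};
} // namespace sketch

// Fully qualified, so it also works outside namespace sketch.
#define SKETCH_MSG(message) \
    ::sketch::Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"

// Deliberately declared at global scope to exercise the qualification.
inline std::string exchangeTypeError(const std::string& type) {
    return SKETCH_MSG("Exchange type not implemented: " << type);
}

// Usage: the message text is followed by a "(file:line)" suffix.
inline void msgExample() {
    const std::string text = exchangeTypeError("fanout");
    assert(text.find("Exchange type not implemented: fanout (") == 0);
}
```

Because the macro argument is spliced into a stream expression, callers can mix strings and other streamable values without building the string by hand.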

rubygen/templates/constants.rb: 
 - constants.h: Added FOO_CLASS_ID and FOO_BAR_METHOD_ID constants.
 - reply_exceptions.h: Added throwReplyException(code, text)
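
For orientation, the C++ that the constants.rb template emits for throwReplyException has roughly the shape below. This is a hand-written sketch, not generated output: `ReplyException` is a hypothetical base standing in for ChannelException and ConnectionException, only two reply codes are shown (503 and 530, as they appear in the broker changes in this commit), and the value 542 for InvalidArgumentException is an assumption.

```cpp
#include <cassert>
#include <sstream>
#include <stdexcept>
#include <string>

// Hypothetical base; the generated classes derive from qpid::Exception.
struct ReplyException : std::runtime_error {
    const int code;
    ReplyException(int code_, const std::string& text)
        : std::runtime_error(text), code(code_) {}
};

struct CommandInvalidException : ReplyException {
    explicit CommandInvalidException(const std::string& msg = std::string())
        : ReplyException(503, "command-invalid: " + msg) {}
};

struct NotAllowedException : ReplyException {
    explicit NotAllowedException(const std::string& msg = std::string())
        : ReplyException(530, "not-allowed: " + msg) {}
};

struct InvalidArgumentException : ReplyException {
    explicit InvalidArgumentException(const std::string& msg = std::string())
        : ReplyException(542 /* assumed value */, "invalid-argument: " + msg) {}
};

// Map a numeric reply code to the matching typed exception.
inline void throwReplyException(int code, const std::string& text) {
    switch (code) {
      case 200: break; // No exception
      case 503: throw CommandInvalidException(text);
      case 530: throw NotAllowedException(text);
      default: {
        std::ostringstream msg;
        msg << "Invalid reply code " << code << ": " << text;
        throw InvalidArgumentException(msg.str());
      }
    }
}

// Usage: a client can catch the specific type instead of testing a code.
inline void replyExample() {
    bool caught = false;
    try {
        throwReplyException(530, "Consumer tags must be unique");
    } catch (const NotAllowedException& e) {
        caught = (e.code == 530);
    }
    assert(caught);
}
```

Generating the switch from the same constant list as the exception classes keeps the two in step whenever the protocol specification changes.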
 
log::Logger:
 - Fixed shutdown race in Statement::~Initializer()

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SessionState.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TemplateVisitor.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/StateMonitor.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Waitable.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/SessionState.cpp   (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/QpidError.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolVersionException.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ProtocolVersionException.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ResumeHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ResumeHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ResumeHandler.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/Msg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionCore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/StateManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/BodyHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldValue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FramingContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Uuid.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_framing.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/amqp_types.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/variant.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Logger.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConcurrentQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ScopedIncrement.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRBase.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/APRBase.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Shlib.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Socket.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/check.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ConcurrentQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/EventChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/FramingTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/HeadersExchangeTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/InProcessBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Shlib.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
    incubator/qpid/trunk/qpid/cpp/src/tests/client_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/echo_service.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/logging.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/perftest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/run-unit-tests
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_listener.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/topic_publisher.cpp

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/templates/constants.rb Fri Oct 26 12:48:31 2007
@@ -10,31 +10,71 @@
     @dir="qpid/framing"
   end
 
-  def generate()
+  def constants_h()
     h_file("#{@dir}/constants") {
       namespace(@namespace) { 
-        scope("enum AmqpConstant {","};") { 
-          genl @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }.join(",\n")
-        }
-      }
-    }
-    
+        scope("enum AmqpConstant {","};") {
+          l=[]
+          l.concat @amqp.constants.map { |c| "#{c.name.shout}=#{c.value}" }
+          @amqp.classes.each { |c|
+            l << "#{c.name.shout}_CLASS_ID=#{c.index}"
+            l.concat c.methods_.map { |m|
+              "#{c.name.shout}_#{m.name.shout}_METHOD_ID=#{m.index}" }
+          }
+          genl l.join(",\n")
+        }}}
+  end
+
+  def exbase(c)
+    case c.class_
+    when "soft-error" then "ChannelException"
+    when "hard-error" then "ConnectionException"
+    end
+  end
+
+  def reply_exceptions_h()
     h_file("#{@dir}/reply_exceptions") {
       include "qpid/Exception"
       namespace(@namespace) {
         @amqp.constants.each { |c|
-          if c.class_
-            exname=c.name.caps+"Exception"
-            base = c.class_=="soft-error" ? "ChannelException" : "ConnectionException"
-            text=(c.doc or c.name).tr_s!(" \t\n"," ")
-            struct(exname, base) {
-              genl "#{exname}(const std::string& msg=\"#{text})\") : #{base}(#{c.value}, msg) {}"
+          base = exbase c
+          if base
+            genl
+            struct(c.name.caps+"Exception", base) {
+              genl "#{c.name.caps}Exception(const std::string& msg=std::string()) : #{base}(#{c.value}, \"#{c.name}: \"+msg) {}"
             }
           end
         }
+        genl
+        genl "void throwReplyException(int code, const std::string& text);"
       }
     }
-    
+  end
+
+  def reply_exceptions_cpp()
+    cpp_file("#{@dir}/reply_exceptions") {
+      include "#{@dir}/reply_exceptions"
+      include "<sstream>"
+      namespace("qpid::framing") {
+        scope("void throwReplyException(int code, const std::string& text) {"){
+          scope("switch (code) {") {
+            genl "case 200: break; // No exception"
+            @amqp.constants.each {  |c|
+              if exbase c 
+                genl "case #{c.value}: throw #{c.name.caps}Exception(text);"
+              end
+            }
+            scope("default:","") {
+              genl "std::ostringstream msg;"
+              genl 'msg << "Invalid reply code " << code << ": " << text;'
+              genl 'throw InvalidArgumentException(msg.str());'
+            }}}}}
+  end
+
+  def generate()
+    constants_h
+    reply_exceptions_h
+    reply_exceptions_cpp
   end
 end
 

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Oct 26 12:48:31 2007
@@ -52,7 +52,6 @@
 
 posix_plat_src = \
   qpid/sys/epoll/EpollPoller.cpp \
-  qpid/sys/posix/check.cpp \
   qpid/sys/posix/Socket.cpp \
   qpid/sys/posix/AsynchIO.cpp \
   qpid/sys/posix/Time.cpp \
@@ -110,8 +109,7 @@
   qpid/framing/InitiationHandler.cpp \
   qpid/framing/ProtocolInitiation.cpp \
   qpid/framing/ProtocolVersion.cpp \
-  qpid/framing/ProtocolVersionException.cpp \
-  qpid/framing/ResumeHandler.cpp qpid/framing/ResumeHandler.h \
+  qpid/framing/SessionState.cpp qpid/framing/SessionState.h \
   qpid/framing/SendContent.cpp \
   qpid/framing/SequenceNumber.cpp \
   qpid/framing/SequenceNumberSet.cpp \
@@ -126,7 +124,6 @@
   qpid/Exception.cpp \
   qpid/Plugin.cpp \
   qpid/Url.cpp \
-  qpid/QpidError.cpp \
   qpid/sys/AsynchIOAcceptor.cpp \
   qpid/sys/Dispatcher.cpp \
   qpid/sys/Runnable.cpp \
@@ -210,7 +207,6 @@
   qpid/client/MessageListener.cpp		\
   qpid/client/Correlator.cpp			\
   qpid/client/CompletionTracker.cpp		\
-  qpid/client/SessionHandler.cpp		\
   qpid/client/ConnectionHandler.cpp		\
   qpid/client/ExecutionHandler.cpp		\
   qpid/client/FutureCompletion.cpp		\
@@ -221,12 +217,12 @@
 
 nobase_include_HEADERS = \
   $(platform_hdr) \
+  qpid/assert.h \
   qpid/Exception.h \
   qpid/ExceptionHolder.h \
   qpid/Msg.h \
   qpid/Options.h \
   qpid/Plugin.h \
-  qpid/QpidError.h \
   qpid/SharedObject.h \
   qpid/Url.h \
   qpid/memory.h \
@@ -323,7 +319,6 @@
   qpid/client/MessageQueue.h \
   qpid/client/Response.h \
   qpid/client/SessionCore.h \
-  qpid/client/SessionHandler.h \
   qpid/client/StateManager.h \
   qpid/client/TypedResult.h \
   qpid/framing/AMQBody.h \
@@ -339,6 +334,7 @@
   qpid/framing/Blob.h \
   qpid/framing/BodyHandler.h \
   qpid/framing/Buffer.h \
+  qpid/framing/ChannelHandler.h \
   qpid/framing/ChannelAdapter.h \
   qpid/framing/FieldTable.h \
   qpid/framing/FieldValue.h \
@@ -360,7 +356,6 @@
   qpid/framing/OutputHandler.h \
   qpid/framing/ProtocolInitiation.h \
   qpid/framing/ProtocolVersion.h \
-  qpid/framing/ProtocolVersionException.h \
   qpid/framing/Proxy.h \
   qpid/framing/SendContent.h \
   qpid/framing/SequenceNumber.h \
@@ -399,6 +394,8 @@
   qpid/sys/Shlib.h \
   qpid/sys/ShutdownHandler.h \
   qpid/sys/Socket.h \
+  qpid/sys/StateMonitor.h \
+  qpid/sys/Waitable.h \
   qpid/sys/Thread.h \
   qpid/sys/Time.h \
   qpid/sys/TimeoutHandler.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Fri Oct 26 12:48:31 2007
@@ -23,6 +23,7 @@
 #include "Exception.h"
 #include <typeinfo>
 #include <errno.h>
+#include <assert.h>
 
 namespace qpid {
 
@@ -31,45 +32,22 @@
     return std::string(strerror_r(err, buf, sizeof(buf)));
 }
 
-static void ctorLog(const std::exception* e) {
-    QPID_LOG(trace, "Exception: " << e->what());
+Exception::Exception(const std::string& s) throw() : msg(s) {
+    QPID_LOG(warning, "Exception: " << msg);
 }
-    
-Exception::Exception() throw() { ctorLog(this); }
-
-Exception::Exception(const std::string& str) throw()
-    : whatStr(str) { ctorLog(this); }
-
-Exception::Exception(const char* str) throw() : whatStr(str) { ctorLog(this); }
-
-Exception::Exception(const std::exception& e) throw() : whatStr(e.what()) {}
 
 Exception::~Exception() throw() {}
 
-const char* Exception::what() const throw() { return whatStr.c_str(); }
-
-std::string Exception::toString() const throw() { return whatStr; }
-
-Exception::auto_ptr Exception::clone() const throw() { return Exception::auto_ptr(new Exception(*this)); }
-
-void Exception::throwSelf() const  { throw *this; }
-
-ShutdownException::ShutdownException() : Exception("Shut down.") {}
-
-EmptyException::EmptyException() : Exception("Empty.") {}
-
-const char* Exception::defaultMessage = "Unexpected exception";
-
-void Exception::log(const char* what, const char* message) {
-    QPID_LOG(error, message << ": " << what);
+std::string Exception::str() const throw() {
+    if (msg.empty())
+        const_cast<std::string&>(msg).assign(typeid(*this).name());
+    return msg;
 }
 
-void Exception::log(const std::exception& e, const char* message) {
-    log(e.what(), message);
-}
+const char* Exception::what() const throw() { return str().c_str(); }
 
-void Exception::logUnknown(const char* message) {
-    log("unknown exception.", message);
+std::auto_ptr<Exception> Exception::clone() const throw() {
+    return std::auto_ptr<Exception>(new Exception(*this));
 }
 
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Fri Oct 26 12:48:31 2007
@@ -24,135 +24,52 @@
 
 #include "qpid/framing/amqp_types.h"
 #include "qpid/Msg.h"
-#include <exception>
-#include <string>
+
 #include <memory>
-#include <boost/shared_ptr.hpp>
-#include <boost/lexical_cast.hpp>
-#include <boost/function.hpp>
+#include <string>
 
 namespace qpid
 {
 
-/** Get the error message for error number err. */
+/** Get the error message for a system number err, e.g. errno. */
 std::string strError(int err);
 
 /**
- * Exception base class for all Qpid exceptions.
+ * Base class for Qpid runtime exceptions.
  */
 class Exception : public std::exception
 {
-  protected:
-    std::string whatStr;
-
   public:
-    typedef boost::shared_ptr<Exception> shared_ptr;
-    typedef boost::shared_ptr<const Exception> shared_ptr_const;
-    typedef std::auto_ptr<Exception> auto_ptr;
-
-    Exception() throw();
-    Exception(const std::string& str) throw();
-    Exception(const char* str) throw();
-    Exception(const std::exception&) throw();
-
-    /** Allow any type that has ostream operator<< to act as message */
-    template <class T>
-    Exception(const T& message)
-        : whatStr(boost::lexical_cast<std::string>(message)) {}
-
+    explicit Exception(const std::string& str=std::string()) throw();
     virtual ~Exception() throw();
-
-    virtual const char* what() const throw();
-    virtual std::string toString() const throw();
-
-    virtual auto_ptr clone() const throw();
-    virtual void throwSelf() const;
-
-    /** Default message: "Unknown exception" or something like it. */
-    static const char* defaultMessage;
-
-    /**
-     * Log a message of the form "message: what"
-     *@param what Exception's what() message.
-     *@param message Prefix message.
-     */
-    static void log(const char* what, const char* message = defaultMessage);
-
-    /**
-     * Log an exception.
-     *@param e Exception to log.
-
-     */
-    static void log(
-        const std::exception& e, const char* message = defaultMessage);
     
-
-    /**
-     * Log an unknown exception - use in catch(...)
-     *@param message Prefix message.
-     */
-    static void logUnknown(const char* message = defaultMessage);
-
-    /**
-     * Wrapper template function to call another function inside
-     * try/catch and log any exception. Use boost::bind to wrap
-     * member function calls or functions with arguments.
-     * 
-     *@param f Function to call in try block.
-     *@param retrhow If true the exception is rethrown.
-     *@param message Prefix message.
-     */
-    template <class T>
-    static T tryCatchLog(boost::function0<T> f, bool rethrow=true,
-                         const char* message=defaultMessage)
-    {
-        try {
-            return f();
-        }
-        catch (const std::exception& e) {
-            log(e, message);
-            if (rethrow)
-                throw;
-        }
-        catch (...) {
-            logUnknown(message);
-            if (rethrow)
-                throw;
-        }
-    }
+    virtual const char *what() const throw();
+    virtual std::auto_ptr<Exception> clone() const throw();
+    virtual std::string str() const throw();
+  private:
+    std::string msg;
 };
 
 struct ChannelException : public Exception {
-    framing::ReplyCode code;
-    template <class T>
-    ChannelException(framing::ReplyCode code_, const T& message)
+    const framing::ReplyCode code;
+    ChannelException(framing::ReplyCode code_, const std::string& message)
         : Exception(message), code(code_) {}
-    void throwSelf() const { throw *this; }
 };
 
 struct ConnectionException : public Exception {
-    framing::ReplyCode code;
-    template <class T>
-    ConnectionException(framing::ReplyCode code_, const T& message)
+    const framing::ReplyCode code;
+    ConnectionException(framing::ReplyCode code_, const std::string& message)
         : Exception(message), code(code_) {}
-    void throwSelf() const { throw *this; }
 };
 
-/**
- * Exception used to indicate that a thread should shut down.
- * Does not indicate an error that should be signalled to the user.
+/** Clone an exception.
+ * For qpid::Exception this calls the clone member function.
+ * For standard exceptions, uses the copy constructor.
+ * For unknown exception types creates a std::exception
+ * with the same what() string.
  */
-struct ShutdownException : public Exception {
-    ShutdownException();
-    void throwSelf() const { throw *this; }
-};
-
-/** Exception to indicate empty queue or other empty state */
-struct EmptyException : public Exception {
-    EmptyException();
-    void throwSelf() const { throw *this; }
-};
+std::auto_ptr<std::exception> clone(const std::exception&);
 
-}
+} // namespace qpid
 
 #endif  /*!_Exception_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Msg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Msg.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Msg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Msg.h Fri Oct 26 12:48:31 2007
@@ -54,7 +54,7 @@
 }
 
 /** Construct a message using operator << and append (file:line) */
-#define QPID_MSG(message) Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"
+#define QPID_MSG(message) ::qpid::Msg() << message << " (" << __FILE__ << ":" << __LINE__ << ")"
 
 } // namespace qpid
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Oct 26 12:48:31 2007
@@ -64,7 +64,8 @@
     storeDir("/var"),
     storeAsync(false),
     enableMgmt(0),
-    mgmtPubInterval(10)
+    mgmtPubInterval(10),
+    ack(100)	    
 {
     addOptions()
         ("port,p", optValue(port,"PORT"),
@@ -102,7 +103,8 @@
     queues(store.get()),
     stagingThreshold(0),
     factory(*this),
-    dtxManager(store.get())
+    dtxManager(store.get()),
+    sessionManager(conf.ack)
 {
     if(conf.enableMgmt){
         managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Oct 26 12:48:31 2007
@@ -69,6 +69,7 @@
 	bool storeAsync;
 	bool enableMgmt;
 	uint16_t mgmtPubInterval;
+        uint32_t ack;
     };
     
     virtual ~Broker();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Fri Oct 26 12:48:31 2007
@@ -21,6 +21,7 @@
 #include "MessageDelivery.h"
 #include "qpid/framing/AMQMethodBody.h"
 #include "qpid/Exception.h"
+#include "qpid/framing/reply_exceptions.h"
 
 namespace qpid {
 namespace broker {
@@ -75,8 +76,7 @@
                 checkAlternate(response.first, alternate);
             }
         }catch(UnknownExchangeTypeException& e){
-            throw ConnectionException(
-                503, "Exchange type not implemented: " + type);
+            throw CommandInvalidException(QPID_MSG("Exchange type not implemented: " << type));
         }
     }
 }
@@ -84,24 +84,23 @@
 void BrokerAdapter::ExchangeHandlerImpl::checkType(Exchange::shared_ptr exchange, const std::string& type)
 {
     if (!type.empty() && exchange->getType() != type) {
-        throw ConnectionException(530, "Exchange declared to be of type " + exchange->getType() + ", requested " + type);
+        throw NotAllowedException(QPID_MSG("Exchange declared to be of type " << exchange->getType() << ", requested " << type));
     }
 }
 
 void BrokerAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr exchange, Exchange::shared_ptr alternate)
 {
-    if (alternate && alternate != exchange->getAlternate()) {
-        throw ConnectionException(530, "Exchange declared with alternate-exchange "
-                                  + exchange->getAlternate()->getName() + ", requested " 
-                                  + alternate->getName());
-    }
-
+    if (alternate && alternate != exchange->getAlternate()) 
+        throw NotAllowedException(
+            QPID_MSG("Exchange declared with alternate-exchange "
+                     << exchange->getAlternate()->getName() << ", requested " 
+                     << alternate->getName()));
 }
                 
 void BrokerAdapter::ExchangeHandlerImpl::delete_(uint16_t /*ticket*/, const string& name, bool /*ifUnused*/){
     //TODO: implement unused
     Exchange::shared_ptr exchange(getBroker().getExchanges().get(name));
-    if (exchange->inUseAsAlternate()) throw ConnectionException(530, "Exchange in use as alternate-exchange.");
+    if (exchange->inUseAsAlternate()) throw NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange."));
     if (exchange->isDurable()) getBroker().getStore().destroy(*exchange);
     if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers();
     getBroker().getExchanges().destroy(name);
@@ -292,7 +291,7 @@
     
     Queue::shared_ptr queue = state.getQueue(queueName);    
     if(!consumerTag.empty() && state.exists(consumerTag)){
-        throw ConnectionException(530, "Consumer tags must be unique");
+        throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
     }
     string newTag = consumerTag;
     //need to generate name here, so we have it for the adapter (it is

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Fri Oct 26 12:48:31 2007
@@ -82,7 +82,7 @@
         throw framing::NotImplementedException("Tunnel class not implemented"); }
 
     // Handlers no longer implemented in BrokerAdapter:
-#define BADHANDLER() assert(0); throw framing::InternalErrorException()
+#define BADHANDLER() assert(0); throw framing::NotImplementedException("")
     ExecutionHandler* getExecutionHandler() { BADHANDLER(); }
     ConnectionHandler* getConnectionHandler() { BADHANDLER(); }
     SessionHandler* getSessionHandler() { BADHANDLER(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Oct 26 12:48:31 2007
@@ -28,9 +28,10 @@
 #include "BrokerAdapter.h"
 #include "SemanticHandler.h"
 
-#include <boost/utility/in_place_factory.hpp>
 #include <boost/bind.hpp>
 
+#include <algorithm>
+
 using namespace boost;
 using namespace qpid::sys;
 using namespace qpid::framing;
@@ -61,6 +62,7 @@
     ReplyCode code, const string& text, ClassId classId, MethodId methodId)
 {
     adapter.close(code, text, classId, methodId);
+    channels.clear();
     getOutput().close();
 }
 
@@ -73,8 +75,11 @@
 
 void Connection::idleIn(){}
 
-void Connection::closed(){
+void Connection::closed(){ // Physically closed, suspend open sessions.
     try {
+        std::for_each(
+            channels.begin(), channels.end(),
+            boost::bind(&SessionHandler::localSuspend, _1));
         while (!exclusiveQueues.empty()) {
             Queue::shared_ptr q(exclusiveQueues.front());
             q->releaseExclusiveOwnership();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Fri Oct 26 12:48:31 2007
@@ -46,9 +46,9 @@
     AMQMethodBody* method=frame.getBody()->getMethod();
     try{
         if (!invoke(*handler.get(), *method))
-            throw ConnectionException(503, "Class can't be accessed over channel 0");
+            throw ChannelErrorException(QPID_MSG("Class can't be accessed over channel 0"));
     }catch(ConnectionException& e){
-        handler->client.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
+        handler->client.close(e.code, e.what(), method->amqpClassId(), method->amqpMethodId());
     }catch(std::exception& e){
         handler->client.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Daemon.cpp Fri Oct 26 12:48:31 2007
@@ -17,7 +17,7 @@
  */
 #include "Daemon.h"
 #include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
 
 #include <boost/iostreams/stream.hpp>
 #include <boost/iostreams/device/file_descriptor.hpp>

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Oct 26 12:48:31 2007
@@ -44,7 +44,7 @@
         if (fail) {
             state.endDtx(xid, true);
             if (suspend) {
-                throw ConnectionException(503, "End and suspend cannot both be set.");
+                throw CommandInvalidException(QPID_MSG("End and suspend cannot both be set."));
             } else {
                 return DtxDemarcationEndResult(XA_RBROLLBACK);
             }
@@ -67,7 +67,7 @@
                            bool resume)
 {
     if (join && resume) {
-        throw ConnectionException(503, "Join and resume cannot both be set.");
+        throw CommandInvalidException(QPID_MSG("Join and resume cannot both be set."));
     }
     try {
         if (resume) {
@@ -161,7 +161,7 @@
                             const string& xid)
 {
     //Currently no heuristic completion is supported, so this should never be used.
-    throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
+    throw CommandInvalidException(QPID_MSG("Forget is invalid. Branch with xid "  << xid << " not heuristically completed!"));
 }
 
 DtxCoordinationGetTimeoutResult DtxHandlerImpl::getTimeout(const string& xid)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Oct 26 12:48:31 2007
@@ -20,16 +20,20 @@
  */
 #include "DtxManager.h"
 #include "DtxTimeout.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <iostream>
 using qpid::sys::Mutex;
 
 using namespace qpid::broker;
+using namespace qpid::framing;
 
 DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
 
-DtxManager::~DtxManager() {}
+DtxManager::~DtxManager() {
+    // timer.stop(); // FIXME aconway 2007-10-23: leaking threads.
+}
 
 void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
 {
@@ -84,7 +88,7 @@
     Mutex::ScopedLock locker(lock); 
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
-        throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+        throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
     }
     return i;
 }
@@ -94,7 +98,7 @@
     Mutex::ScopedLock locker(lock); 
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
-        throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+        throw InvalidArgumentException(QPID_MSG("Unrecognised xid " << xid));
     } else {
         work.erase(i);
     }
@@ -105,7 +109,7 @@
     Mutex::ScopedLock locker(lock); 
     WorkMap::iterator i = work.find(xid);
     if (i != work.end()) {
-        throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
+        throw CommandInvalidException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
     } else {
         return work.insert(xid, new DtxWorkRecord(xid, store)).first;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h Fri Oct 26 12:48:31 2007
@@ -29,12 +29,7 @@
 
 class DtxManager;
 
-
-struct DtxTimeoutException : public Exception 
-{
-    DtxTimeoutException() {}
-};
-
+struct DtxTimeoutException : public Exception {};
 
 struct DtxTimeout : public TimerTask
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Oct 26 12:48:31 2007
@@ -19,12 +19,14 @@
  *
  */
 #include "DtxWorkRecord.h"
+#include "qpid/framing/reply_exceptions.h"
 #include <boost/format.hpp>
 #include <boost/mem_fn.hpp>
 using boost::mem_fn;
 using qpid::sys::Mutex;
 
 using namespace qpid::broker;
+using namespace qpid::framing;
 
 DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : 
     xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
@@ -71,8 +73,7 @@
         if (prepared) {
             //already prepared i.e. 2pc
             if (onePhase) {
-                throw ConnectionException(503, 
-                    boost::format("Branch with xid %1% has been prepared, one-phase option not valid!") % xid);        
+                throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
             }
 
             store->commit(*txn);
@@ -83,8 +84,7 @@
         } else {
             //1pc commit optimisation, don't need a 2pc transaction context:
             if (!onePhase) {
-                throw ConnectionException(503, 
-                    boost::format("Branch with xid %1% has not been prepared, one-phase option required!") % xid);        
+                throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));        
             }
             std::auto_ptr<TransactionContext> localtxn = store->begin();
             if (prepare(localtxn.get())) {
@@ -119,7 +119,7 @@
         throw DtxTimeoutException();
     }
     if (completed) {
-        throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
+        throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
     }
     work.push_back(ops);
 }
@@ -133,7 +133,7 @@
         //iterate through all DtxBuffers and ensure they are all ended
         for (Work::iterator i = work.begin(); i != work.end(); i++) {
             if (!(*i)->isEnded()) {
-                throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+                throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " not completed!"));
             } else if ((*i)->isRollbackOnly()) {
                 rolledback = true;
             }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Fri Oct 26 12:48:31 2007
@@ -24,6 +24,7 @@
 #include "HeadersExchange.h"
 #include "TopicExchange.h"
 #include "ManagementExchange.h"
+#include "qpid/framing/reply_exceptions.h"
 
 using namespace qpid::broker;
 using namespace qpid::sys;
@@ -75,9 +76,8 @@
 Exchange::shared_ptr ExchangeRegistry::get(const string& name){
     RWlock::ScopedRlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
-    if (i == exchanges.end()) {
-        throw ChannelException(404, "Exchange not found: " + name);
-    }
+    if (i == exchanges.end())
+        throw framing::NotFoundException(QPID_MSG("Exchange not found: " << name));
     return i->second;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Fri Oct 26 12:48:31 2007
@@ -20,7 +20,7 @@
  */
 #include "HeadersExchange.h"
 #include "qpid/framing/FieldValue.h"
-#include "qpid/QpidError.h"
+#include "qpid/framing/reply_exceptions.h"
 #include <algorithm>
 
 
@@ -46,9 +46,8 @@
 bool HeadersExchange::bind(Queue::shared_ptr queue, const string& /*routingKey*/, const FieldTable* args){
     RWlock::ScopedWlock locker(lock);
     FieldTable::ValuePtr what = args->get(x_match);
-    if (!what || (*what != all && *what != any)) {
-        THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
-    }
+    if (!what || (*what != all && *what != any)) 
+        throw InternalErrorException(QPID_MSG("Invalid x-match value binding to headers exchange."));
     Binding binding(*args, queue);
     Bindings::iterator i =
         std::find(bindings.begin(),bindings.end(), binding);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Fri Oct 26 12:48:31 2007
@@ -16,7 +16,7 @@
  *
  */
 
-#include "qpid/QpidError.h"
+#include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 #include "MessageHandlerImpl.h"
 #include "qpid/framing/FramingContent.h"
@@ -56,39 +56,39 @@
 void
 MessageHandlerImpl::open(const string& /*reference*/)
 {
-    throw ConnectionException(540, "References no longer supported");
+    throw NotImplementedException("References no longer supported");
 }
 
 void
 MessageHandlerImpl::append(const std::string& /*reference*/, const std::string& /*bytes*/)
 {
-    throw ConnectionException(540, "References no longer supported");
+    throw NotImplementedException("References no longer supported");
 }
 
 void
 MessageHandlerImpl::close(const string& /*reference*/)
 {
-    throw ConnectionException(540, "References no longer supported");
+    throw NotImplementedException("References no longer supported");
 }
 
 void
 MessageHandlerImpl::checkpoint(const string& /*reference*/,
                                const string& /*identifier*/ )
 {
-    throw ConnectionException(540, "References no longer supported");
+    throw NotImplementedException("References no longer supported");
 }
 
 void
 MessageHandlerImpl::resume(const string& /*reference*/,
                            const string& /*identifier*/ )
 {
-    throw ConnectionException(540, "References no longer supported");
+    throw NotImplementedException("References no longer supported");
 }
 
 void
 MessageHandlerImpl::offset(uint64_t /*value*/ )
 {
-    throw ConnectionException(540, "References no longer supported");
+    throw NotImplementedException("References no longer supported");
 }
 
 void
@@ -97,19 +97,19 @@
                         const string& /*destination*/,
                         bool /*noAck*/ )
 {
-    throw ConnectionException(540, "get no longer supported");
+    throw NotImplementedException("get no longer supported");
 }
 
 void
 MessageHandlerImpl::empty()
 {
-    throw ConnectionException(540, "empty no longer supported");
+    throw NotImplementedException("empty no longer supported");
 }
 
 void
 MessageHandlerImpl::ok()
 {
-    throw ConnectionException(540, "Message.Ok no longer supported");    
+    throw NotImplementedException("Message.Ok no longer supported");    
 }
 
 void
@@ -134,7 +134,7 @@
 {
     Queue::shared_ptr queue = state.getQueue(queueName);
     if(!destination.empty() && state.exists(destination))
-        throw ConnectionException(530, "Consumer tags must be unique");
+        throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
 
     string tag = destination;
     state.consume(MessageDelivery::getMessageDeliveryToken(destination, confirmMode, acquireMode), 
@@ -165,7 +165,7 @@
         state.addByteCredit(destination, value);
     } else {
         //unknown
-        throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit);
+        throw SyntaxErrorException(QPID_MSG("Invalid value for unit " << unit));
     }
     
 }
@@ -179,7 +179,7 @@
         //window
         state.setWindowMode(destination);
     } else{
-        throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode);        
+        throw SyntaxErrorException(QPID_MSG("Invalid value for mode " << mode));        
     }
 }
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 26 12:48:31 2007
@@ -19,9 +19,8 @@
  *
  */
 
-#include <boost/format.hpp>
-
 #include "qpid/log/Statement.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "Broker.h"
 #include "Queue.h"
 #include "Exchange.h"
@@ -37,7 +36,6 @@
 using namespace qpid::broker;
 using namespace qpid::sys;
 using namespace qpid::framing;
-using boost::format;
 
 Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
@@ -269,17 +267,15 @@
 void Queue::consume(Consumer::ptr c, bool requestExclusive){
     RWlock::ScopedWlock locker(consumerLock);
     if(exclusive) {
-        throw ChannelException(
-            403, format("Queue '%s' has an exclusive consumer."
-                        " No more consumers allowed.") % getName());
+        throw AccessRefusedException(
+            QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
     }
     if(requestExclusive) {
         if(acquirers.empty() && browsers.empty()) {
             exclusive = c;
         } else {
-            throw ChannelException(
-                403, format("Queue '%s' already has consumers."
-                            "Exclusive access denied.") % getName());
+            throw AccessRefusedException(
+                QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
         }
     }
     if (c->preAcquires()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Fri Oct 26 12:48:31 2007
@@ -125,7 +125,7 @@
     incoming.complete(id);                                    
     
     if (!invoker.wasHandled()) {
-        throw ConnectionException(540, "Not implemented");
+        throw NotImplementedException("Not implemented");
     } else if (invoker.hasResult()) {
         session.getProxy().getExecution().result(id.getValue(), invoker.getResult());
     }
@@ -139,7 +139,7 @@
 void SemanticHandler::handleL3(framing::AMQMethodBody* method)
 {
     if (!invoke(*this, *method))
-        throw ConnectionException(540, "Not implemented");
+        throw NotImplementedException("Not implemented");
 }
 
 void SemanticHandler::handleContent(AMQFrame& frame)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 26 12:48:31 2007
@@ -31,7 +31,6 @@
 #include "SessionHandler.h"
 #include "TxAck.h"
 #include "TxPublish.h"
-#include "qpid/QpidError.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
 
@@ -116,7 +115,8 @@
 
 void SemanticState::commit(MessageStore* const store)
 {
-    if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+    if (!txBuffer) throw
+        CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
 
     TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
     txBuffer->enlist(txAck);
@@ -127,7 +127,8 @@
 
 void SemanticState::rollback()
 {
-    if (!txBuffer) throw ConnectionException(503, "Session has not been selected for use with transactions");
+    if (!txBuffer)
+        throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
 
     txBuffer->rollback();
     accumulatedAck.clear();
@@ -141,7 +142,7 @@
 void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join)
 {
     if (!dtxSelected) {
-        throw ConnectionException(503, "Session has not been selected for use with dtx");
+        throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
     }
     dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
     txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
@@ -155,11 +156,12 @@
 void SemanticState::endDtx(const std::string& xid, bool fail)
 {
     if (!dtxBuffer) {
-        throw ConnectionException(503, boost::format("xid %1% not associated with this session") % xid);
+        throw CommandInvalidException(QPID_MSG("xid " << xid << " not associated with this session"));
     }
     if (dtxBuffer->getXid() != xid) {
-        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") 
-                                  % dtxBuffer->getXid() % xid);
+        throw CommandInvalidException(
+            QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end"));
+
     }
 
     txBuffer.reset();//ops on this session no longer transactional
@@ -176,8 +178,8 @@
 void SemanticState::suspendDtx(const std::string& xid)
 {
     if (dtxBuffer->getXid() != xid) {
-        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") 
-                                  % dtxBuffer->getXid() % xid);
+        throw CommandInvalidException(
+            QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend"));
     }
     txBuffer.reset();//ops on this session no longer transactional
 
@@ -188,11 +190,12 @@
 void SemanticState::resumeDtx(const std::string& xid)
 {
     if (dtxBuffer->getXid() != xid) {
-        throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") 
-                                  % dtxBuffer->getXid() % xid);
+        throw CommandInvalidException(
+            QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume"));
+
     }
     if (!dtxBuffer->isSuspended()) {
-        throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+        throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended"));
     }
 
     checkDtxTimeout();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Fri Oct 26 12:48:31 2007
@@ -26,6 +26,8 @@
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
 
+#include <boost/bind.hpp>
+
 namespace qpid {
 namespace broker {
 using namespace framing;
@@ -33,7 +35,9 @@
 
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
     : InOutHandler(0, &c.getOutput()),
-      connection(c), channel(ch), proxy(out),
+      connection(c), channel(ch, &c.getOutput()),
+      proxy(out),               // Via my own handleOut() for L2 data.
+      peerSession(channel),     // Direct to channel for L2 commands.
       ignoring(false) {}
 
 SessionHandler::~SessionHandler() {}
@@ -54,15 +58,19 @@
     try {
         if (m && invoke(*this, *m))
             return;
-        else if (session.get())
-            session->in(f);
+        else if (session.get()) {
+            boost::optional<SequenceNumber> ack=session->received(f);
+            session->in.handle(f);
+            if (ack)
+                peerSession.ack(*ack, SequenceNumberSet());
+        }
         else if (!ignoring)
             throw ChannelErrorException(
-                QPID_MSG("Channel " << channel << " is not open"));
+                QPID_MSG("Channel " << channel.get() << " is not open"));
     } catch(const ChannelException& e) {
         ignoring=true;          // Ignore trailing frames sent by client.
         session.reset();
-        getProxy().getSession().closed(e.code, e.toString());
+        peerSession.closed(e.code, e.what());
     }catch(const ConnectionException& e){
         connection.close(e.code, e.what(), classId(m), methodId(m));
     }catch(const std::exception& e){
@@ -72,21 +80,22 @@
 }
 
 void SessionHandler::handleOut(AMQFrame& f) {
-    f.setChannel(getChannel());
-    out.next->handle(f);
+    channel.handle(f);          // Send it.
+    if (session->sent(f))
+        peerSession.solicitAck();
 }
 
-void SessionHandler::assertOpen(const char* method) {
-     if (!session.get())
+void SessionHandler::assertAttached(const char* method) const {
+    if (!session.get())
         throw ChannelErrorException(
             QPID_MSG(method << " failed: No session for channel "
                      << getChannel()));
 }
 
-void SessionHandler::assertClosed(const char* method) {
+void SessionHandler::assertClosed(const char* method) const {
     if (session.get())
         throw ChannelBusyException(
-            QPID_MSG(method << " failed: channel " << channel
+            QPID_MSG(method << " failed: channel " << channel.get()
                      << " is already open."));
 }
 
@@ -95,32 +104,38 @@
     std::auto_ptr<SessionState> state(
         connection.broker.getSessionManager().open(*this, detachedLifetime));
     session.reset(state.release());
-    getProxy().getSession().attached(session->getId(), session->getTimeout());
+    peerSession.attached(session->getId(), session->getTimeout());
 }
 
 void  SessionHandler::resume(const Uuid& id) {
     assertClosed("resume");
-    session = connection.broker.getSessionManager().resume(*this, id);
-    getProxy().getSession().attached(session->getId(), session->getTimeout());
+    session = connection.broker.getSessionManager().resume(id);
+    session->attach(*this);
+    SequenceNumber seq = session->resuming();
+    peerSession.attached(session->getId(), session->getTimeout());
+    proxy.getSession().ack(seq, SequenceNumberSet());
 }
 
 void  SessionHandler::flow(bool /*active*/) {
+    assertAttached("flow");
     // FIXME aconway 2007-09-19: Removed in 0-10, remove 
-    assert(0); throw NotImplementedException();
+    assert(0); throw NotImplementedException("session.flow");
 }
 
 void  SessionHandler::flowOk(bool /*active*/) {
+    assertAttached("flowOk");
     // FIXME aconway 2007-09-19: Removed in 0-10, remove 
-    assert(0); throw NotImplementedException();
+    assert(0); throw NotImplementedException("session.flowOk");
 }
 
 void  SessionHandler::close() {
+    assertAttached("close");
     QPID_LOG(info, "Received session.close");
     ignoring=false;
     session.reset();
-    getProxy().getSession().closed(REPLY_SUCCESS, "ok");
-    assert(&connection.getChannel(channel) == this);
-    connection.closeChannel(channel); 
+    peerSession.closed(REPLY_SUCCESS, "ok");
+    assert(&connection.getChannel(channel.get()) == this);
+    connection.closeChannel(channel.get()); 
 }
 
 void  SessionHandler::closed(uint16_t replyCode, const string& replyText) {
@@ -129,26 +144,43 @@
     session.reset();
 }
 
-void  SessionHandler::suspend() {
-    assertOpen("suspend");
-    connection.broker.getSessionManager().suspend(session);
-    assert(!session.get());
-    getProxy().getSession().detached();
-    assert(&connection.getChannel(channel) == this);
-    connection.closeChannel(channel); 
+void SessionHandler::localSuspend() {
+    if (session.get() && session->getState() == SessionState::ATTACHED) {
+        session->detach();
+        connection.broker.getSessionManager().suspend(session);
+    }
 }
 
-void  SessionHandler::ack(uint32_t     /*cumulativeSeenMark*/,
-                          const SequenceNumberSet& /*seenFrameSet*/) {
-    assert(0); throw NotImplementedException();
+void  SessionHandler::suspend() {
+    assertAttached("suspend");
+    localSuspend();
+    peerSession.detached();
+    assert(&connection.getChannel(channel.get()) == this);
+    connection.closeChannel(channel.get()); 
+}
+
+void  SessionHandler::ack(uint32_t     cumulativeSeenMark,
+                          const SequenceNumberSet& /*seenFrameSet*/)
+{
+    assertAttached("ack");
+    if (session->getState() == SessionState::RESUMING) {
+        session->receivedAck(cumulativeSeenMark);
+        framing::SessionState::Replay replay=session->replay();
+        std::for_each(replay.begin(), replay.end(),
+                      boost::bind(&SessionHandler::handleOut, this, _1));
+    }
+    else
+        session->receivedAck(cumulativeSeenMark);
 }
 
 void  SessionHandler::highWaterMark(uint32_t /*lastSentMark*/) {
-    assert(0); throw NotImplementedException();
+    // FIXME aconway 2007-10-02: may be removed from spec.
+    assert(0); throw NotImplementedException("session.high-water-mark");
 }
 
 void  SessionHandler::solicitAck() {
-    assert(0); throw NotImplementedException();
+    assertAttached("solicit-ack");
+    peerSession.ack(session->sendingAck(), SequenceNumberSet());    
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Fri Oct 26 12:48:31 2007
@@ -26,6 +26,7 @@
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/framing/ChannelHandler.h"
 
 #include <boost/noncopyable.hpp>
 
@@ -52,7 +53,7 @@
     SessionState* getSession() { return session.get(); }
     const SessionState* getSession() const { return session.get(); }
 
-    framing::ChannelId getChannel() const { return channel; }
+    framing::ChannelId getChannel() const { return channel.get(); }
     
     Connection& getConnection() { return connection; }
     const Connection& getConnection() const { return connection; }
@@ -60,6 +61,9 @@
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
 
+    // Called by closing connection.
+    void localSuspend();
+    
   protected:
     void handleIn(framing::AMQFrame&);
     void handleOut(framing::AMQFrame&);
@@ -79,12 +83,14 @@
     void solicitAck();
 
 
-    void assertOpen(const char* method);
-    void assertClosed(const char* method);
+    void assertAttached(const char* method) const;
+    void assertActive(const char* method) const;
+    void assertClosed(const char* method) const;
 
     Connection& connection;
-    const framing::ChannelId channel;
+    framing::ChannelHandler channel;
     framing::AMQP_ClientProxy proxy;
+    framing::AMQP_ClientProxy::Session peerSession;
     bool ignoring;
     std::auto_ptr<SessionState> session;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Fri Oct 26 12:48:31 2007
@@ -39,7 +39,7 @@
 using namespace sys;
 using namespace framing;
 
-SessionManager::SessionManager() {}
+SessionManager::SessionManager(uint32_t a) : ack(a) {}
 
 SessionManager::~SessionManager() {}
 
@@ -47,7 +47,8 @@
     SessionHandler& h, uint32_t timeout_)
 {
     Mutex::ScopedLock l(lock);
-    std::auto_ptr<SessionState> session(new SessionState(*this, h, timeout_));
+    std::auto_ptr<SessionState> session(
+        new SessionState(*this, h, timeout_, ack));
     active.insert(session->getId());
     return session;
 }
@@ -55,14 +56,13 @@
 void  SessionManager::suspend(std::auto_ptr<SessionState> session) {
     Mutex::ScopedLock l(lock);
     active.erase(session->getId());
+    session->suspend();
     session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC);
-    session->handler = 0;
     suspended.push_back(session.release()); // In expiry order
     eraseExpired();
 }
 
-std::auto_ptr<SessionState>  SessionManager::resume(
-    SessionHandler& sh, const Uuid& id)
+std::auto_ptr<SessionState>  SessionManager::resume(const Uuid& id)
 {
     Mutex::ScopedLock l(lock);
     eraseExpired();
@@ -78,7 +78,6 @@
             QPID_MSG("No suspended session with id=" << id));
     active.insert(id);
     std::auto_ptr<SessionState> state(suspended.release(i).release());
-    state->handler = &sh;
     return state;
 }
 
@@ -94,8 +93,10 @@
         Suspended::iterator keep = std::lower_bound(
             suspended.begin(), suspended.end(), now(),
             boost::bind(std::less<AbsTime>(), boost::bind(&SessionState::expiry, _1), _2));
-        QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
-        suspended.erase(suspended.begin(), keep);
+        if (suspended.begin() != keep) {
+            QPID_LOG(debug, "Expiring sessions: " << log::formatList(suspended.begin(), keep));
+            suspended.erase(suspended.begin(), keep);
+        }
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Fri Oct 26 12:48:31 2007
@@ -44,7 +44,7 @@
  */
 class SessionManager : private boost::noncopyable {
   public:
-    SessionManager();
+    SessionManager(uint32_t ack);
     ~SessionManager();
     /** Open a new active session, caller takes ownership */
     std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
@@ -57,18 +57,20 @@
     /** Resume a suspended session.
      *@throw Exception if timed out or non-existant.
      */
-    std::auto_ptr<SessionState> resume(SessionHandler&, const framing::Uuid&);
+    std::auto_ptr<SessionState> resume(const framing::Uuid&);
 
   private:
     typedef boost::ptr_vector<SessionState> Suspended;
     typedef std::set<framing::Uuid> Active;
 
+    void erase(const framing::Uuid&);             
+    void eraseExpired();             
+
     sys::Mutex lock;
     Suspended suspended;
     Active active;
-
-    void erase(const framing::Uuid&);             
-    void eraseExpired();             
+    uint32_t ack;
+    
   friend class SessionState; // removes deleted sessions from active set.
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Oct 26 12:48:31 2007
@@ -31,21 +31,24 @@
 
 using namespace framing;
 
-SessionState::SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_) 
-    : factory(f), handler(&h), id(true), timeout(timeout_),
+void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
+
+void SessionState::handleOut(AMQFrame& f) {
+    assert(handler);
+    handler->out.handle(f);
+}
+
+SessionState::SessionState(
+    SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) 
+    : framing::SessionState(ack),
+      factory(f), handler(&h), id(true), timeout(timeout_),
       broker(h.getConnection().broker),
-      version(h.getConnection().getVersion())
+      version(h.getConnection().getVersion()),
+      semanticHandler(new SemanticHandler(*this))
 {
-    // FIXME aconway 2007-09-21: Break dependnecy - broker updates session.
-    chain.push_back(new SemanticHandler(*this));
-    in = &chain[0];             // Incoming frame to handler chain.
-    out = &handler->out;        // Outgoing frames to SessionHandler
-
-    // FIXME aconway 2007-09-20: use broker to add plugin
-    // handlers to the chain. 
-    // FIXME aconway 2007-08-31: Shouldn't be passing channel ID.
-    broker.update(handler->getChannel(), *this);       
-}
+    // FIXME aconway 2007-09-20: SessionManager may add plugin
+    // handlers to the chain.
+ }
 
 SessionState::~SessionState() {
     // Remove ID from active session list.
@@ -63,6 +66,14 @@
 
 Connection& SessionState::getConnection() {
     return getHandler().getConnection();
+}
+
+void SessionState::detach() {
+    handler = 0;
+}
+
+void SessionState::attach(SessionHandler& h) {
+    handler = &h;
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Fri Oct 26 12:48:31 2007
@@ -24,11 +24,12 @@
 
 #include "qpid/framing/Uuid.h"
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/SessionState.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/sys/Time.h"
 
-#include <boost/ptr_container/ptr_vector.hpp>
 #include <boost/noncopyable.hpp>
+#include <boost/scoped_ptr.hpp>
 
 #include <set>
 #include <vector>
@@ -42,31 +43,26 @@
 
 namespace broker {
 
+class SemanticHandler;
 class SessionHandler;
 class SessionManager;
 class Broker;
 class Connection;
 
 /**
- * State of a session.
- *
- * An attached session has a SessionHandler which is attached to a
- * connection. A suspended session has no handler.
- *
- * A SessionState is always associated with an open session (attached or
- * suspended) it is destroyed when the session is closed.
- *
- * The SessionState includes the sessions handler chains, which may
- * themselves have state. The handlers will be preserved as long as
- * the session is alive.
+ * Broker-side session state includes sessions handler chains, which may
+ * themselves have state. 
  */
-class SessionState : public framing::FrameHandler::Chains,
-                     private boost::noncopyable
+class SessionState : public framing::SessionState,
+                     public framing::FrameHandler::InOutHandler
 {
   public:
     ~SessionState();
     bool isAttached() { return handler; }
 
+    void detach();
+    void attach(SessionHandler& handler);
+    
     /** @pre isAttached() */
     SessionHandler& getHandler();
 
@@ -76,23 +72,30 @@
     /** @pre isAttached() */
     Connection& getConnection();
 
-    const framing::Uuid& getId() const { return id; }
     uint32_t getTimeout() const { return timeout; }
     Broker& getBroker() { return broker; }
     framing::ProtocolVersion getVersion() const { return version; }
+
+  protected:
+    void handleIn(framing::AMQFrame&);
+    void handleOut(framing::AMQFrame&);
     
   private:
-    /** Only SessionManager can open sessions */
-    SessionState(SessionManager& f, SessionHandler& h, uint32_t timeout_);
-
+    // SessionManager creates sessions.
+    SessionState(SessionManager&,
+                 SessionHandler& out,
+                 uint32_t timeout,
+                 uint32_t ackInterval);
+    
     SessionManager& factory;
     SessionHandler* handler;    
     framing::Uuid id;
     uint32_t timeout;
     sys::AbsTime expiry;        // Used by SessionManager.
     Broker& broker;
-    boost::ptr_vector<framing::FrameHandler> chain;
     framing::ProtocolVersion version;
+    
+    boost::scoped_ptr<SemanticHandler> semanticHandler;
 
   friend class SessionManager;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp Fri Oct 26 12:48:31 2007
@@ -73,17 +73,14 @@
     Monitor::ScopedLock l(monitor);
     if (!active) {
         active = true;
-        runner = std::auto_ptr<Thread>(new Thread(this));
+        runner = Thread(this);
     }
 }
 
 void Timer::stop()
 {
     signalStop();
-    if (runner.get()) {
-        runner->join();
-        runner.reset();
-    }
+    runner.join();
 }
 void Timer::signalStop()
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h Fri Oct 26 12:48:31 2007
@@ -53,7 +53,7 @@
 {
     qpid::sys::Monitor monitor;            
     std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>, Later> tasks;
-    std::auto_ptr<qpid::sys::Thread> runner;
+    qpid::sys::Thread runner;
     bool active;
 
     void run();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Channel.cpp Fri Oct 26 12:48:31 2007
@@ -24,7 +24,6 @@
 #include "Channel.h"
 #include "qpid/sys/Monitor.h"
 #include "Message.h"
-#include "qpid/QpidError.h"
 #include "Connection.h"
 #include "Demux.h"
 #include "FutureResponse.h"
@@ -71,7 +70,7 @@
 {
     Mutex::ScopedLock l(stopLock);
     if (isOpen())
-        THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
+        throw ChannelBusyException();
     active = true;
     session = s;
     if(isTransactional()) {
@@ -142,7 +141,7 @@
         Mutex::ScopedLock l(lock);
         ConsumerMap::iterator i = consumers.find(tag);
         if (i != consumers.end())
-            throw Exception(boost::format("Consumer already exists with tag: '%1%'") % tag);
+            throw NotAllowedException(QPID_MSG("Consumer already exists with tag " << tag ));
         Consumer& c = consumers[tag];
         c.listener = listener;
         c.ackMode = ackMode;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Oct 26 12:48:31 2007
@@ -29,7 +29,7 @@
 #include "qpid/log/Logger.h"
 #include "qpid/log/Options.h"
 #include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
+#include "qpid/shared_ptr.h"
 #include <iostream>
 #include <sstream>
 #include <functional>
@@ -44,23 +44,26 @@
 Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : 
     channelIdCounter(0), version(_version), 
     max_frame_size(_max_frame_size), 
-    impl(new ConnectionImpl(boost::shared_ptr<Connector>(new Connector(_version, _debug)))),
-    isOpen(false) {}
+    isOpen(false),
+    impl(new ConnectionImpl(
+             shared_ptr<Connector>(new Connector(_version, _debug))))
+{}
 
-Connection::Connection(boost::shared_ptr<Connector> c) : 
+Connection::Connection(shared_ptr<Connector> c) : 
     channelIdCounter(0), version(framing::highestProtocolVersion), 
     max_frame_size(65536), 
-    impl(new ConnectionImpl(c)),
-    isOpen(false) {}
+    isOpen(false),
+    impl(new ConnectionImpl(c))
+{}
 
-Connection::~Connection(){}
+Connection::~Connection(){ }
 
 void Connection::open(
     const std::string& host, int port,
     const std::string& uid, const std::string& pwd, const std::string& vhost)
 {
     if (isOpen)
-        THROW_QPID_ERROR(INTERNAL_ERROR, "Channel object is already open");
+        throw Exception(QPID_MSG("Channel object is already open"));
 
     impl->open(host, port, uid, pwd, vhost);
     isOpen = true;
@@ -79,10 +82,9 @@
 }
 
 void Connection::resume(Session& session) {
-    shared_ptr<SessionCore> core=session.impl;
-    core->setChannel(++channelIdCounter);
-    impl->addSession(core);
-    core->resume(impl);
+    session.impl->setChannel(++channelIdCounter);
+    impl->addSession(session.impl);
+    session.impl->resume(impl);
 }
 
 void Connection::close() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Fri Oct 26 12:48:31 2007
@@ -23,7 +23,6 @@
  */
 #include <map>
 #include <string>
-#include "qpid/QpidError.h"
 #include "Channel.h"
 #include "ConnectionImpl.h"
 #include "qpid/client/Session.h"
@@ -57,10 +56,12 @@
     framing::ChannelId channelIdCounter;
     framing::ProtocolVersion version;
     const uint32_t max_frame_size;
-    shared_ptr<ConnectionImpl> impl;
     bool isOpen;
     bool debug;
-    
+
+  protected:
+    boost::shared_ptr<ConnectionImpl> impl;
+
   public:
     /**
      * Creates a connection object, but does not open the

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Fri Oct 26 12:48:31 2007
@@ -68,7 +68,7 @@
             try {
                 in(frame);
             }catch(ConnectionException& e){
-                error(e.code, e.toString(), body);
+                error(e.code, e.what(), body);
             }catch(std::exception& e){
                 error(541/*internal error*/, e.what(), body);
             }
@@ -124,6 +124,8 @@
 
 void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
 {
+    if (onError)
+        onError(code, message);
     AMQMethodBody* method = body->getMethod();
     if (method)
         error(code, message, method->amqpClassId(), method->amqpMethodId());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 26 12:48:31 2007
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "qpid/framing/constants.h"
 #include "qpid/framing/reply_exceptions.h"
 
 #include "ConnectionImpl.h"
@@ -35,8 +36,9 @@
 {
     handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
     handler.out = boost::bind(&Connector::send, connector, _1);
-    handler.onClose = boost::bind(&ConnectionImpl::closed, this);
-    handler.onError = boost::bind(&ConnectionImpl::closedByPeer, this, _1, _2);
+    handler.onClose = boost::bind(&ConnectionImpl::closed, this,
+                                  REPLY_SUCCESS, std::string());
+    handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
     connector->setInputHandler(&handler);
     connector->setTimeoutHandler(this);
     connector->setShutdownHandler(this);
@@ -64,7 +66,7 @@
         s = sessions[frame.getChannel()].lock();
     }
     if (!s)
-        throw ChannelErrorException();
+        throw ChannelErrorException(QPID_MSG("Invalid channel: " << frame.getChannel()));
     s->in(frame);
 }
 
@@ -84,19 +86,8 @@
 
 void ConnectionImpl::close()
 {
-    assertNotClosed();
-    handler.close();
-}
-
-void ConnectionImpl::closed()
-{
-    closedByPeer(200, "OK");
-}
-
-void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
-{
-    signalClose(code, text);
-    connector->close();
+    if (!isClosed)
+        handler.close();
 }
 
 void ConnectionImpl::idleIn()
@@ -110,26 +101,39 @@
     connector->send(frame);
 }
 
+template <class F>
+void ConnectionImpl::forChannels(F functor) {
+    for (SessionMap::iterator i = sessions.begin();
+         i != sessions.end(); ++i) {
+        try {
+            boost::shared_ptr<SessionCore> s = i->second.lock();
+            if (s) functor(*s);
+        } catch (...) { assert(0); }
+    }
+}
+
 void ConnectionImpl::shutdown() 
 {
-    //this indicates that the socket to the server has closed
-    signalClose(0, "Unexpected socket closure.");
+    Mutex::ScopedLock l(lock);
+    if (isClosed) return;
+    forChannels(boost::bind(&SessionCore::connectionBroke, _1,
+                            INTERNAL_ERROR, "Unexpected socket closure."));
+    sessions.clear();
+    isClosed = true;
 }
 
-void ConnectionImpl::signalClose(uint16_t code, const std::string& text) 
+void ConnectionImpl::closed(uint16_t code, const std::string& text) 
 {
     Mutex::ScopedLock l(lock);
-    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
-        boost::shared_ptr<SessionCore> s = i->second.lock();
-        if (s)
-            s->closed(code, text);
-    }
+    if (isClosed) return;
+    forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
     sessions.clear();
     isClosed = true;
+    connector->close();
 }
 
-void ConnectionImpl::assertNotClosed()
-{
+void ConnectionImpl::erase(uint16_t ch) {
     Mutex::ScopedLock l(lock);
-    if (isClosed) throw Exception("Connection has been closed");
+    sessions.erase(ch);
 }
+    

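Note on forChannels() above: the session map holds weak pointers, so each entry has to be locked before use, and entries whose session has already been destroyed are skipped. The sketch below shows that iteration pattern in isolation. It is an illustration under assumptions, not the ConnectionImpl code: Session, SessionMap and forLiveSessions are hypothetical names, std::weak_ptr replaces boost::weak_ptr, and the visit count is returned only so the behaviour can be checked.

```cpp
#include <cassert>
#include <map>
#include <memory>

// Hypothetical stand-in for a client session core.
struct Session {
    int notified;
    Session() : notified(0) {}
    void connectionClosed(int /*code*/) { ++notified; }
};

// Channel number -> weakly held session (the map does not own sessions).
typedef std::map<int, std::weak_ptr<Session> > SessionMap;

// Apply f to every session that is still alive.
// Returns how many sessions were visited.
template <class F>
int forLiveSessions(SessionMap& sessions, F f) {
    int visited = 0;
    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
        // lock() yields an empty shared_ptr if the session is gone.
        std::shared_ptr<Session> s = i->second.lock();
        if (s) {
            f(*s);
            ++visited;
        }
    }
    return visited;
}
```

A caller would pass a functor such as a lambda invoking connectionClosed(code) on each session, much as the commit binds SessionCore::connectionClosed.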
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Fri Oct 26 12:48:31 2007
@@ -51,14 +51,14 @@
     bool isClosed;
 
     void incoming(framing::AMQFrame& frame);    
-    void closed();
-    void closedByPeer(uint16_t, const std::string&);
+    void closed(uint16_t, const std::string&);
     void idleOut();
     void idleIn();
     void shutdown();
-    void signalClose(uint16_t, const std::string&);
-    void assertNotClosed();
-public:
+
+    template <class F> void forChannels(F functor);
+
+  public:
     typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
 
     ConnectionImpl(boost::shared_ptr<Connector> c);
@@ -69,7 +69,9 @@
               const std::string& pwd = "guest", 
               const std::string& virtualhost = "/");
     void close();
-    void handle(framing::AMQFrame& frame);    
+    void handle(framing::AMQFrame& frame);
+    void erase(uint16_t channel);
+    boost::shared_ptr<Connector> getConnector() { return connector; }
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Fri Oct 26 12:48:31 2007
@@ -20,7 +20,6 @@
  */
 #include <iostream>
 #include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
 #include "qpid/sys/Time.h"
 #include "qpid/framing/AMQFrame.h"
 #include "Connector.h"
@@ -36,7 +35,6 @@
 
 using namespace qpid::sys;
 using namespace qpid::framing;
-using qpid::QpidError;
 
 Connector::Connector(
     ProtocolVersion ver, bool _debug, uint32_t buffer_size

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Fri Oct 26 12:48:31 2007
@@ -98,6 +98,7 @@
     virtual void setInputHandler(framing::InputHandler* handler);
     virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
     virtual void setShutdownHandler(sys::ShutdownHandler* handler);
+    virtual sys::ShutdownHandler* getShutdownHandler() { return shutdownHandler; }
     virtual framing::OutputHandler* getOutputHandler();
     virtual void send(framing::AMQFrame& frame);
     virtual void setReadTimeout(uint16_t timeout);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Fri Oct 26 12:48:31 2007
@@ -73,7 +73,7 @@
 void ExecutionHandler::complete(uint32_t cumulative, const SequenceNumberSet& range)
 {
     if (range.size() % 2) { //must be even number        
-        throw ConnectionException(530, "Received odd number of elements in ranged mark");
+        throw NotAllowedException(QPID_MSG("Received odd number of elements in ranged mark"));
     } else {
         SequenceNumber mark(cumulative);        
         {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Future.h Fri Oct 26 12:48:31 2007
@@ -63,7 +63,7 @@
                 boost::bind(&FutureCompletion::completed, &callback)
             );
             callback.waitForCompletion();
-            session.checkClosed();
+            session.assertOpen();
             complete = true;
         }
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResponse.cpp Fri Oct 26 12:48:31 2007
@@ -31,7 +31,7 @@
 AMQMethodBody* FutureResponse::getResponse(SessionCore& session)
 {
     waitForCompletion();
-    session.checkClosed();            
+    session.assertOpen();            
     return response.get();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp?rev=588761&r1=588760&r2=588761&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FutureResult.cpp Fri Oct 26 12:48:31 2007
@@ -30,7 +30,7 @@
 const std::string& FutureResult::getResult(SessionCore& session) const
 {
     waitForCompletion();
-    session.checkClosed();            
+    session.assertOpen();            
     return result;
 }
 


