qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r697951 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/amqp_0_10/ qpid/broker/ qpid/cluster/ qpid/sys/ tests/
Date Mon, 22 Sep 2008 19:08:48 GMT
Author: aconway
Date: Mon Sep 22 12:08:47 2008
New Revision: 697951

URL: http://svn.apache.org/viewvc?rev=697951&view=rev
Log:
Fixed error handling session-busy condition on broker.
Added accessors to iterate over broker::SemanticState consumers.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Mon Sep 22 12:08:47
2008
@@ -83,9 +83,7 @@
     }
     catch(const ChannelException& e){
         QPID_LOG(error, "Channel exception: " << e.what());
-        if (getState())
-            peer.detached(getState()->getId().getName(), e.code);
-        channelException(e.code, e.getMessage());
+        peer.detached(name, e.code);
     }
     catch(const ConnectionException& e) {
         QPID_LOG(error, "Connection exception: " << e.what());
@@ -126,11 +124,15 @@
                      << ", expecting: " << getState()->getId().getName()));
 }
 
-void SessionHandler::attach(const std::string& name, bool force) {
+void SessionHandler::attach(const std::string& name_, bool force) {
+    // Save the name for possible session-busy exception. Session-busy
+    // can be thrown before we have attached the handler to a valid
+    // SessionState, and in that case we need the name to send peer.detached
+    name = name_;               
     if (getState() && name == getState()->getId().getName())
         return;                 // Idempotent
     if (getState())
-        throw SessionBusyException(
+        throw TransportBusyException(
             QPID_MSG("Channel " << channel.get() << " already attached to " <<
getState()->getId()));
     setState(name, force);
     QPID_LOG(debug, "Attached channel " << channel.get() << " to " << getState()->getId());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Mon Sep 22 12:08:47
2008
@@ -106,6 +106,7 @@
     Peer peer;
     bool ignoring;
     bool sendReady, receiveReady;
+    std::string name;
 
   private:
     void sendCommandPoint(const SessionPoint&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Mon Sep 22 12:08:47 2008
@@ -45,6 +45,7 @@
 #include <vector>
 
 #include <boost/intrusive_ptr.hpp>
+#include <boost/cast.hpp>
 
 namespace qpid {
 namespace broker {
@@ -58,6 +59,7 @@
 class SemanticState : public sys::OutputTask,
                       private boost::noncopyable
 {
+  public:
     class ConsumerImpl : public Consumer, public sys::OutputTask,
                          public boost::enable_shared_from_this<ConsumerImpl>
     {
@@ -106,8 +108,11 @@
 
         bool hasOutput();
         bool doOutput();
+
+        std::string getName() const { return name; }
     };
 
+  private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
     typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
@@ -190,6 +195,11 @@
 
     void attached();
     void detached();
+
+    template <class F> void eachConsumer(const F& f) {
+        outputTasks.eachOutput(
+            boost::bind(f, boost::bind(&boost::polymorphic_downcast<ConsumerImpl*,
OutputTask>, _1)));
+    }
 };
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Sep 22 12:08:47 2008
@@ -100,6 +100,8 @@
 
     void readyToSend();
 
+    template <class F> void eachConsumer(const F& f) { semanticState.eachConsumer(f);
}
+    
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber&
id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Mon Sep 22 12:08:47 2008
@@ -186,8 +186,16 @@
     client::Session cs;
     client::SessionBase_0_10Access(cs).set(simpl);
     cs.sync();
+
+    broker::SessionState* ss = sh.getSession();
+    ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1));
+    
     // FIXME aconway 2008-09-19: remaining session state.
     QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
 }
 
+void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+    QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName());
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Mon Sep 22 12:08:47 2008
@@ -24,6 +24,7 @@
 
 #include "qpid/client/Connection.h"
 #include "qpid/client/AsyncSession.h"
+#include "qpid/broker/SemanticState.h"
 #include "qpid/sys/Runnable.h"
 #include <boost/shared_ptr.hpp>
 
@@ -69,7 +70,8 @@
     void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
     void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
     void dumpSession(broker::SessionHandler& s);
-
+    void dumpConsumer(broker::SemanticState::ConsumerImpl*);
+    
   private:
     Url receiver;
     Cluster& donor;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Mon Sep 22 12:08:47 2008
@@ -21,11 +21,13 @@
 #ifndef _AggregateOutput_
 #define _AggregateOutput_
 
-#include <vector>
 #include "Mutex.h"
 #include "OutputControl.h"
 #include "OutputTask.h"
 
+#include <algorithm>
+#include <vector>
+
 namespace qpid {
 namespace sys {
 
@@ -46,6 +48,11 @@
         bool hasOutput();
         void addOutputTask(OutputTask* t);
         void removeOutputTask(OutputTask* t);
+
+        /** Apply f to each OutputTask* in the tasks list */
+        template <class F> void eachOutput(const F& f) {
+            std::for_each(tasks.begin(), tasks.end(), f);
+        }
     };
 
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Sep 22 12:08:47 2008
@@ -46,7 +46,7 @@
 }} // namespace qpid::cluster
 
 
-QPID_AUTO_TEST_SUITE(CpgTestSuite)
+QPID_AUTO_TEST_SUITE(cluster)
 
 using namespace std;
 using namespace qpid;
@@ -147,8 +147,6 @@
     if (size()) front() = broker0->getPort(); else push_back(broker0->getPort());
 }
 
-// For debugging: op << for CPG types.
-
 ostream& operator<<(ostream& o, const cpg_name* n) {
     return o << qpid::cluster::Cpg::str(*n);
 }
@@ -166,35 +164,35 @@
     return o;
 }
 
-#if 0                           // FIXME aconway 2008-09-10: finish & enable
-QPID_AUTO_TEST_CASE(testDumpConsumers) {
+#if 0                           // FIXME aconway 2008-09-22: enable.
+QPID_AUTO_TEST_CASE(DumpConsumers) {
     ClusterFixture cluster(1);
-    Client a(cluster[0]);
-    a.session.queueDeclare("q");
-    a.subs.subscribe(a.lq, "q");
+    Client c0(cluster[0]);
+    c0.session.queueDeclare("q");
+    c0.subs.subscribe(c0.lq, "q");
+    c0.session.messageTransfer(arg::content=Message("before", "q"));
+    Message m;
+    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "before");
 
+    // Start new member
     cluster.add();
-    Client b(cluster[1]);
-    try {
-        b.connection.newSession(a.session.getId().getName());
-        BOOST_FAIL("Expected SessionBusyException for " << a.session.getId().getName());
-    } catch (const SessionBusyException&) {}
+    Client c1(cluster[1]);
 
-    // Transfer some messages to the subscription by client a.
-    Message m;
-    a.session.messageTransfer(arg::content=Message("aaa", "q"));
-    BOOST_CHECK(a.lq.get(m, TIME_SEC));
+    // Transfer some messages to the subscription by client c0.
+    c0.session.messageTransfer(arg::content=Message("aaa", "q"));
+    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "aaa");
 
-    b.session.messageTransfer(arg::content=Message("bbb", "q"));
-    BOOST_CHECK(a.lq.get(m, TIME_SEC));
+    c1.session.messageTransfer(arg::content=Message("bbb", "q"));
+    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "bbb");
 
     // Verify that the queue has been drained on both brokers.
     // This proves that the consumer was replicated when the second broker joined.
-    BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), (unsigned)0);
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
 }
-
 #endif
 
 QPID_AUTO_TEST_CASE(testCatchupSharedState) {
@@ -218,8 +216,8 @@
     // Do some work post-join
     cluster.waitFor(2);
     c0.session.messageTransfer(arg::content=Message("pbar","p"));
-    
-    // Verify new broker has all state.
+
+    // Verify new brokers have all state.
     Message m;
 
     Client c1(cluster[1], "c1");

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=697951&r1=697950&r2=697951&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Sep 22 12:08:47 2008
@@ -73,6 +73,14 @@
     }
 };
 
+QPID_AUTO_TEST_CASE(TestSessionBusy) {
+    SessionFixture f;
+    try {
+        f.connection.newSession(f.session.getId().getName());
+        BOOST_FAIL("Expected SessionBusyException for " << f.session.getId().getName());
+    } catch (const Exception&) {} // FIXME aconway 2008-09-22: client is not throwing
correct exception.
+}
+
 QPID_AUTO_TEST_CASE(DisconnectedPop) {
     ProxySessionFixture fix;
     ProxyConnection c(fix.broker->getPort());



Mime
View raw message