qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r650635 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/amqp_0_10/ qpid/broker/ tests/
Date Tue, 22 Apr 2008 20:14:19 GMT
Author: gsim
Date: Tue Apr 22 13:14:15 2008
New Revision: 650635

URL: http://svn.apache.org/viewvc?rev=650635&view=rev
Log:
Moved federation to final 0-10 codepath


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Apr 22 13:14:15 2008
@@ -27,12 +27,21 @@
 
 using sys::Mutex;
 
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string&
id)
-    : frameQueueClosed(false), output(o), connection(this, broker, id),
-      identifier(id), initialized(false) {}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string&
id, bool _isClient)
+        : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+          identifier(id), initialized(false), isClient(_isClient) {}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
     framing::Buffer in(const_cast<char*>(buffer), size);
+    if (isClient && !initialized) {
+        //read in protocol header
+        framing::ProtocolInitiation pi;
+        if (pi.decode(in)) {
+            //TODO: check the version is correct
+            QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi <<
")");
+        }
+        initialized = true;
+    }
     framing::AMQFrame frame;
     while(frame.decode(in)) {
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
@@ -44,7 +53,7 @@
 bool Connection::canEncode() {
     if (!frameQueueClosed) connection.doOutput();
     Mutex::ScopedLock l(frameQueueLock);
-    return !initialized || !frameQueue.empty();
+    return (!isClient && !initialized) || !frameQueue.empty();
 }
 
 bool Connection::isClosed() const {
@@ -55,10 +64,11 @@
 size_t  Connection::encode(const char* buffer, size_t size) {
     Mutex::ScopedLock l(frameQueueLock);
     framing::Buffer out(const_cast<char*>(buffer), size);
-    if (!initialized) {
+    if (!isClient && !initialized) {
         framing::ProtocolInitiation pi(getVersion());
         pi.encode(out);
         initialized = true;
+        QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi <<
")");
     }
     while (!frameQueue.empty() && (frameQueue.front().size() <= out.available()))
{
             frameQueue.front().encode(out);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Apr 22 13:14:15 2008
@@ -43,9 +43,10 @@
     broker::Connection connection; // FIXME aconway 2008-03-18: 
     std::string identifier;
     bool initialized;
+    bool isClient;
     
   public:
-    Connection(sys::OutputControl&, broker::Broker&, const std::string& id);
+    Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool
isClient = false);
     size_t decode(const char* buffer, size_t size);
     size_t encode(const char* buffer, size_t size);
     bool isClosed() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Apr 22 13:14:15 2008
@@ -34,7 +34,7 @@
 Bridge::Bridge(framing::ChannelId id, ConnectionState& c, CancellationListener l, const
management::ArgsLinkBridge& _args) : 
     args(_args), channel(id, &(c.getOutput())), peer(channel), 
     mgmtObject(new management::Bridge(this, &c, id, args.i_src, args.i_dest, args.i_key,
args.i_src_is_queue, args.i_src_is_local)),
-    connection(c), listener(l)
+    connection(c), listener(l), name(Uuid(true).str())
 {
     management::ManagementAgent::getAgent()->addObject(mgmtObject);
 }
@@ -46,24 +46,24 @@
 
 void Bridge::create()
 {
-    framing::AMQP_ServerProxy::Session session(channel);
-    session.open(0);
+    framing::AMQP_ServerProxy::Session010 session(channel);
+    session.attach(name, false);
 
     if (args.i_src_is_local) {
         //TODO: handle 'push' here... simplest way is to create frames and pass them to Connection::received()
     } else {
         if (args.i_src_is_queue) {
-            peer.getMessage().subscribe(0, args.i_src, args.i_dest, false, 0, 0, false, FieldTable());
-            peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-            peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+            peer.getMessage010().subscribe(args.i_src, args.i_dest, 0, 0, false, "", 0, FieldTable());
+            peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
+            peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
         } else {
             string queue = "bridge_queue_";
             queue += Uuid(true).str();
-            peer.getQueue().declare(0, queue, "", false, false, true, true, FieldTable());
-            peer.getQueue().bind(0, queue, args.i_src, args.i_key, FieldTable());
-            peer.getMessage().subscribe(0, queue, args.i_dest, false, 0, 0, false, FieldTable());
-            peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-            peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+            peer.getQueue010().declare(queue, "", false, false, true, true, FieldTable());
+            peer.getExchange010().bind(queue, args.i_src, args.i_key, FieldTable());
+            peer.getMessage010().subscribe(queue, args.i_dest, 0, 0, false, "", 0, FieldTable());
+            peer.getMessage010().flow(args.i_dest, 0, 0xFFFFFFFF);
+            peer.getMessage010().flow(args.i_dest, 1, 0xFFFFFFFF);
         }
     }
 
@@ -71,8 +71,8 @@
 
 void Bridge::cancel()
 {
-    peer.getMessage().cancel(args.i_dest);    
-    peer.getSession().close();
+    peer.getMessage010().cancel(args.i_dest);    
+    peer.getSession010().detach(name);
 }
 
 management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue Apr 22 13:14:15 2008
@@ -56,6 +56,7 @@
     management::Bridge::shared_ptr mgmtObject;
     ConnectionState& connection;
     CancellationListener listener;
+    std::string name;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Apr 22 13:14:15 2008
@@ -85,9 +85,9 @@
 };
 
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string&
mgmtId_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string&
mgmtId_, bool isLink) :
     ConnectionState(out_, broker_),
-    adapter(*this),
+    adapter(*this, isLink),
     mgmtClosing(false),
     mgmtId(mgmtId_)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Apr 22 13:14:15 2008
@@ -54,7 +54,7 @@
                    public ConnectionState
 {
   public:
-    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string&
mgmtId);
+    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string&
mgmtId, bool isLink = false);
     ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already exist */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionFactory.cpp Tue Apr 22 13:14:15
2008
@@ -43,11 +43,8 @@
 
 sys::ConnectionCodec*
 ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
-    // FIXME aconway 2008-03-18: 
-
-    // gsim 2008-03-26 this seems only to be used when creating
-    // connections from one broker to another
-    return new PreviewConnectionCodec(out, broker, id, true);
+    // used to create connections from one broker to another
+    return new amqp_0_10::Connection(out, broker, id, true);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Tue Apr 22 13:14:15
2008
@@ -65,23 +65,24 @@
     }
 }
 
-ConnectionHandler::ConnectionHandler(Connection& connection)  : handler(new Handler(connection))
{}
+ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient)  : handler(new
Handler(connection, isClient)) {}
 
-ConnectionHandler::Handler::Handler(Connection& c) :
+ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
     client(c.getOutput()), server(c.getOutput()), 
-    connection(c), serverMode(false)
+    connection(c), serverMode(!isClient)
 {
-    FieldTable properties;
-    Array mechanisms(0x95);
-
-    authenticator = SaslAuthenticator::createAuthenticator(c);
-    authenticator->getMechanisms(mechanisms);
-
-    Array locales(0x95);
-    boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
-    locales.add(l);
-    serverMode = true;
-    client.start(properties, mechanisms, locales);
+    if (serverMode) {
+        FieldTable properties;
+        Array mechanisms(0x95);
+        
+        authenticator = SaslAuthenticator::createAuthenticator(c);
+        authenticator->getMechanisms(mechanisms);
+        
+        Array locales(0x95);
+        boost::shared_ptr<FieldValue> l(new Str16Value(en_US));
+        locales.add(l);
+        client.start(properties, mechanisms, locales);
+    }
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.h Tue Apr 22 13:14:15
2008
@@ -50,7 +50,7 @@
         bool serverMode;
         std::auto_ptr<SaslAuthenticator> authenticator;
     
-        Handler(Connection& connection);
+        Handler(Connection& connection, bool isClient);
         ~Handler();
         void startOk(const qpid::framing::FieldTable& clientProperties,
                      const std::string& mechanism, const std::string& response,
@@ -81,7 +81,7 @@
     };
     std::auto_ptr<Handler> handler;
   public:
-    ConnectionHandler(Connection& connection);
+    ConnectionHandler(Connection& connection, bool isClient);
     void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId,
framing::MethodId methodId);
     void handle(framing::AMQFrame& frame);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Apr 22 13:14:15 2008
@@ -83,7 +83,7 @@
 }
 
 bool SessionHandler::isValid(AMQMethodBody* m) {
-    return session.get() || m->isA<SessionAttachBody>();
+    return session.get() || m->isA<SessionAttachBody>() || m->isA<SessionAttachedBody>();
 }
 
 void SessionHandler::handleOut(AMQFrame& f) {
@@ -134,10 +134,13 @@
     peerSession.commandPoint(session->nextOut, 0);
 }
 
-void SessionHandler::attached(const std::string& /*name*/)
+void SessionHandler::attached(const std::string& _name)
 {
+    name = _name;//TODO: this should be used in conjunction with
+                 //userid for connection as sessions identity
     std::auto_ptr<SessionState> state(connection.broker.getSessionManager().open(*this,
0));
     session.reset(state.release());
+    peerSession.commandPoint(session->nextOut, 0);
 }
 
 void SessionHandler::detach(const std::string& name)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=650635&r1=650634&r2=650635&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Tue Apr 22 13:14:15 2008
@@ -94,7 +94,7 @@
             mgmt.call_method(link, "close")
             self.assertEqual(len(mgmt.get_objects("link")), 0)
 
-    def DISABLED_test_pull_from_exchange(self):
+    def test_pull_from_exchange(self):
         session = self.session
         
         mgmt = Helper(self)
@@ -122,10 +122,10 @@
 
         for i in range(1, 11):
             msg = queue.get(timeout=5)
-            self.assertEqual("Message %d" % i, msg.content.body)
+            self.assertEqual("Message %d" % i, msg.body)
         try:
             extra = queue.get(timeout=1)
-            self.fail("Got unexpected message in queue: " + extra.content.body)
+            self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
 
@@ -135,7 +135,7 @@
         mgmt.call_method(link, "close")
         self.assertEqual(len(mgmt.get_objects("link")), 0)
 
-    def DISABLED_test_pull_from_queue(self):
+    def test_pull_from_queue(self):
         session = self.session
 
         #setup queue on remote broker and add some messages
@@ -168,10 +168,10 @@
 
         for i in range(1, 11):
             msg = queue.get(timeout=5)
-            self.assertEqual("Message %d" % i, msg.content.body)
+            self.assertEqual("Message %d" % i, msg.body)
         try:
             extra = queue.get(timeout=1)
-            self.fail("Got unexpected message in queue: " + extra.content.body)
+            self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
 



Mime
View raw message