qpid-commits mailing list archives

From r..@apache.org
Subject svn commit: r447994 [2/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concurr...
Date Tue, 19 Sep 2006 22:07:25 GMT

Added: incubator/qpid/trunk/qpid/cpp/DESIGN
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/DESIGN?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/DESIGN (added)
+++ incubator/qpid/trunk/qpid/cpp/DESIGN Tue Sep 19 15:06:50 2006
@@ -0,0 +1,89 @@
+Qpid C++ AMQP implementation
+=============================
+
+The following is a brief description of the logical design of the
+Qpid C++ code. 
+
+Layout
+
+There are three top level modules. The first two, client and broker,
+contain the code required for an AMQP client and an AMQP broker
+respectively.  The third, common, contains code that is common to both
+client and broker implementations. [Note that at present only the
+client has been started].
+
+Within the common module there are currently four sub-modules.  The
+largest of these is framing, containing the definitions of classes
+corresponding to key AMQP concepts such as frames, content & header
+bodies, particular method bodies etc as well as some interfaces and
+utilities used in the encoding and decoding of the wire protocol.
+
+Two of the other sub-modules in common, io and concurrent, provide
+abstractions of core io and concurrency constructs used in the client
+and broker code.  The intention is to allow these to be implemented in
+different ways.  At present the
+implementation of the io and concurrency abstractions is based on APR
+(Apache Portable Runtime).  [Note: the io module currently only
+contains the abstractions as seen from the client - the Connector. It
+will in due time likely have the analogous broker-side abstraction -
+the Acceptor].
+
+The final common sub-module is error, containing a simple exception
+definition used in all the error handling.
+
+Client Design
+
+The client module is primarily concerned with presenting the
+functionality offered by AMQP to users through a simple API that
+nevertheless allows all the protocol functionality to be exploited.
+[Note: it is currently nothing like complete in this regard!]
+
+The code in the client module is concerned with the logic of the AMQP
+protocol and interacts with the lower level transport issues through
+the InputHandler and OutputHandler abstractions defined in
+common/framing.  It uses these in conjunction with the Connector
+interface, defined in common/io, for establishing a connection to the
+broker and interacting with it through the sending and receiving of
+messages represented by AMQFrame (defined in common/framing).
+
+The Connector implementation is responsible for connection set up,
+threading strategy and getting data on and off the wire.  It delegates
+to the framing module for encode/decode operations.  The interface
+between the io and the framing modules is primarily through the Buffer
+and AMQFrame classes. 
+
+A Buffer allows 'raw' data to be read or written in terms of the AMQP
+defined 'types' (octet, short, long, long long, short string, long
+string, field table etc.).  AMQP is defined in terms of frames with
+specific bodies, and the frame (as well as these different bodies) is
+defined in terms of these 'types'.  The AMQFrame class allows a frame
+to be decoded by reading from the supplied buffer, or it allows a
+particular frame to be constructed and then encoded by writing to the
+supplied buffer.  The io layer can then access the raw data that
+'backs' the buffer to either put it on the wire or to populate it from
+the wire.
+
+One minor exception to this is the protocol initiation.  AMQP defines
+a protocol 'header' that is not a frame and is sent by a client to
+initiate a connection.  The Connector allows (indeed requires) such a
+header to be passed in to initialise the connection (the Acceptor, when
+defined, will allow an InitiationHandler to be set allowing the broker
+to hook into the connection initiation).  In order to remove
+duplication, the ProtocolInitiation class and the AMQFrame class both
+implement an AMQDataBlock class that defines the encode and decode
+methods.  This allows both types to be treated generically for the
+purposes of encoding.  In decoding, the context determines which type
+is expected and should be used for decoding (this is only relevant to
+the broker).
+
+
+
+              
+                  --------api--------                                     
+                     Client Impl     ...............uses.....
+input handler --> --------- --------- <-- output handler    .
+                     A         |                            .
+                     |         |                      framing utils   
+                     |         V                            .
+                  ------------------- <-- connector         .
+                       IO Layer      ................uses....
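
The Buffer and AMQDataBlock roles described in DESIGN can be
illustrated with a minimal, self-contained sketch.  It is not the Qpid
code: ToyBuffer, ToyDataBlock, ToyInitiation, ToyFrame, encodeBlock and
roundTrip are hypothetical names, the wire layouts are deliberately
simplified (a real AMQP frame also carries a size, a payload and an
end marker), and C++11 is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the framing Buffer: typed reads and writes
// over raw bytes, in network (big-endian) byte order.
class ToyBuffer {
    std::vector<std::uint8_t> data;
    std::size_t pos;
public:
    ToyBuffer() : pos(0) {}
    void putOctet(std::uint8_t v) { data.push_back(v); }
    void putShort(std::uint16_t v) {
        putOctet(static_cast<std::uint8_t>(v >> 8));
        putOctet(static_cast<std::uint8_t>(v & 0xFF));
    }
    // Short string: one length octet, then the bytes (assumes size <= 255).
    void putShortString(const std::string& s) {
        putOctet(static_cast<std::uint8_t>(s.size()));
        data.insert(data.end(), s.begin(), s.end());
    }
    std::uint8_t getOctet() { return data.at(pos++); }
    std::uint16_t getShort() {
        std::uint16_t hi = getOctet();
        std::uint16_t lo = getOctet();
        return static_cast<std::uint16_t>((hi << 8) | lo);
    }
    std::string getShortString() {
        std::size_t len = getOctet();
        if (pos + len > data.size()) throw std::out_of_range("short string truncated");
        std::string s(data.begin() + pos, data.begin() + pos + len);
        pos += len;
        return s;
    }
    // The io layer would put these bytes on the wire, or fill them from it.
    const std::vector<std::uint8_t>& raw() const { return data; }
};

// Hypothetical common interface, playing the role DESIGN gives AMQDataBlock.
struct ToyDataBlock {
    virtual ~ToyDataBlock() {}
    virtual void encode(ToyBuffer& buffer) const = 0;
    virtual void decode(ToyBuffer& buffer) = 0;
};

// Simplified protocol header: a literal "AMQP" plus two version octets.
struct ToyInitiation : ToyDataBlock {
    std::uint8_t versionMajor = 0;
    std::uint8_t versionMinor = 0;
    void encode(ToyBuffer& b) const override {
        const char magic[] = {'A', 'M', 'Q', 'P'};
        for (char c : magic) b.putOctet(static_cast<std::uint8_t>(c));
        b.putOctet(versionMajor);
        b.putOctet(versionMinor);
    }
    void decode(ToyBuffer& b) override {
        const char magic[] = {'A', 'M', 'Q', 'P'};
        for (char c : magic)
            if (b.getOctet() != static_cast<std::uint8_t>(c))
                throw std::runtime_error("not a protocol header");
        versionMajor = b.getOctet();
        versionMinor = b.getOctet();
    }
};

// Simplified frame: a type octet and a channel number only.
struct ToyFrame : ToyDataBlock {
    std::uint8_t type = 0;
    std::uint16_t channel = 0;
    void encode(ToyBuffer& b) const override { b.putOctet(type); b.putShort(channel); }
    void decode(ToyBuffer& b) override { type = b.getOctet(); channel = b.getShort(); }
};

// The sending side can encode either kind of block without knowing which it holds.
void encodeBlock(const ToyDataBlock& block, ToyBuffer& wire) { block.encode(wire); }

// Round trip: encode a frame into a buffer, then decode it back.
ToyFrame roundTrip(const ToyFrame& out) {
    ToyBuffer wire;
    encodeBlock(out, wire);
    assert(wire.raw().size() == 3);  // one octet plus one short
    ToyFrame in;
    in.decode(wire);
    return in;
}
```

Encoding goes through the shared interface, while decoding calls the
concrete type directly; this mirrors the point above that on decode the
context decides which type is expected.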

Added: incubator/qpid/trunk/qpid/cpp/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/Makefile?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/Makefile Tue Sep 19 15:06:50 2006
@@ -0,0 +1,53 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# Master make file for c++ Qpid project (AMQP)
+#
+# Calls the makefiles in the various subdirectories in order to
+# build them in the correct sequence.
+#
+
+include options.mk
+
+UNITTESTS=$(wildcard common/*/test/*.so broker/test/*.so)
+
+.PHONY: all clean doxygen test runtests
+
+test:   all
+	@$(MAKE) -C common test
+	@$(MAKE) -C broker test
+	@$(MAKE) -C client test
+	@$(MAKE) runtests
+
+runtests: 
+	$(CPPUNIT_HOME)/bin/DllPlugInTester -t -b $(UNITTESTS)
+	bin/qpidd >> qpidd.log &
+	cd ../python ; ./run-tests -v -I cpp_failing.txt	
+
+all:
+	@$(MAKE) -C common all
+	@$(MAKE) -C broker all
+	@$(MAKE) -C client all
+
+clean:
+	@$(MAKE) -C common clean
+	@$(MAKE) -C broker clean
+	@$(MAKE) -C client clean
+	@$(MAKE) -C doxygen clean
+	-@rm qpidd.log 
+
+doxygen:
+	@$(MAKE) -C doxygen all

Propchange: incubator/qpid/trunk/qpid/cpp/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/README
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/README?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/README (added)
+++ incubator/qpid/trunk/qpid/cpp/README Tue Sep 19 15:06:50 2006
@@ -0,0 +1,48 @@
+= Developer guide to C++ codebase =
+
+== Prerequisites ==
+
+Apache Portable Runtime 1.2.7:  http://apr.apache.org/
+Install in /usr/local/apr or update options.mk if installed elsewhere.
+
+CppUnit: http://cppunit.sourceforge.net
+
+Optional: to generate source code documentation you need:
+ * doxygen: http://sourceforge.net/projects/doxygen/ 
+ * graphviz: http://www.graphviz.org/
+
+== Build and test ==
+
+make
+
+Default target builds and tests everything; see Makefile for other
+targets.
+
+=== Unit tests ===
+Unit tests are built as .so files containing CppUnit plugins. 
+
+DllPlugInTester is provided as part of cppunit. You can use it to run
+any subset of the unit tests. See Makefile for examples.
+
+=== System tests ===
+
+The Python test suite ../python/run-tests is the main set of broker
+system tests.
+
+There are some C++ client test executables built under client/test.
+
+== Doxygen ==
+
+Doxygen generates documentation in several formats from source code
+using special comments. You can use javadoc style comments if you know
+javadoc; if you don't, or want to know the full story on doxygen
+markup, see http://www.stack.nl/~dimitri/doxygen/
+
+Even if the code is completely uncommented, doxygen generates
+UML-esque dependency diagrams that are ''extremely'' useful in navigating
+around the code, especially for newcomers.
+
+To try it out, run "make doxygen" then open doxygen/html/index.html
+
+
+

Added: incubator/qpid/trunk/qpid/cpp/broker/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/Makefile?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/Makefile Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+#
+# Copyright (c) 2006 The Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Build broker library and executable.
+#
+
+QPID_HOME = ../..
+include ${QPID_HOME}/cpp/options.mk
+
+SOURCES= $(wildcard src/*.cpp)
+OBJECTS= $(subst .cpp,.o,$(SOURCES))
+LIB_OBJECTS= $(subst src/Broker.o,,$(OBJECTS))
+EXE_OBJECTS= src/Broker.o
+
+
+.PHONY: all clean test
+
+all: $(BROKER)
+
+test:
+	@$(MAKE) -C test all
+
+clean:
+	-@rm -f ${OBJECTS} src/*.d ${BROKER} $(BROKER_LIB)
+	@$(MAKE) -C test clean
+
+$(BROKER): $(BROKER_LIB) $(EXE_OBJECTS)
+	${CXX} -o $@ $(EXE_OBJECTS) $(LDFLAGS) -lapr-1 $(COMMON_LIB) $(BROKER_LIB)
+
+$(BROKER_LIB): $(LIB_OBJECTS)
+	$(CXX) -shared -o $@ $(LDFLAGS) $(LIB_OBJECTS)  -lapr-1 $(COMMON_LIB) $(LIBDIR)
+
+-include $(SOURCES:.cpp=.d)

Propchange: incubator/qpid/trunk/qpid/cpp/broker/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/AutoDelete.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/AutoDelete.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/AutoDelete.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/AutoDelete.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _AutoDelete_
+#define _AutoDelete_
+
+#include <iostream>
+#include <queue>
+#include "MonitorImpl.h"
+#include "Queue.h"
+#include "QueueRegistry.h"
+#include "ThreadFactoryImpl.h"
+
+namespace qpid {
+    namespace broker{
+        class AutoDelete : private virtual qpid::concurrent::Runnable{
+            qpid::concurrent::ThreadFactoryImpl factory;
+            qpid::concurrent::MonitorImpl lock;            
+            qpid::concurrent::MonitorImpl monitor;            
+            std::queue<Queue::shared_ptr> queues;
+            QueueRegistry* const registry;
+            const u_int32_t period;
+            volatile bool stopped;
+            qpid::concurrent::Thread* runner;
+            
+            Queue::shared_ptr const pop();
+            void process();
+            virtual void run();
+
+        public:
+            AutoDelete(QueueRegistry* const registry, u_int32_t period);
+            void add(Queue::shared_ptr const);
+            void start();
+            void stop();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/AutoDelete.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Binding.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Binding.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Binding.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Binding.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Binding_
+#define _Binding_
+
+#include "FieldTable.h"
+
+namespace qpid {
+    namespace broker {
+        class Binding{
+        public:
+            virtual void cancel() = 0;
+            virtual ~Binding(){}
+        };
+    }
+}
+
+
+#endif
+

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Binding.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Channel_
+#define _Channel_
+
+#include <map>
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "BasicPublishBody.h"
+#include "Binding.h"
+#include "Consumer.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "NameGenerator.h"
+#include "OutputHandler.h"
+#include "Queue.h"
+
+namespace qpid {
+    namespace broker {
+        class Channel{
+        private:
+            class ConsumerImpl : public virtual Consumer{
+                ConnectionToken* const connection;
+                Channel* parent;
+                string tag;
+                Queue::shared_ptr queue;
+            public:
+                ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection);
+                virtual bool deliver(Message::shared_ptr& msg);            
+                void cancel();
+            };
+
+            typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; 
+
+            const int id;
+            qpid::framing::OutputHandler* out;
+            u_int64_t deliveryTag;
+            Queue::shared_ptr defaultQueue;
+            bool transactional;
+            std::map<string, ConsumerImpl*> consumers;
+            u_int32_t prefetchSize;    
+            u_int16_t prefetchCount;    
+            u_int32_t framesize;
+            Message::shared_ptr message;
+            NameGenerator tagGenerator;
+
+            void deliver(Message::shared_ptr& msg, string& tag);            
+            void publish(ExchangeRegistry* exchanges);
+        
+        public:
+            Channel(qpid::framing::OutputHandler* out, int id, u_int32_t framesize);
+            ~Channel();
+            inline void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
+            inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
+            inline void setPrefetchSize(u_int32_t size){ prefetchSize = size; }
+            inline void setPrefetchCount(u_int16_t count){ prefetchCount = count; }
+            void handlePublish(Message* msg);
+            void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges);
+            void handleContent(qpid::framing::AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges);
+            bool exists(string& consumerTag);
+            void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
+            void cancel(string& tag);
+            void begin();
+            void close();
+            void commit();
+            void rollback();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Channel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Configuration.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Configuration.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Configuration.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Configuration.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Configuration_
+#define _Configuration_
+
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+
+namespace qpid {
+    namespace broker {
+        class Configuration{
+            class Option{
+                const std::string flag;
+                const std::string name;
+                const std::string desc;
+
+                bool match(const std::string& arg);
+
+            protected:
+                virtual bool needsValue() const = 0;
+                virtual void setValue(const std::string& value) = 0;
+
+            public:
+                Option(const char flag, const std::string& name, const std::string& desc);
+                Option(const std::string& name, const std::string& desc);
+                virtual ~Option();
+
+                bool parse(int& i, char** argv, int argc);
+                void print(std::ostream& out) const;
+            };
+
+            class IntOption : public Option{
+                const int defaultValue;
+                int value;
+            public:
+                IntOption(char flag, const std::string& name, const std::string& desc, const int value = 0);
+                IntOption(const std::string& name, const std::string& desc, const int value = 0);
+                virtual ~IntOption();
+
+                int getValue() const;
+                virtual bool needsValue() const;
+                virtual void setValue(const std::string& value);
+            };
+
+            class StringOption : public Option{
+                const std::string defaultValue;
+                std::string value;
+            public:
+                StringOption(char flag, const std::string& name, const std::string& desc, const std::string value = "");
+                StringOption(const std::string& name, const std::string& desc, const std::string value = "");
+                virtual ~StringOption();
+
+                const std::string& getValue() const;
+                virtual bool needsValue() const;
+                virtual void setValue(const std::string& value);
+            };
+
+            class BoolOption : public Option{
+                const bool defaultValue;
+                bool value;
+            public:
+                BoolOption(char flag, const std::string& name, const std::string& desc, const bool value = 0);
+                BoolOption(const std::string& name, const std::string& desc, const bool value = 0);
+                virtual ~BoolOption();
+
+                bool getValue() const;
+                virtual bool needsValue() const;
+                virtual void setValue(const std::string& value);
+            };
+
+            BoolOption trace;
+            IntOption port;
+            IntOption workerThreads;
+            IntOption maxConnections;
+            IntOption connectionBacklog;
+            StringOption acceptor;
+            BoolOption help;
+
+            typedef std::vector<Option*>::iterator op_iterator;
+            std::vector<Option*> options;
+
+        public:
+            class ParseException{
+            public:
+                const std::string error;  // held by value; a reference member could dangle
+                ParseException(const std::string& _error) : error(_error) {}
+            };
+
+
+            Configuration();
+            ~Configuration();
+
+            void parse(int argc, char** argv);
+
+            bool isHelp();
+            bool isTrace();
+            int getPort();
+            int getWorkerThreads();
+            int getMaxConnections();
+            int getConnectionBacklog();
+            const std::string& getAcceptor();
+
+            void usage();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Configuration.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/ConnectionToken.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/ConnectionToken.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/ConnectionToken.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/ConnectionToken.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ConnectionToken_
+#define _ConnectionToken_
+
+namespace qpid {
+    namespace broker {
+        /**
+         * An empty interface allowing opaque implementations of some
+         * form of token to identify a connection.
+         */
+        class ConnectionToken{
+        public:
+            virtual ~ConnectionToken(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/ConnectionToken.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Consumer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Consumer.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Consumer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Consumer.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Consumer_
+#define _Consumer_
+
+#include "Message.h"
+
+namespace qpid {
+    namespace broker {
+        class Consumer{
+        public:
+            virtual bool deliver(Message::shared_ptr& msg) = 0;
+            virtual ~Consumer(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Consumer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _DirectExchange_
+#define _DirectExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+    class DirectExchange : public virtual Exchange{
+        const string name;
+        std::map<string, std::vector<Queue::shared_ptr> > bindings;
+        qpid::concurrent::MonitorImpl lock;
+
+    public:
+        static const std::string typeName;
+        
+        DirectExchange(const string& name);
+        
+        inline virtual const string& getName(){ return name; }
+        
+        virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+        virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+        virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+        virtual ~DirectExchange();
+    };
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/DirectExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native
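
The DirectExchange header above declares bind, unbind and route over a
map from routing key to a list of queues.  A minimal sketch of how such
an exchange might behave, assuming the usual AMQP direct-exchange rule
of exact routing-key match: ToyQueue and ToyDirectExchange are
hypothetical stand-ins rather than the Qpid classes, std::shared_ptr
(C++11) replaces Queue::shared_ptr, messages are plain strings, and the
FieldTable arguments and the MonitorImpl lock are left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for broker::Queue: it only records deliveries.
struct ToyQueue {
    std::vector<std::string> messages;
    void deliver(const std::string& msg) { messages.push_back(msg); }
};

// Hypothetical direct exchange: a message goes to every queue bound
// with exactly the routing key the message was published with.
class ToyDirectExchange {
    typedef std::shared_ptr<ToyQueue> QueuePtr;
    std::map<std::string, std::vector<QueuePtr> > bindings;
public:
    // Bind a queue to a key; binding the same pair twice has no extra effect.
    void bind(QueuePtr queue, const std::string& routingKey) {
        std::vector<QueuePtr>& queues = bindings[routingKey];
        if (std::find(queues.begin(), queues.end(), queue) == queues.end())
            queues.push_back(queue);
    }

    // Remove a binding; drop the key entirely once no queue uses it.
    void unbind(QueuePtr queue, const std::string& routingKey) {
        auto it = bindings.find(routingKey);
        if (it == bindings.end()) return;
        std::vector<QueuePtr>& queues = it->second;
        queues.erase(std::remove(queues.begin(), queues.end(), queue), queues.end());
        if (queues.empty()) bindings.erase(it);
    }

    // Deliver to all matching queues; returns how many received the message.
    std::size_t route(const std::string& msg, const std::string& routingKey) {
        auto it = bindings.find(routingKey);
        if (it == bindings.end()) return 0;
        for (const QueuePtr& q : it->second) q->deliver(msg);
        return it->second.size();
    }
};
```

In this sketch a message whose key has no binding is dropped and route
returns 0; what the broker does in that case is not shown in the header.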

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Exchange_
+#define _Exchange_
+
+#include "FieldTable.h"
+#include "Message.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+    class Exchange{
+    public:
+        virtual const string& getName() = 0;
+        virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+        virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+        virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args) = 0;
+        virtual ~Exchange(){}
+    };
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Exchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeBinding.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeBinding.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeBinding.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeBinding.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeBinding_
+#define _ExchangeBinding_
+
+#include "Binding.h"
+#include "FieldTable.h"
+#include "Queue.h"
+
+namespace qpid {
+    namespace broker {
+        class Exchange;
+        class Queue;
+
+        class ExchangeBinding : public virtual Binding{
+            Exchange* e;
+            Queue::shared_ptr q;
+            const string key;
+            qpid::framing::FieldTable* args;
+        public:
+            ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, qpid::framing::FieldTable* _args);
+            virtual void cancel();
+            virtual ~ExchangeBinding();
+        };
+    }
+}
+
+
+#endif
+

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeBinding.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ExchangeRegistry_
+#define _ExchangeRegistry_
+
+#include <map>
+#include "Exchange.h"
+#include "Monitor.h"
+
+namespace qpid {
+namespace broker {
+    class ExchangeRegistry{
+        std::map<string, Exchange*> exchanges;
+        qpid::concurrent::Monitor* lock;
+    public:
+        ExchangeRegistry();
+        void declare(Exchange* exchange);
+        void destroy(const string& name);
+        Exchange* get(const string& name);
+        inline qpid::concurrent::Monitor* getLock(){ return lock; }
+        ~ExchangeRegistry();
+    };
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/ExchangeRegistry.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _FanOutExchange_
+#define _FanOutExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class FanOutExchange : public virtual Exchange {
+    const string name;
+    std::vector<Queue::shared_ptr> bindings;
+    qpid::concurrent::MonitorImpl lock;
+
+  public:
+    static const std::string typeName;
+        
+    FanOutExchange(const string& name);
+        
+    inline virtual const string& getName(){ return name; }
+
+    virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+    virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+    virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+    virtual ~FanOutExchange();
+};
+
+}
+}
+
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/FanOutExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native
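
The FanOutExchange header above declares only the interface; the routing logic lives in a source file that is not shown here. As a rough illustration of what fan-out routing usually means (every bound queue receives every message, and the routing key is ignored), here is a minimal sketch. FanOutQueueStub and SketchFanOut are hypothetical stand-ins rather than Qpid classes, std::shared_ptr replaces std::tr1::shared_ptr, and the MonitorImpl locking is left out for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for qpid::broker::Queue: it only records deliveries.
struct FanOutQueueStub {
    std::vector<std::string> delivered;
    void deliver(const std::string& msg) { delivered.push_back(msg); }
};

// Hypothetical fan-out exchange (not the Qpid implementation).
class SketchFanOut {
    std::vector<std::shared_ptr<FanOutQueueStub>> bindings;
public:
    void bind(std::shared_ptr<FanOutQueueStub> q) {
        // Skip duplicates so a queue receives each message only once.
        if (std::find(bindings.begin(), bindings.end(), q) == bindings.end())
            bindings.push_back(q);
    }
    void unbind(std::shared_ptr<FanOutQueueStub> q) {
        bindings.erase(std::remove(bindings.begin(), bindings.end(), q),
                       bindings.end());
    }
    // The routing key is accepted but deliberately ignored.
    void route(const std::string& msg, const std::string& /*routingKey*/) {
        for (auto& q : bindings) q->deliver(msg);
    }
};
```

A real implementation would also need to hold the exchange's lock around access to the bindings vector, since bind, unbind and route may be called from different threads.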

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,73 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Message_
+#define _Message_
+
+#include "memory.h"
+#include "AMQContentBody.h"
+#include "AMQHeaderBody.h"
+#include "BasicHeaderProperties.h"
+#include "BasicPublishBody.h"
+#include "ConnectionToken.h"
+#include "OutputHandler.h"
+
+namespace qpid {
+    namespace broker {
+        class ExchangeRegistry;
+
+        class Message{
+            typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+            typedef content_list::iterator content_iterator;
+
+            const ConnectionToken* const publisher;
+            string exchange;
+            string routingKey;
+            const bool mandatory;
+            const bool immediate;
+            qpid::framing::AMQHeaderBody::shared_ptr header;
+            content_list content;
+
+            u_int64_t contentSize();
+            qpid::framing::BasicHeaderProperties* getHeaderProperties();
+    
+
+        public:
+            typedef std::tr1::shared_ptr<Message> shared_ptr;
+
+            Message(const ConnectionToken* const publisher, 
+                    const string& exchange, const string& routingKey, 
+                    bool mandatory, bool immediate);
+            ~Message();
+            void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
+            void addContent(qpid::framing::AMQContentBody::shared_ptr data);
+            bool isComplete();
+            const ConnectionToken* const getPublisher();
+
+            void deliver(qpid::framing::OutputHandler* out, int channel, 
+                         string& consumerTag, u_int64_t deliveryTag, 
+                         u_int32_t framesize);
+
+            friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
+
+        };
+        bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Message.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/NameGenerator.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/NameGenerator.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/NameGenerator.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/NameGenerator.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _NameGenerator_
+#define _NameGenerator_
+
+#include "Message.h"
+
+namespace qpid {
+    namespace broker {
+        class NameGenerator{
+            const std::string base;
+            unsigned int counter;
+        public:
+            NameGenerator(const std::string& base);
+            std::string generate();
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/NameGenerator.h
------------------------------------------------------------------------------
    svn:eol-style = native
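
NameGenerator.h shows only a base string, a counter and a generate() method; the format of the generated names is not visible in this part of the commit. One plausible reading is that each call appends the next counter value to the base. The sketch below assumes that, and the "_" separator and the starting value of 1 are pure guesses made for illustration.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical sketch, not the Qpid implementation: names are formed as
// base + "_" + counter. Separator and starting value are assumptions.
class SketchNameGenerator {
    const std::string base;
    unsigned int counter;
public:
    explicit SketchNameGenerator(const std::string& b) : base(b), counter(1) {}
    std::string generate() {
        std::ostringstream out;
        out << base << "_" << counter++;
        return out.str();
    }
};
```

Like the class in the header, this sketch carries no lock, so callers would have to serialise access if several threads generate names.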

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/Queue.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/Queue.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/Queue.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,106 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Queue_
+#define _Queue_
+
+#include <vector>
+#include <queue>
+#include "memory.h"
+#include "apr_time.h"
+#include "amqp_types.h"
+#include "Binding.h"
+#include "ConnectionToken.h"
+#include "Consumer.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+
+namespace qpid {
+    namespace broker {
+
+        /**
+         * Thrown when exclusive access would be violated.
+         */
+        struct ExclusiveAccessException{};
+
+        /**
+         * The broker's representation of an AMQP queue. Messages are
+         * delivered to a queue, from where they are dispatched to
+         * registered consumers or stored until they are dequeued or
+         * until one or more consumers register.
+         */
+        class Queue{
+            const string name;
+            const u_int32_t autodelete;
+            const bool durable;
+            const ConnectionToken* const owner;
+            std::vector<Consumer*> consumers;
+            std::queue<Binding*> bindings;
+            std::queue<Message::shared_ptr> messages;
+            bool queueing;
+            bool dispatching;
+            int next;
+            mutable qpid::concurrent::MonitorImpl lock;
+            apr_time_t lastUsed;
+            Consumer* exclusive;
+
+            bool startDispatching();
+            bool dispatch(Message::shared_ptr& msg);
+
+        public:
+            
+            typedef std::tr1::shared_ptr<Queue> shared_ptr;
+
+            typedef std::vector<shared_ptr> vector;
+	    
+            Queue(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+            ~Queue();
+            /**
+             * Informs the queue of a binding that should be cancelled on
+             * destruction of the queue.
+             */
+            void bound(Binding* b);
+            /**
+             * Delivers a message to the queue, from where it is
+             * dispatched immediately to a consumer if one is
+             * available, or stored for dequeue or later dispatch if
+             * not.
+             */
+            void deliver(Message::shared_ptr& msg);
+            /**
+             * Dispatch any queued messages, provided there are
+             * consumers for them. Only one thread can be dispatching
+             * at any time, but this method (rather than the caller)
+             * is responsible for ensuring that.
+             */
+            void dispatch();
+            void consume(Consumer* c, bool exclusive = false);
+            void cancel(Consumer* c);
+            Message::shared_ptr dequeue();
+            u_int32_t purge();
+            u_int32_t getMessageCount() const;
+            u_int32_t getConsumerCount() const;
+            inline const string& getName() const { return name; }
+            inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
+            inline bool hasExclusiveConsumer() const { return exclusive; }
+            bool canAutoDelete() const;
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/Queue.h
------------------------------------------------------------------------------
    svn:eol-style = native
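
The comments in Queue.h describe the intended behaviour of deliver() and dispatch(): a message goes straight to a consumer if one is registered, and is otherwise stored for later. The sketch below illustrates that idea under several assumptions. The `next` member in the header suggests round-robin selection among consumers, but that is an inference. ConsumerStub and SketchDispatchQueue are hypothetical names, messages are plain strings, and locking, the exclusive consumer and the queueing/dispatching flags are all omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <string>
#include <vector>

// Hypothetical consumer that simply records what it receives.
struct ConsumerStub {
    std::vector<std::string> received;
    void deliver(const std::string& msg) { received.push_back(msg); }
};

// Hypothetical sketch of deliver/dispatch (not the Qpid implementation).
class SketchDispatchQueue {
    std::vector<ConsumerStub*> consumers;
    std::queue<std::string> messages;
    std::size_t next = 0;  // index of the consumer to try next (round-robin)
public:
    void consume(ConsumerStub* c) { consumers.push_back(c); }

    // Hand the message to a consumer if there is one, otherwise store it.
    void deliver(const std::string& msg) {
        if (consumers.empty()) {
            messages.push(msg);
            return;
        }
        if (next >= consumers.size()) next = 0;
        consumers[next++]->deliver(msg);
    }

    // Drain stored messages while there are consumers to take them.
    void dispatch() {
        while (!messages.empty() && !consumers.empty()) {
            std::string msg = messages.front();
            messages.pop();
            deliver(msg);
        }
    }

    std::size_t getMessageCount() const { return messages.size(); }
};
```

The header states that dispatch() itself ensures only one thread dispatches at a time; this single-threaded sketch does not attempt to reproduce that guarantee.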

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/QueueRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/QueueRegistry.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/QueueRegistry.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/QueueRegistry.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _QueueRegistry_
+#define _QueueRegistry_
+
+#include <map>
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+
+class SessionHandlerImpl;
+
+/**
+ * A registry of queues indexed by queue name.
+ *
+ * Queues are reference counted using shared_ptr to ensure that they
+ * are deleted when and only when they are no longer in use.
+ *
+ */
+class QueueRegistry{
+ 
+  public:
+    QueueRegistry();
+    ~QueueRegistry();
+
+    /**
+     * Declare a queue.
+     *
+     * @return The queue and a boolean flag, which is true if the queue
+     * was created by this declare call or false if it already existed.
+     */
+    std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, u_int32_t autodelete = 0, const ConnectionToken* const owner = 0);
+
+    /**
+     * Destroy the named queue.
+     *
+     * Note: if the queue is in use it is not actually destroyed until
+     * all shared_ptrs to it are destroyed. During that time it is
+     * possible that a new queue with the same name may be
+     * created. This should not create any problems as the new and
+     * old queues exist independently. The registry has
+     * forgotten the old queue so there can be no confusion for
+     * subsequent calls to find or declare with the same name.
+     *
+     */
+    void destroy(const string& name);
+
+    /**
+     * Find the named queue. Returns a null shared_ptr if not found.
+     */
+    Queue::shared_ptr find(const string& name);
+
+    /**
+     * Generate unique queue name.
+     */
+    string generateName();
+
+  private:
+    typedef std::map<string, Queue::shared_ptr> QueueMap;
+    QueueMap queues;
+    qpid::concurrent::MonitorImpl lock;
+    int counter;
+
+};
+
+    
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/QueueRegistry.h
------------------------------------------------------------------------------
    svn:eol-style = native
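
The QueueRegistry comments make two points that are easy to check with a small example: declare() reports whether it created the queue, and destroy() only drops the registry's reference, so an old queue stays alive for existing holders while a new queue of the same name can be declared. The sketch below is a minimal illustration of that contract, assuming std::shared_ptr in place of std::tr1::shared_ptr. RegistryQueueStub and SketchRegistry are hypothetical names, and the MonitorImpl lock is omitted.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for qpid::broker::Queue.
struct RegistryQueueStub {
    std::string name;
    explicit RegistryQueueStub(const std::string& n) : name(n) {}
};

// Hypothetical registry sketch (not the Qpid implementation).
class SketchRegistry {
    typedef std::shared_ptr<RegistryQueueStub> QueuePtr;
    typedef std::map<std::string, QueuePtr> QueueMap;
    QueueMap queues;
public:
    // Returns the queue plus true if this call created it.
    std::pair<QueuePtr, bool> declare(const std::string& name) {
        QueueMap::iterator i = queues.find(name);
        if (i != queues.end()) return std::make_pair(i->second, false);
        QueuePtr q = std::make_shared<RegistryQueueStub>(name);
        queues[name] = q;
        return std::make_pair(q, true);
    }
    // Forget the queue; it is freed once the last shared_ptr goes away.
    void destroy(const std::string& name) { queues.erase(name); }
    // Returns a null pointer if no queue of that name is registered.
    QueuePtr find(const std::string& name) const {
        QueueMap::const_iterator i = queues.find(name);
        if (i == queues.end()) return QueuePtr();
        return i->second;
    }
};
```

Because the map holds shared_ptrs, a caller that still holds the old pointer after destroy() keeps a valid object, which matches the note in the header that old and new queues exist independently.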

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerFactoryImpl.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerFactoryImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerFactoryImpl.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,49 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactoryImpl_
+#define _SessionHandlerFactoryImpl_
+
+#include "AMQFrame.h"
+#include "AutoDelete.h"
+#include "DirectExchange.h"
+#include "ExchangeRegistry.h"
+#include "ProtocolInitiation.h"
+#include "QueueRegistry.h"
+#include "SessionHandlerFactory.h"
+#include "TimeoutHandler.h"
+
+namespace qpid {
+    namespace broker {
+
+        class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
+        {
+            QueueRegistry queues;
+            ExchangeRegistry exchanges;
+            const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+            AutoDelete cleaner;
+        public:
+            SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
+            virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
+            virtual ~SessionHandlerFactoryImpl();
+        };
+
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerFactoryImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,230 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerImpl_
+#define _SessionHandlerImpl_
+
+#include <map>
+#include <sstream>
+#include <vector>
+#include <exception>
+#include "AMQFrame.h"
+#include "AMQP_ServerOperations.h"
+#include "AutoDelete.h"
+#include "ExchangeRegistry.h"
+#include "Channel.h"
+#include "ConnectionToken.h"
+#include "DirectExchange.h"
+#include "OutputHandler.h"
+#include "ProtocolInitiation.h"
+#include "QueueRegistry.h"
+#include "SessionContext.h"
+#include "SessionHandler.h"
+#include "TimeoutHandler.h"
+#include "TopicExchange.h"
+
+namespace qpid {
+namespace broker {
+
+struct ChannelException : public std::exception {
+    u_int16_t code;
+    string text;
+    ChannelException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+    ~ChannelException() throw() {}
+    const char* what() const throw() { return text.c_str(); }
+};
+
+struct ConnectionException : public std::exception {
+    u_int16_t code;
+    string text;
+    ConnectionException(u_int16_t _code, string _text) : code(_code), text(_text) {}
+    ~ConnectionException() throw() {}
+    const char* what() const throw() { return text.c_str(); }
+};
+
+class SessionHandlerImpl : public virtual qpid::io::SessionHandler, 
+                           public virtual qpid::framing::AMQP_ServerOperations, 
+                           public virtual ConnectionToken
+{
+    typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
+    typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+
+    qpid::io::SessionContext* context;
+    QueueRegistry* queues;
+    ExchangeRegistry* const exchanges;
+    AutoDelete* const cleaner;
+    const u_int32_t timeout;//timeout for auto-deleted queues (in ms)
+
+    ConnectionHandler* connectionHandler;
+    ChannelHandler* channelHandler;
+    BasicHandler* basicHandler;
+    ExchangeHandler* exchangeHandler;
+    QueueHandler* queueHandler;
+
+    std::map<u_int16_t, Channel*> channels;
+    std::vector<Queue::shared_ptr> exclusiveQueues;
+
+    u_int32_t framemax;
+    u_int16_t heartbeat;
+
+    void handleHeader(u_int16_t channel, qpid::framing::AMQHeaderBody::shared_ptr body);
+    void handleContent(u_int16_t channel, qpid::framing::AMQContentBody::shared_ptr body);
+    void handleHeartbeat(qpid::framing::AMQHeartbeatBody::shared_ptr body);
+
+    /**
+     * Get named queue, never returns 0.
+     * @return: named queue or default queue for channel if name=""
+     * @exception: ChannelException if no queue of that name is found.
+     * @exception: ConnectionException if no queue specified and channel has not declared one.
+     */
+    Queue::shared_ptr getQueue(const string& name, u_int16_t channel);
+
+    Exchange* findExchange(const string& name);
+
+  public:
+    SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, 
+                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
+    virtual void received(qpid::framing::AMQFrame* frame);
+    virtual void initiated(qpid::framing::ProtocolInitiation* header);
+    virtual void idleOut();
+    virtual void idleIn();
+    virtual void closed();
+    virtual ~SessionHandlerImpl();
+
+    class ConnectionHandlerImpl : public virtual ConnectionHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline ConnectionHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+
+        virtual void startOk(u_int16_t channel, qpid::framing::FieldTable& clientProperties, string& mechanism, 
+                             string& response, string& locale); 
+                
+        virtual void secureOk(u_int16_t channel, string& response); 
+                
+        virtual void tuneOk(u_int16_t channel, u_int16_t channelMax, u_int32_t frameMax, u_int16_t heartbeat); 
+                
+        virtual void open(u_int16_t channel, string& virtualHost, string& capabilities, bool insist); 
+                
+        virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, u_int16_t classId, 
+                           u_int16_t methodId); 
+                
+        virtual void closeOk(u_int16_t channel); 
+                
+        virtual ~ConnectionHandlerImpl(){}
+    };
+    
+    class ChannelHandlerImpl : public virtual ChannelHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline ChannelHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        virtual void open(u_int16_t channel, string& outOfBand); 
+        
+        virtual void flow(u_int16_t channel, bool active); 
+                
+        virtual void flowOk(u_int16_t channel, bool active); 
+                
+        virtual void close(u_int16_t channel, u_int16_t replyCode, string& replyText, 
+                           u_int16_t classId, u_int16_t methodId); 
+                
+        virtual void closeOk(u_int16_t channel); 
+                
+        virtual ~ChannelHandlerImpl(){}
+    };
+    
+    class ExchangeHandlerImpl : public virtual ExchangeHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline ExchangeHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        virtual void declare(u_int16_t channel, u_int16_t ticket, string& exchange, string& type, 
+                             bool passive, bool durable, bool autoDelete, bool internal, bool nowait, 
+                             qpid::framing::FieldTable& arguments); 
+                
+        virtual void delete_(u_int16_t channel, u_int16_t ticket, string& exchange, bool ifUnused, bool nowait); 
+                
+        virtual ~ExchangeHandlerImpl(){}
+    };
+
+    
+    class QueueHandlerImpl : public virtual QueueHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline QueueHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        virtual void declare(u_int16_t channel, u_int16_t ticket, string& queue, 
+                             bool passive, bool durable, bool exclusive, 
+                             bool autoDelete, bool nowait, qpid::framing::FieldTable& arguments); 
+                
+        virtual void bind(u_int16_t channel, u_int16_t ticket, string& queue, 
+                          string& exchange, string& routingKey, bool nowait, 
+                          qpid::framing::FieldTable& arguments); 
+                
+        virtual void purge(u_int16_t channel, u_int16_t ticket, string& queue, 
+                           bool nowait); 
+                
+        virtual void delete_(u_int16_t channel, u_int16_t ticket, string& queue, bool ifUnused, bool ifEmpty, 
+                             bool nowait); 
+                
+        virtual ~QueueHandlerImpl(){}
+    };
+
+    class BasicHandlerImpl : public virtual BasicHandler{
+        SessionHandlerImpl* parent;
+      public:
+        inline BasicHandlerImpl(SessionHandlerImpl* _parent) : parent(_parent) {}
+        
+        virtual void qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); 
+                    
+        virtual void consume(u_int16_t channel, u_int16_t ticket, string& queue, string& consumerTag, 
+                             bool noLocal, bool noAck, bool exclusive, bool nowait); 
+                
+        virtual void cancel(u_int16_t channel, string& consumerTag, bool nowait); 
+                
+        virtual void publish(u_int16_t channel, u_int16_t ticket, string& exchange, string& routingKey, 
+                             bool mandatory, bool immediate); 
+                
+        virtual void get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck); 
+                
+        virtual void ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple); 
+                
+        virtual void reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue); 
+                
+        virtual void recover(u_int16_t channel, bool requeue); 
+                
+        virtual ~BasicHandlerImpl(){}
+    };
+
+    inline virtual ChannelHandler* getChannelHandler(){ return channelHandler; }
+    inline virtual ConnectionHandler* getConnectionHandler(){ return connectionHandler; }
+    inline virtual BasicHandler* getBasicHandler(){ return basicHandler; }
+    inline virtual ExchangeHandler* getExchangeHandler(){ return exchangeHandler; }
+    inline virtual QueueHandler* getQueueHandler(){ return queueHandler; }
+ 
+    inline virtual AccessHandler* getAccessHandler(){ return 0; }       
+    inline virtual FileHandler* getFileHandler(){ return 0; }       
+    inline virtual StreamHandler* getStreamHandler(){ return 0; }       
+    inline virtual TxHandler* getTxHandler(){ return 0; }       
+    inline virtual DtxHandler* getDtxHandler(){ return 0; }       
+    inline virtual TunnelHandler* getTunnelHandler(){ return 0; }       
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/SessionHandlerImpl.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h Tue Sep 19 15:06:50 2006
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TopicExchange_
+#define _TopicExchange_
+
+#include <map>
+#include <vector>
+#include "Exchange.h"
+#include "FieldTable.h"
+#include "Message.h"
+#include "MonitorImpl.h"
+#include "Queue.h"
+
+namespace qpid {
+namespace broker {
+    class TopicExchange : public virtual Exchange{
+        const string name;
+        std::map<string, std::vector<Queue::shared_ptr> > bindings;//NOTE: pattern matching not yet supported
+        qpid::concurrent::MonitorImpl lock;
+
+    public:
+        static const std::string typeName;
+        
+        TopicExchange(const string& name);
+        
+        inline virtual const string& getName(){ return name; }
+
+        virtual void bind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+        virtual void unbind(Queue::shared_ptr queue, const string& routingKey, qpid::framing::FieldTable* args);
+
+        virtual void route(Message::shared_ptr& msg, const string& routingKey, qpid::framing::FieldTable* args);
+
+        virtual ~TopicExchange();
+    };
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/broker/inc/TopicExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native
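
The NOTE in TopicExchange.h says pattern matching is not yet supported, and the bindings member maps a routing key to a list of queues. Taken together, that suggests routing by exact key match at this revision. The following sketch shows that reading. TopicQueueStub and SketchTopicExchange are hypothetical names, messages are plain strings, locking is omitted, and the return value of route() is an addition made only to make the example easy to check.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for qpid::broker::Queue.
struct TopicQueueStub {
    std::vector<std::string> delivered;
};

// Hypothetical exact-match topic exchange (not the Qpid implementation).
class SketchTopicExchange {
    typedef std::vector<std::shared_ptr<TopicQueueStub>> QueueList;
    typedef std::map<std::string, QueueList> BindingMap;
    BindingMap bindings;
public:
    void bind(std::shared_ptr<TopicQueueStub> q, const std::string& key) {
        bindings[key].push_back(q);
    }
    void unbind(std::shared_ptr<TopicQueueStub> q, const std::string& key) {
        BindingMap::iterator i = bindings.find(key);
        if (i == bindings.end()) return;
        QueueList& qs = i->second;
        qs.erase(std::remove(qs.begin(), qs.end(), q), qs.end());
        if (qs.empty()) bindings.erase(i);  // drop keys with no queues left
    }
    // Delivers to queues bound with exactly this key; returns how many.
    std::size_t route(const std::string& msg, const std::string& key) {
        BindingMap::iterator i = bindings.find(key);
        if (i == bindings.end()) return 0;
        for (auto& q : i->second) q->delivered.push_back(msg);
        return i->second.size();
    }
};
```

Supporting wildcard patterns would mean replacing the single map lookup in route() with a match against each bound pattern.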

Added: incubator/qpid/trunk/qpid/cpp/broker/src/AutoDelete.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/AutoDelete.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/AutoDelete.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/AutoDelete.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "AutoDelete.h"
+
+using namespace qpid::broker;
+
+AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry), 
+                                                                            period(_period), 
+                                                                            stopped(true), 
+                                                                            runner(0){}
+
+void AutoDelete::add(Queue::shared_ptr const queue){
+    lock.acquire();
+    queues.push(queue);
+    lock.release();
+}
+
+Queue::shared_ptr const AutoDelete::pop(){
+    Queue::shared_ptr next;
+    lock.acquire();
+    if(!queues.empty()){
+        next = queues.front();
+        queues.pop();
+    }
+    lock.release();
+    return next;
+}
+
+void AutoDelete::process(){
+    Queue::shared_ptr seen;
+    for(Queue::shared_ptr q = pop(); q; q = pop()){
+        if(seen == q){
+            add(q);
+            break;
+        }else if(q->canAutoDelete()){
+            std::string name(q->getName());
+            registry->destroy(name);
+            std::cout << "INFO: Auto-deleted queue named " << name << std::endl;
+        }else{
+            add(q);
+            if(!seen) seen = q;
+        }
+    }
+}
+
+void AutoDelete::run(){
+    monitor.acquire();
+    while(!stopped){
+        process();
+        monitor.wait(period);
+    }
+    monitor.release();
+}
+
+void AutoDelete::start(){
+    monitor.acquire();
+    if(stopped){
+        runner = factory.create(this);
+        stopped = false;
+        monitor.release();
+        runner->start();
+    }else{
+        monitor.release();
+    }
+}
+
+void AutoDelete::stop(){
+    monitor.acquire();
+    if(!stopped){
+        stopped = true;
+        monitor.notify();
+        monitor.release();
+        runner->join();
+        delete runner;
+    }else{
+        monitor.release();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/AutoDelete.cpp
------------------------------------------------------------------------------
    svn:eol-style = native
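
Editor's sketch of the sweep in AutoDelete::process() above, reduced to standard containers so it can be compiled on its own. The names SketchQueue, QueuePtr and sweep are hypothetical and not part of the commit, the `deletable` flag stands in for canAutoDelete(), and std::shared_ptr (C++11) is assumed in place of Queue::shared_ptr. It only illustrates how the `seen` marker bounds the loop to a single pass over the pending queues.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for the broker's Queue; only what the sweep needs.
struct SketchQueue {
    std::string name;
    bool deletable;   // plays the role of canAutoDelete()
};

typedef std::shared_ptr<SketchQueue> QueuePtr;

// One sweep over the pending entries: deletable ones are dropped and their
// names returned, the rest are re-queued. The first re-queued entry is
// remembered so the sweep stops once it comes round again.
std::vector<std::string> sweep(std::queue<QueuePtr>& pending) {
    std::vector<std::string> removed;
    QueuePtr seen;
    while (!pending.empty()) {
        QueuePtr q = pending.front();
        pending.pop();
        assert(q);                // entries are expected to be non-null
        if (q == seen) {          // full circle: put it back and stop
            pending.push(q);
            break;
        }
        if (q->deletable) {
            removed.push_back(q->name);
        } else {
            pending.push(q);
            if (!seen) seen = q;  // remember the first survivor
        }
    }
    return removed;
}
```

Without the marker, a queue that can never be deleted would be popped and pushed back indefinitely within a single call.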

Added: incubator/qpid/trunk/qpid/cpp/broker/src/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Broker.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Broker.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Broker.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include <memory>
+#include "apr_signal.h"
+
+#include "Acceptor.h"
+#include "Configuration.h"
+#include "QpidError.h"
+#include "SessionHandlerFactoryImpl.h"
+
+//optional includes:
+#ifdef _USE_APR_IO_
+
+#include "BlockingAPRAcceptor.h"
+#include "LFAcceptor.h"
+
+#endif 
+
+using namespace qpid::broker;
+using namespace qpid::io;
+
+void handle_signal(int signal);
+
+Acceptor* createAcceptor(Configuration& config);
+
+int main(int argc, char** argv)
+{
+    SessionHandlerFactoryImpl factory;
+    Configuration config;
+    try{
+
+        config.parse(argc, argv);
+        if(config.isHelp()){
+            config.usage();
+        }else{
+#ifdef _USE_APR_IO_
+            apr_signal(SIGINT, handle_signal);
+#endif
+            try{
+                std::auto_ptr<Acceptor> acceptor(createAcceptor(config));
+                try{
+                    acceptor->bind(config.getPort(), &factory);
+                }catch(const qpid::QpidError& error){
+                    std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+                }
+            }catch(const qpid::QpidError& error){
+                std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+            }
+        }
+    }catch(const Configuration::ParseException& error){
+        std::cout << "Error: " << error.error << std::endl;
+    }
+
+    return 1;
+}
+
+Acceptor* createAcceptor(Configuration& config){
+    const string type(config.getAcceptor());
+#ifdef _USE_APR_IO_         
+    if("blocking" == type){
+        std::cout << "Using blocking acceptor " << std::endl;
+        return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
+    }else if("non-blocking" == type){
+        std::cout << "Using non-blocking acceptor " << std::endl;
+        return new LFAcceptor(config.isTrace(), 
+                              config.getConnectionBacklog(), 
+                              config.getWorkerThreads(),
+                              config.getMaxConnections());
+    }
+#endif
+    throw Configuration::ParseException("Unrecognised acceptor: " + type);
+}
+
+void handle_signal(int signal){
+    std::cout << "Shutting down..." << std::endl;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/Broker.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Channel.h"
+#include "QpidError.h"
+#include <iostream>
+#include <sstream>
+#include <assert.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), 
+                                                                       id(_id), 
+                                                                       framesize(_framesize),
+                                                                       transactional(false),
+                                                                       deliveryTag(1),
+                                                                       tagGenerator("sgen"){}
+
+Channel::~Channel(){
+    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i++){
+        std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl;
+        delete (i->second);
+    }
+}
+
+bool Channel::exists(string& consumerTag){
+    return consumers.find(consumerTag) != consumers.end();
+}
+
+void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
+    if(tag.empty()) tag = tagGenerator.generate();
+
+    ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection));
+    try{
+        queue->consume(c, exclusive);//may throw exception
+        consumers[tag] = c;
+    }catch(ExclusiveAccessException&){
+        delete c;
+        throw;//rethrow the original exception rather than a copy
+    }
+}
+
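+void Channel::cancel(string& tag){
+    consumer_iterator i = consumers.find(tag);//find, unlike operator[], adds no entry for an unknown tag
+    if(i != consumers.end()){
+        i->second->cancel();
+        delete i->second;
+        consumers.erase(i);
+    }
+}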
+
+void Channel::close(){
+    //cancel all consumers
+    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
+        ConsumerImpl* c = i->second;
+        c->cancel();
+        consumers.erase(i);
+        delete c;
+    }
+}
+
+void Channel::begin(){
+    transactional = true;
+}
+
+void Channel::commit(){
+
+}
+
+void Channel::rollback(){
+
+}
+
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){
+    //send deliver method, header and content(s)
+    msg->deliver(out, id, consumerTag, deliveryTag++, framesize);
+}
+
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, 
+                                    Queue::shared_ptr _queue, 
+                                    ConnectionToken* const _connection) : parent(_parent), 
+                                                                         tag(_tag), 
+                                                                         queue(_queue),
+                                                                         connection(_connection){
+}
+
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+    if(connection != msg->getPublisher()){
+        parent->deliver(msg, tag);
+        return true;
+    }else{
+        return false;
+    }
+}
+
+void Channel::ConsumerImpl::cancel(){
+    if(queue) queue->cancel(this);
+}
+
+void Channel::handlePublish(Message* msg){
+    if(message.get()){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+    }
+    message = Message::shared_ptr(msg);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header, ExchangeRegistry* exchanges){
+    if(!message.get()){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got header before publish.");
+    }
+    message->setHeader(header);
+    if(message->isComplete()){
+        publish(exchanges);
+    }
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content, ExchangeRegistry* exchanges){
+    if(!message.get()){
+        THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got content before publish.");
+    }
+    message->addContent(content);
+    if(message->isComplete()){
+        publish(exchanges);
+    }
+}
+
+void Channel::publish(ExchangeRegistry* exchanges){
+    if(!route(message, exchanges)){
+        std::cout << "WARNING: Could not route message." << std::endl;
+    }
+    message.reset();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/Channel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/Configuration.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/Configuration.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/Configuration.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/Configuration.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,195 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "Configuration.h"
+
+using namespace qpid::broker;
+using namespace std;
+
+Configuration::Configuration() : 
+    trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
+    port('p', "port", "Sets the port to listen on (default=5672)", 5672),
+    workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
+    maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+    connectionBacklog("connection-backlog", "Sets the connection backlog for the server's socket (default=10)", 10),
+    acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
+    help("help", "Prints usage information", false)
+{
+    options.push_back(&trace);
+    options.push_back(&port);
+    options.push_back(&workerThreads);
+    options.push_back(&maxConnections);
+    options.push_back(&connectionBacklog);
+    options.push_back(&acceptor);
+    options.push_back(&help);
+}
+
+Configuration::~Configuration(){}
+
+void Configuration::parse(int argc, char** argv){
+    int position = 1;
+    while(position < argc){
+        bool matched(false);
+        for(op_iterator i = options.begin(); i < options.end() && !matched; i++){
+            matched = (*i)->parse(position, argv, argc);
+        }
+        if(!matched){
+            std::cout << "Warning: skipping unrecognised option " << argv[position] << std::endl;
+            position++;
+        }
+    }
+}
+
+void Configuration::usage(){
+    for(op_iterator i = options.begin(); i < options.end(); i++){
+        (*i)->print(std::cout);
+    }
+}
+
+bool Configuration::isHelp(){
+    return help.getValue();
+}
+
+bool Configuration::isTrace(){
+    return trace.getValue();
+}
+
+int Configuration::getPort(){
+    return port.getValue();
+}
+
+int Configuration::getWorkerThreads(){
+    return workerThreads.getValue();
+}
+
+int Configuration::getMaxConnections(){
+    return maxConnections.getValue();
+}
+
+int Configuration::getConnectionBacklog(){
+    return connectionBacklog.getValue();
+}
+
+const string& Configuration::getAcceptor(){
+    return acceptor.getValue();
+}
+
+Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : 
+    flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
+
+Configuration::Option::Option(const string& _name, const string& _desc) : 
+    flag(""), name("--" + _name), desc(_desc) {}
+
+Configuration::Option::~Option(){}
+
+bool Configuration::Option::match(const string& arg){
+    return flag == arg || name == arg;
+}
+
+bool Configuration::Option::parse(int& i, char** argv, int argc){
+    const string arg(argv[i]);
+    if(match(arg)){
+        if(needsValue()){
+            if(++i < argc) setValue(argv[i]);
+            else throw ParseException("Argument " + arg + " requires a value!");
+        }else{
+            setValue("");
+        }
+        i++;
+        return true;
+    }else{
+        return false;
+    }
+}
+
+void Configuration::Option::print(ostream& out) const {
+    out << "    ";
+    if(flag.length() > 0){
+        out << flag << " or ";
+    }
+    out << name;
+    if(needsValue()) out << " <value>";
+    out << std::endl;
+    out << "            " << desc << std::endl;
+}
+
+
+// String Option:
+
+Configuration::StringOption::StringOption(const char _flag, const string& _name, const string& _desc, const string _value) : 
+    Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::StringOption(const string& _name, const string& _desc, const string _value) : 
+    Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::StringOption::~StringOption(){}
+
+const string& Configuration::StringOption::getValue() const {
+    return value;
+}
+
+bool Configuration::StringOption::needsValue() const {
+    return true;
+}
+
+void Configuration::StringOption::setValue(const std::string& _value){
+    value = _value;
+}
+
+// Int Option:
+
+Configuration::IntOption::IntOption(const char _flag, const string& _name, const string& _desc, const int _value) : 
+    Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::IntOption(const string& _name, const string& _desc, const int _value) : 
+    Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::IntOption::~IntOption(){}
+
+int Configuration::IntOption::getValue() const {
+    return value;
+}
+
+bool Configuration::IntOption::needsValue() const {
+    return true;
+}
+
+void Configuration::IntOption::setValue(const std::string& _value){
+    value = atoi(_value.c_str());
+}
+
+// Bool Option:
+
+Configuration::BoolOption::BoolOption(const char _flag, const string& _name, const string& _desc, const bool _value) : 
+    Option(_flag,_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::BoolOption(const string& _name, const string& _desc, const bool _value) : 
+    Option(_name,_desc), defaultValue(_value), value(_value) {}
+
+Configuration::BoolOption::~BoolOption(){}
+
+bool Configuration::BoolOption::getValue() const {
+    return value;
+}
+
+bool Configuration::BoolOption::needsValue() const {
+    return false;
+}
+
+void Configuration::BoolOption::setValue(const std::string& _value){
+    value = true;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/Configuration.cpp
------------------------------------------------------------------------------
    svn:eol-style = native
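
Editor's sketch of the cursor-advancing protocol used by Configuration::parse() and Option::parse() above: each matcher consumes what it recognises and moves the shared position forward, and the outer loop skips one argument only when nothing matched. The free functions parseValueOption and parseAll are hypothetical simplifications, not the commit's API, and std::invalid_argument stands in for Configuration::ParseException.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical free-function version of Option::parse for a value-taking
// option. On a match it stores the following argument in `value`, advances
// `i` past both, and returns true; otherwise it leaves `i` untouched.
bool parseValueOption(const std::string& name, int& i, int argc,
                      const char* const* argv, std::string& value) {
    assert(i < argc);
    if (name != argv[i]) return false;
    if (i + 1 >= argc) {
        throw std::invalid_argument("Argument " + name + " requires a value!");
    }
    value = argv[i + 1];
    i += 2;
    return true;
}

// Walks the argument list: every matcher advances the cursor itself, and
// unmatched arguments are skipped one at a time.
// Returns the number of skipped (unrecognised) arguments.
int parseAll(int argc, const char* const* argv,
             std::string& port, std::string& acceptor) {
    int skipped = 0;
    int position = 1;   // argv[0] is the program name
    while (position < argc) {
        bool matched =
            parseValueOption("--port", position, argc, argv, port) ||
            parseValueOption("--acceptor", position, argc, argv, acceptor);
        if (!matched) {
            ++skipped;
            ++position;
        }
    }
    return skipped;
}
```

Because a successful matcher always moves the cursor forward and the fallback branch moves it by one, the loop terminates for any argument list.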

Added: incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "DirectExchange.h"
+#include "ExchangeBinding.h"
+#include <algorithm>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+DirectExchange::DirectExchange(const string& _name) : name(_name) {
+}
+
+void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+    lock.acquire();
+    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+    std::vector<Queue::shared_ptr>::iterator i = std::find(queues.begin(), queues.end(), queue);
+    if(i == queues.end()){
+        queues.push_back(queue);
+        queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+    }
+    lock.release();
+}
+
+void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+    lock.acquire();
+    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+
+    std::vector<Queue::shared_ptr>::iterator i = std::find(queues.begin(), queues.end(), queue);
+    if(i != queues.end()){
+        queues.erase(i);
+    }
+    if(queues.empty()){
+        bindings.erase(routingKey);//also drops the empty entry operator[] creates for an unknown key
+    }
+    lock.release();
+}
+
+void DirectExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+    lock.acquire();
+    std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
+    int count(0);
+    for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
+        (*i)->deliver(msg);
+    }
+    if(!count){
+        std::cout << "WARNING: DirectExchange " << name << " could not route message with key " << routingKey << std::endl;
+    }
+    lock.release();
+}
+
+DirectExchange::~DirectExchange(){
+
+}
+
+
+const std::string DirectExchange::typeName("direct");

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/DirectExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native
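
Editor's note on the `bindings[routingKey]` lookups in DirectExchange above: std::map::operator[] default-constructs an entry when the key is absent, so a read through it can grow the table. The sketch below contrasts that with find(). The names Bindings, countBound, countBoundInserting and demonstrate are hypothetical, and `int` stands in for Queue::shared_ptr to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// int stands in for Queue::shared_ptr in this sketch.
typedef std::map<std::string, std::vector<int> > Bindings;

// Returns the number of targets bound to key without modifying the table.
std::size_t countBound(const Bindings& bindings, const std::string& key) {
    Bindings::const_iterator i = bindings.find(key);
    return i == bindings.end() ? 0 : i->second.size();
}

// Same answer, but operator[] inserts an empty vector for an unknown key.
std::size_t countBoundInserting(Bindings& bindings, const std::string& key) {
    return bindings[key].size();
}

// Usage check: a lookup for an unbound key leaves the table empty with
// find(), but grows it by one entry with operator[].
void demonstrate() {
    Bindings bindings;
    assert(countBound(bindings, "missing") == 0);
    assert(bindings.empty());
    assert(countBoundInserting(bindings, "missing") == 0);
    assert(bindings.size() == 1);
}
```

In a routing path this matters because every message published with an unbound key would otherwise leave an empty entry behind.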

Added: incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeBinding.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeBinding.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeBinding.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeBinding.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,32 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ExchangeBinding.h"
+#include "Exchange.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+ExchangeBinding::ExchangeBinding(Exchange* _e, Queue::shared_ptr _q, const string& _key, FieldTable* _args) : e(_e), q(_q), key(_key), args(_args){}
+
+void ExchangeBinding::cancel(){
+    e->unbind(q, key, args);
+    delete this;
+}
+
+ExchangeBinding::~ExchangeBinding(){
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeBinding.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp Tue Sep 19 15:06:50 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "ExchangeRegistry.h"
+#include "MonitorImpl.h"
+
+using namespace qpid::broker;
+using namespace qpid::concurrent;
+
+ExchangeRegistry::ExchangeRegistry() : lock(new MonitorImpl()){}
+
+ExchangeRegistry::~ExchangeRegistry(){
+    delete lock;
+}
+
+void ExchangeRegistry::declare(Exchange* exchange){
+    exchanges[exchange->getName()] = exchange;
+}
+
+void ExchangeRegistry::destroy(const string& name){
+    if(exchanges[name]){
+        delete exchanges[name];
+        exchanges.erase(name);
+    }
+}
+
+Exchange* ExchangeRegistry::get(const string& name){
+    return exchanges[name];
+}

Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/ExchangeRegistry.cpp
------------------------------------------------------------------------------
    svn:eol-style = native


