activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r505113 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/concurrent/ activemq/connector/openwire/
Date Fri, 09 Feb 2007 01:14:36 GMT
Author: tabish
Date: Thu Feb  8 17:14:36 2007
New Revision: 505113

URL: http://svn.apache.org/viewvc?view=rev&rev=505113
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?view=diff&rev=505113&r1=505112&r2=505113
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Thu Feb  8 17:14:36 2007
@@ -52,6 +52,7 @@
     activemq/connector/openwire/OpenWireFormat.cpp \
     activemq/connector/openwire/OpenWireCommandReader.cpp \
     activemq/connector/openwire/OpenWireCommandWriter.cpp \
+    activemq/connector/openwire/OpenWireFormatNegotiator.cpp \
     activemq/connector/openwire/marshal/BaseDataStreamMarshaller.cpp \
     activemq/connector/openwire/utils/HexTable.cpp \
     activemq/connector/openwire/utils/BooleanStream.cpp \
@@ -196,6 +197,7 @@
     activemq/connector/openwire/OpenWireFormat.h \
     activemq/connector/openwire/OpenWireCommandReader.h \
     activemq/connector/openwire/OpenWireCommandWriter.h \
+    activemq/connector/openwire/OpenWireFormatNegotiator.h \
     activemq/connector/openwire/marshal/BaseDataStreamMarshaller.h \
     activemq/connector/openwire/utils/HexTable.h \
     activemq/connector/openwire/utils/BooleanStream.h \

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.cpp?view=auto&rev=505113
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.cpp
Thu Feb  8 17:14:36 2007
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CountDownLatch.h"
+
+using namespace activemq;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+CountDownLatch::CountDownLatch( int count )
+{
+    this->count = count;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+CountDownLatch::~CountDownLatch()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatch::await() throw ( cms::CMSException ) {
+
+    try {
+
+        synchronized( &mutex ) {
+            if( count == 0 ){
+                return;
+            }
+
+            mutex.wait();
+        }
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatch::await( long long timeOut ) throw ( cms::CMSException ) {
+    try {
+
+        synchronized( &mutex ) {
+            if( count == 0 ){
+                return;
+            }
+
+            mutex.wait( timeOut );
+        }
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void CountDownLatch::countDown() {
+    try {
+
+        synchronized( &mutex ) {
+            count--;
+
+            // Signal when done.
+            if( count == 0 ){
+                mutex.notifyAll();
+            }
+        }
+    }
+    AMQ_CATCHALL_NOTHROW()
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.h?view=auto&rev=505113
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/concurrent/CountDownLatch.h
Thu Feb  8 17:14:36 2007
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONCURRENT_COUNTDOWNLATCH_H_
+#define _ACTIVEMQ_CONCURRENT_COUNTDOWNLATCH_H_
+
+#include <activemq/concurrent/Mutex.h>
+#include <cms/CMSException.h>
+
+namespace activemq{
+namespace concurrent{
+
+    class CountDownLatch
+    {
+    private:
+
+        /**
+         * number to count down to
+         */
+        int count;
+
+        /**
+         * Mutex to protect the counts, and wait on.
+         */
+        Mutex mutex;
+
+    public:
+
+        /**
+         * Constructor
+         * @param count - number to count down from.
+         */
+        CountDownLatch( int count );
+
+        virtual ~CountDownLatch();
+
+        /**
+         * Waits for the Count to be zero, and then
+         */
+        virtual void await() throw ( cms::CMSException );
+
+        /**
+         * Waits for the Count to hit zero, or a timeout.
+         * @param timeOut - time in milliseconds to wait.
+         */
+        virtual void await( long long timeOut )  throw ( cms::CMSException );
+
+        /**
+         * Counts down the latch, releasing all waiting threads when
+         * the count hits zero.
+         */
+        virtual void countDown();
+
+        /**
+         * Gets the current count
+         * @returns int count value
+         */
+        virtual int getCount() const {
+            return this->count;
+        }
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_COUNTDOWNLATCH_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?view=auto&rev=505113
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
Thu Feb  8 17:14:36 2007
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireFormatNegotiator.h"
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::connector;
+using namespace activemq::connector::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
+                                                    Transport* next,
+                                                    const bool own ) :
+    TransportFilter( next, own )
+{
+    this->firstTime = true;
+    this->openWireFormat = openWireFormat;
+    this->closed = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireFormatNegotiator::~OpenWireFormatNegotiator()
+{
+    // Close the transport and destroy it.
+    close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::oneway( Command* command )
+    throw( CommandIOException, exceptions::UnsupportedOperationException )
+{
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "transport already closed" );
+        }
+
+        next->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::onCommand( Command* command ){
+
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::start() throw( cms::CMSException ){
+
+    /**
+     * We're already started.
+     */
+    if( !closed ){
+        return;
+    }
+
+    if( commandlistener == NULL ){
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "commandListener is invalid" );
+    }
+
+    if( exceptionListener == NULL ){
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "exceptionListener is invalid" );
+    }
+
+    if( next == NULL ){
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "next transport is NULL" );
+    }
+
+    // Start the delegate transport object.
+    next->start();
+
+    if( firstTime = true ) {
+
+        try {
+
+            // The First Time is now over with
+            firstTime = false;
+
+            // We first send the WireFormat that we'd prefer.
+            //next.Oneway( wireFormat->getPreferedWireFormatInfo() );
+
+        } catch( ActiveMQException& ex ) {
+
+            //wireInfoSentDownLatch.countDown();
+            ex.setMark( __FILE__, __LINE__ );
+            throw ex;
+        }
+    }
+
+    // Mark it as open.
+    closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::close() throw( cms::CMSException ){
+
+    if( !closed && next != NULL ){
+        next->close();
+    }
+
+    closed = true;
+}
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h?view=auto&rev=505113
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
Thu Feb  8 17:14:36 2007
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIREFORMATNEGOTIATOR_H_
+#define _ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIREFORMATNEGOTIATOR_H_
+
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/transport/Command.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/connector/openwire/OpenWireFormat.h>
+
+namespace activemq{
+namespace connector{
+namespace openwire{
+
+    class OpenWireFormatNegotiator : public transport::TransportFilter
+    {
+    private:
+
+        /**
+         * Time to wait before we declare that the negotiation has timed out.
+         */
+        static const int negotiationTimeout = 15000;
+
+        /**
+         * Have we started already?
+         */
+        bool firstTime;
+
+        /**
+         * The OpenWireFormat object that we use in negotiation.
+         */
+        OpenWireFormat* openWireFormat;
+
+        /**
+         * Indicates Transport has shut down
+         */
+        bool closed;
+
+    public:
+
+        /**
+         * Constructor - Initializes this object around another Transport
+         * @param openWireFormat - The WireFormat object we use to negotiate
+         * @param next - The next transport in the chain
+         * @param own - do we own the Transport pointer.
+         */
+        OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
+                                  transport::Transport* next,
+                                  const bool own = true );
+
+        virtual ~OpenWireFormatNegotiator();
+
+        /**
+         * Sends a one-way command.  Does not wait for any response from the
+         * broker.
+         * @param command the command to be sent.
+         * @throws CommandIOException if an exception occurs during writing of
+         * the command.
+         * @throws UnsupportedOperationException if this method is not implemented
+         * by this transport.
+         */
+        virtual void oneway( transport::Command* command )
+            throw( transport::CommandIOException, exceptions::UnsupportedOperationException
);
+
+        /**
+         * This is called in the context of the nested transport's
+         * reading thread.  In the case of a response object,
+         * updates the request map and notifies those waiting on the
+         * response.  Non-response messages are just delegated to
+         * the command listener.
+         * @param command the received from the nested transport.
+         */
+        virtual void onCommand( transport::Command* command );
+
+        /**
+         * Starts this transport object and creates the thread for
+         * polling on the input stream for commands.  If this object
+         * has been closed, throws an exception.  Before calling start,
+         * the caller must set the IO streams and the reader and writer
+         * objects.
+         * @throws CMSException if an error occurs or if this transport
+         * has already been closed.
+         */
+        virtual void start() throw( cms::CMSException );
+
+        /**
+         * Stops the polling thread and closes the streams.  This can
+         * be called explicitly, but is also called in the destructor. Once
+         * this object has been closed, it cannot be restarted.
+         * @throws CMSException if errors occur.
+         */
+        virtual void close() throw( cms::CMSException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIREFORMATNEGOTIATOR_H_*/



Mime
View raw message