activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1448438 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test: ./ activemq/mock/ activemq/transport/failover/
Date Wed, 20 Feb 2013 22:09:10 GMT
Author: tabish
Date: Wed Feb 20 22:09:09 2013
New Revision: 1448438

URL: http://svn.apache.org/r1448438
Log:
Adds a simple Mock broker that can be used in failover tests.  

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.h  
(with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=1448438&r1=1448437&r2=1448438&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Wed Feb 20 22:09:09 2013
@@ -40,6 +40,7 @@ cc_sources = \
     activemq/core/FifoMessageDispatchChannelTest.cpp \
     activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
     activemq/exceptions/ActiveMQExceptionTest.cpp \
+    activemq/mock/MockBrokerService.cpp \
     activemq/state/ConnectionStateTest.cpp \
     activemq/state/ConnectionStateTrackerTest.cpp \
     activemq/state/ConsumerStateTest.cpp \
@@ -282,6 +283,7 @@ h_sources = \
     activemq/core/FifoMessageDispatchChannelTest.h \
     activemq/core/SimplePriorityMessageDispatchChannelTest.h \
     activemq/exceptions/ActiveMQExceptionTest.h \
+    activemq/mock/MockBrokerService.h \
     activemq/state/ConnectionStateTest.h \
     activemq/state/ConnectionStateTrackerTest.h \
     activemq/state/ConsumerStateTest.h \

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp?rev=1448438&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
Wed Feb 20 22:09:09 2013
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MockBrokerService.h"
+
+#include <activemq/wireformat/openwire/OpenWireFormatFactory.h>
+#include <activemq/wireformat/openwire/OpenWireFormat.h>
+#include <activemq/wireformat/openwire/OpenWireResponseBuilder.h>
+#include <activemq/commands/Command.h>
+#include <activemq/commands/Response.h>
+#include <activemq/commands/WireFormatInfo.h>
+#include <activemq/transport/mock/MockTransport.h>
+
+#include <decaf/net/ServerSocket.h>
+#include <decaf/net/Socket.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/Random.h>
+#include <decaf/util/Properties.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/io/InputStream.h>
+#include <decaf/io/OutputStream.h>
+
+using namespace activemq;
+using namespace activemq::mock;
+using namespace activemq::commands;
+using namespace activemq::wireformat;
+using namespace activemq::wireformat::openwire;
+using namespace activemq::transport::mock;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::io;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::net;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace mock {
+
+    class TcpServer : public lang::Thread {
+    private:
+
+        bool done;
+        bool error;
+        Pointer<ServerSocket> server;
+        Pointer<OpenWireFormat> wireFormat;
+        Pointer<OpenWireResponseBuilder> responeBuilder;
+        CountDownLatch started;
+        Random rand;
+
+    public:
+
+        TcpServer() : Thread(), done(false), error(false), server(), wireFormat(),
+                      responeBuilder(), started(1), rand() {
+            server.reset(new ServerSocket(0));
+
+            Properties properties;
+
+            this->wireFormat = OpenWireFormatFactory().createWireFormat(properties).dynamicCast<OpenWireFormat>();
+            this->responeBuilder.reset(new OpenWireResponseBuilder());
+
+            this->rand.setSeed(System::currentTimeMillis());
+        }
+
+        TcpServer(int port) : Thread(), done(false), error(false), server(), wireFormat(),
+                              responeBuilder(), started(1), rand() {
+            server.reset(new ServerSocket(port));
+
+            Properties properties;
+            this->wireFormat = OpenWireFormatFactory().createWireFormat(properties).dynamicCast<OpenWireFormat>();
+            this->responeBuilder.reset(new OpenWireResponseBuilder());
+
+            this->rand.setSeed(System::currentTimeMillis());
+        }
+
+        virtual ~TcpServer() {
+            stop();
+        }
+
+        int getLocalPort() {
+            if (this->server.get() != NULL) {
+                return server->getLocalPort();
+            }
+
+            return 0;
+        }
+
+        void waitUntilStarted() {
+            this->started.await();
+        }
+
+        void waitUntilStopped() {
+            this->join();
+        }
+
+        void stop() {
+            try {
+                done = true;
+                server->close();
+            } catch (...) {}
+        }
+
+        virtual void run() {
+            try {
+
+                started.countDown();
+
+                while (!done) {
+
+                    MockTransport mock(this->wireFormat, this->responeBuilder);
+
+                    std::auto_ptr<Socket> socket(server->accept());
+                    socket->setSoLinger(false, 0);
+
+                    Pointer<WireFormatInfo> preferred = wireFormat->getPreferedWireFormatInfo();
+
+                    OutputStream* os = socket->getOutputStream();
+                    DataOutputStream dataOut(os);
+
+                    InputStream* is = socket->getInputStream();
+                    DataInputStream dataIn(is);
+
+                    wireFormat->marshal(preferred, &mock, &dataOut);
+                    dataOut.flush();
+
+                    while (!done) {
+                        Pointer<Command> command = wireFormat->unmarshal(&mock,
&dataIn);
+                        Pointer<Response> response = responeBuilder->buildResponse(command);
+
+                        if (response != NULL) {
+                            wireFormat->marshal(response, &mock, &dataOut);
+                        }
+                    }
+                }
+
+            } catch (IOException& ex) {
+                error = true;
+            } catch (Exception& ex) {
+                error = true;
+            } catch (...) {
+                error = true;
+            }
+        }
+    };
+
+    class MockBrokerServiceImpl {
+    private:
+
+        MockBrokerServiceImpl(const MockBrokerServiceImpl&);
+        MockBrokerServiceImpl& operator= (const MockBrokerServiceImpl&);
+
+    public:
+
+        Pointer<TcpServer> server;
+        CountDownLatch started;
+        CountDownLatch stopped;
+
+    public:
+
+        MockBrokerServiceImpl() : server(), started(1), stopped(1) {
+            this->server.reset(new TcpServer());
+        }
+
+        MockBrokerServiceImpl(int port) : server(), started(1), stopped(1) {
+            this->server.reset(new TcpServer(port));
+        }
+
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+MockBrokerService::MockBrokerService() : impl(new MockBrokerServiceImpl()) {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MockBrokerService::MockBrokerService(int port) : impl(new MockBrokerServiceImpl(port)) {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MockBrokerService::~MockBrokerService() {
+    try {
+        stop();
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    try {
+        delete impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MockBrokerService::start() {
+    this->impl->server->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MockBrokerService::stop() {
+    this->impl->server->stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MockBrokerService::waitUntilStarted() {
+    this->impl->server->waitUntilStarted();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MockBrokerService::waitUntilStopped() {
+    this->impl->server->waitUntilStopped();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string MockBrokerService::getConnectString() const {
+    int port = this->impl->server->getLocalPort();
+    return std::string("tcp://localhost:") + Integer::toString(port);
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.h?rev=1448438&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.h Wed
Feb 20 22:09:09 2013
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_MOCK_MOCKBROKERSERVICE_H_
+#define _ACTIVEMQ_MOCK_MOCKBROKERSERVICE_H_
+
+#include <activemq/util/Config.h>
+
+#include <string>
+
+namespace activemq {
+namespace mock {
+
+    class MockBrokerServiceImpl;
+
+    class MockBrokerService {
+    private:
+
+        MockBrokerService(const MockBrokerService&);
+        MockBrokerService& operator= (const MockBrokerService&);
+
+    private:
+
+        MockBrokerServiceImpl* impl;
+
+    public:
+
+        MockBrokerService();
+
+        MockBrokerService(int port);
+
+        virtual ~MockBrokerService();
+
+    public:
+
+        void start();
+
+        void stop();
+
+        void waitUntilStarted();
+
+        void waitUntilStopped();
+
+        std::string getConnectString() const;
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_MOCK_MOCKBROKERSERVICE_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp?rev=1448438&r1=1448437&r2=1448438&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
Wed Feb 20 22:09:09 2013
@@ -23,11 +23,13 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/ActiveMQMessage.h>
 #include <activemq/commands/ConnectionControl.h>
+#include <activemq/mock/MockBrokerService.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/Thread.h>
 #include <decaf/util/UUID.h>
 
 using namespace activemq;
+using namespace activemq::mock;
 using namespace activemq::commands;
 using namespace activemq::transport;
 using namespace activemq::transport::failover;
@@ -668,3 +670,43 @@ void FailoverTransportTest::testUriOptio
 
     transport->close();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectedToMockBroker() {
+
+    MockBrokerService broker1(61616);
+    MockBrokerService broker2(61618);
+
+    broker1.start();
+    broker1.waitUntilStarted();
+
+    std::string uri = "failover://(tcp://localhost:61616,"
+                                  "tcp://localhost:61618)?randomize=false";
+
+    DefaultTransportListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport( factory.create( uri ) );
+    CPPUNIT_ASSERT( transport != NULL );
+    transport->setTransportListener( &listener );
+
+    FailoverTransport* failover = dynamic_cast<FailoverTransport*>(
+        transport->narrow( typeid( FailoverTransport ) ) );
+
+    CPPUNIT_ASSERT( failover != NULL );
+    CPPUNIT_ASSERT( failover->isRandomize() == false );
+
+    transport->start();
+
+    int count = 0;
+    while (!failover->isConnected() && count++ < 20) {
+        Thread::sleep( 200 );
+    }
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+    CPPUNIT_ASSERT( failover->isConnectedToPriority() == true );
+
+    transport->close();
+
+    broker1.stop();
+    broker1.waitUntilStopped();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h?rev=1448438&r1=1448437&r2=1448438&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
Wed Feb 20 22:09:09 2013
@@ -53,6 +53,7 @@ namespace failover {
         CPPUNIT_TEST( testTransportHandlesConnectionControl );
         CPPUNIT_TEST( testPriorityBackupConfig );
         CPPUNIT_TEST( testUriOptionsApplied );
+        CPPUNIT_TEST( testConnectedToMockBroker );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -73,6 +74,7 @@ namespace failover {
         void testTransportHandlesConnectionControl();
         void testPriorityBackupConfig();
         void testUriOptionsApplied();
+        void testConnectedToMockBroker();
 
     private:
 



Mime
View raw message