Subject: svn commit: r1361968 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/m4/ hedwig-client/src/main/cpp/scripts/ hedwig-client/src/main/cpp/test/ Date: Mon, 16 Jul 2012 10:35:20 -0000 To: commits@zookeeper.apache.org From: sijie@apache.org Message-Id: <20120716103521.61F8B23888D2@eris.apache.org> Author: sijie Date: Mon Jul 16 10:35:19 2012 New Revision: 1361968 URL: http://svn.apache.org/viewvc?rev=1361968&view=rev Log: BOOKKEEPER-310: Changes 
in hedwig server to support JMS spec (ivank via sijie) Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/m4/gtest.m4 (with props) Removed: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubdatatest.cpp Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/README zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/utiltest.cpp Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Jul 16 10:35:19 2012 @@ -60,6 +60,10 @@ Trunk (unreleased changes) BOOKKEEPER-329: provide stop scripts for hub server (sijie via ivank) + hedwig-client: + + BOOKKEEPER-310: Changes in hedwig server to support JMS spec (ivank via sijie) + Release 4.1.0 - 2012-06-07 Non-backward compatible changes: Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/README URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/README?rev=1361968&r1=1361967&r2=1361968&view=diff 
============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/README (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/README Mon Jul 16 10:35:19 2012 @@ -1,10 +1,38 @@ += BUILDING = + To build: $ libtoolize $ autoreconf -fi $ ./configure $ make -The devel packages for protobuf, cppunit, log4cxx & boost are required. +The devel packages for protobuf, log4cxx & boost are required to build. + += TESTING = + +To test, Google Test(http://code.google.com/p/googletest/) is required. +The project must be configured with the location of gtest. Making with +the target "check" will run all the tests. + + $ ./configure --enable-gtest=/home/user/src/gtest-1.6.0 + $ make check + +To run individual tests, first start a test cluster. We provide a +convenience script to do this. + + $ sh scripts/tester.sh start-cluster + +Once the cluster is running, you can run individual tests using the test +harness. + + $ test/hedwigtest --gtest_filter=PublishTest.testAsyncPublish + +To get a list of tests: + + $ test/hedwigtest --gtest_list_tests + +test/hedwigtest is a libtool wrapper, which cannot be used directly with +gdb. 
To run a test with gdb: -To test: - $ make check \ No newline at end of file + $ libtool --mode=execute gdb test/hedwigtest + (gdb) run --gtest_filter=PublishTest.testAsyncPublish Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac Mon Jul 16 10:35:19 2012 @@ -27,7 +27,9 @@ AC_CONFIG_FILES([Makefile lib/Makefile t AC_PROG_LIBTOOL AC_CONFIG_MACRO_DIR([m4]) PKG_CHECK_MODULES([DEPS], [liblog4cxx protobuf]) -PKG_CHECK_MODULES([TESTDEPS], [cppunit]) + +GTEST_LIB_CHECK([1.5.0], [AC_MSG_RESULT([GoogleTest found, Tests Enabled])], + [AC_MSG_WARN([GoogleTest not found, Tests disabled])]) AX_BOOST_BASE AX_BOOST_ASIO Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/m4/gtest.m4 URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/m4/gtest.m4?rev=1361968&view=auto ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/m4/gtest.m4 (added) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/m4/gtest.m4 Mon Jul 16 10:35:19 2012 @@ -0,0 +1,74 @@ +dnl GTEST_LIB_CHECK([minimum version [, +dnl action if found [,action if not found]]]) +dnl +dnl Check for the presence of the Google Test library, optionally at a minimum +dnl version, and indicate a viable version with the HAVE_GTEST flag. It defines +dnl standard variables for substitution including GTEST_CPPFLAGS, +dnl GTEST_CXXFLAGS, GTEST_LDFLAGS, and GTEST_LIBS. It also defines +dnl GTEST_VERSION as the version of Google Test found. 
Finally, it provides +dnl optional custom action slots in the event GTEST is found or not. +AC_DEFUN([GTEST_LIB_CHECK], +[ +dnl Provide a flag to enable or disable Google Test usage. +AC_ARG_ENABLE([gtest], + [AS_HELP_STRING([--enable-gtest], + [Enable tests using the Google C++ Testing Framework. + (Default is disabled.)])], + [], + [enable_gtest=no]) +AC_ARG_VAR([GTEST_CONFIG], + [The exact path of Google Test's 'gtest-config' script.]) +AC_ARG_VAR([GTEST_CPPFLAGS], + [C-like preprocessor flags for Google Test.]) +AC_ARG_VAR([GTEST_CXXFLAGS], + [C++ compile flags for Google Test.]) +AC_ARG_VAR([GTEST_LDFLAGS], + [Linker path and option flags for Google Test.]) +AC_ARG_VAR([GTEST_LIBS], + [Library linking flags for Google Test.]) +AC_ARG_VAR([GTEST_VERSION], + [The version of Google Test available.]) +HAVE_GTEST="no" +AS_IF([test "x${enable_gtest}" != "xno"], + [AC_MSG_CHECKING([for 'gtest-config']) + AS_IF([test "x${enable_gtest}" != "xyes"], + [AS_IF([test -x "${enable_gtest}/scripts/gtest-config"], + [GTEST_CONFIG="${enable_gtest}/scripts/gtest-config"], + [GTEST_CONFIG="${enable_gtest}/bin/gtest-config"]) + AS_IF([test -x "${GTEST_CONFIG}"], [], + [AC_MSG_RESULT([no]) + AC_MSG_ERROR([dnl +Unable to locate either a built or installed Google Test. 
+The specific location '${enable_gtest}' was provided for a built or installed +Google Test, but no 'gtest-config' script could be found at this location.]) + ])], + [AC_PATH_PROG([GTEST_CONFIG], [gtest-config])]) + AS_IF([test -x "${GTEST_CONFIG}"], + [AC_MSG_RESULT([${GTEST_CONFIG}]) + m4_ifval([$1], + [_gtest_min_version="--min-version=$1" + AC_MSG_CHECKING([for Google Test at least version >= $1])], + [_gtest_min_version="--min-version=0" + AC_MSG_CHECKING([for Google Test])]) + AS_IF([${GTEST_CONFIG} ${_gtest_min_version}], + [AC_MSG_RESULT([yes]) + HAVE_GTEST='yes'], + [AC_MSG_RESULT([no])])], + [AC_MSG_RESULT([no])]) + AS_IF([test "x${HAVE_GTEST}" = "xyes"], + [GTEST_CPPFLAGS=`${GTEST_CONFIG} --cppflags` + GTEST_CXXFLAGS=`${GTEST_CONFIG} --cxxflags` + GTEST_LDFLAGS=`${GTEST_CONFIG} --ldflags` + GTEST_LIBS=`${GTEST_CONFIG} --libs` + GTEST_VERSION=`${GTEST_CONFIG} --version` + AC_DEFINE([HAVE_GTEST],[1],[Defined when Google Test is available.])], + [AS_IF([test "x${enable_gtest}" = "xyes"], + [AC_MSG_ERROR([dnl +Google Test was enabled, but no viable version could be found.]) + ])])]) +AC_SUBST([HAVE_GTEST]) +AM_CONDITIONAL([HAVE_GTEST],[test "x$HAVE_GTEST" = "xyes"]) +AS_IF([test "x$HAVE_GTEST" = "xyes"], + [m4_ifval([$2], [$2])], + [m4_ifval([$3], [$3])]) +]) Propchange: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/m4/gtest.m4 ------------------------------------------------------------------------------ svn:eol-style = native Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf Mon Jul 16 10:35:19 2012 @@ -26,24 +26,18 @@ 
log4j.appender.rootAppender.layout=org.a log4j.appender.hedwig=org.apache.log4j.ConsoleAppender #log4j.appender.hedwig.fileName=./testLog.log log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout -log4j.appender.hedwig.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %c %p - %m%n -log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout -log4j.appender.hedwig.layout.ConversionPattern=%.5m%n +log4j.appender.hedwig.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %p %c - %m%n log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender #log4j.appender.hedwig.fileName=./testLog.log log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout -log4j.appender.hedwigtest.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %c %p - %m%n -log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout -log4j.appender.hedwigtest.layout.ConversionPattern=%.5m%n +log4j.appender.hedwigtest.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %p %c - %m%n # category -log4j.category.hedwig=INFO, hedwig -log4j.rootCategory=INFO +log4j.category.hedwig=OFF, hedwig +log4j.category.hedwigtest=OFF, hedwigtest + +log4j.rootCategory=OFF #log4j.category.hedwig.channel=ERROR -log4j.category.hedwig.util=ERROR -log4j.category.hedwigtest.servercontrol=ERROR -log4j.category.hedwigtest=INFO, hedwigtest -log4j.rootCategory=INFO Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh Mon Jul 16 10:35:19 2012 @@ -67,7 +67,7 @@ singletest() { stop_cluster; start_cluster; - ../test/hedwigtest $1 + ../test/hedwigtest --gtest_filter=$1 RESULT=$? 
stop_cluster; Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Mon Jul 16 10:35:19 2012 @@ -16,11 +16,19 @@ # limitations under the License. # +if HAVE_GTEST bin_PROGRAMS = hedwigtest -hedwigtest_SOURCES = main.cpp utiltest.cpp pubsubdatatest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp -hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(TESTDEPS_CFLAGS) $(BOOST_CPPFLAGS) -hedwigtest_LDADD = $(DEPS_LIBS) $(TESTDEPS_LIBS) -L$(top_builddir)/lib -lhedwig01 -hedwigtest_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB) +hedwigtest_SOURCES = main.cpp utiltest.cpp publishtest.cpp subscribetest.cpp pubsubtest.cpp messageboundtest.cpp +hedwigtest_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS) $(GTEST_CPPFLAGS) $(BOOST_CPPFLAGS) +hedwigtest_CXXFLAGS = $(GTEST_CXXFLAGS) +hedwigtest_LDADD = $(DEPS_LIBS) $(GTEST_LIBS) -L$(top_builddir)/lib -lhedwig01 +hedwigtest_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB) $(GTEST_LDFLAGS) check: hedwigtest bash ../scripts/tester.sh all +else +check: + @echo "\n\nYou haven't configured with gtest. Run the ./configure command with --enable-gtest=" + @echo "i.e. 
./configure --enable-gtest=/home/user/src/gtest-1.6.0" + @echo "See the README for more info\n\n\b" +endif Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp Mon Jul 16 10:35:19 2012 @@ -32,13 +32,7 @@ #include "util.h" -#include -#include - -#include -#include - -HedwigCppTextTestProgressListener gprogress; +#include "gtest/gtest.h" int main( int argc, char **argv) { @@ -54,29 +48,10 @@ int main( int argc, char **argv) } catch (...) { std::cerr << "unknown exception while configuring log4cpp vi'." << std::endl; } - std::string testPath = (argc > 2) ? std::string(argv[2]) : ""; - - CppUnit::TextTestRunner runner; - - if (argc > 1) { - CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry(argv[1]); - - runner.addTest( registry.makeTest() ); - } else { - CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("*"); - registry.addRegistry("Util"); - registry.addRegistry("Subscribe"); - registry.addRegistry("Publish"); - registry.addRegistry("PubSub"); - registry.addRegistry("MessageBound"); - - runner.addTest( registry.makeTest() ); - } - runner.eventManager().addListener( &gprogress ); - - bool ret = runner.run(testPath); + ::testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); google::protobuf::ShutdownProtobufLibrary(); - - return (ret == true) ? 
0 : 1; + + return ret; } Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp Mon Jul 16 10:35:19 2012 @@ -19,9 +19,7 @@ #include #endif -#include -#include -#include +#include "gtest/gtest.h" #include "../lib/clientimpl.h" #include @@ -35,175 +33,152 @@ static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); -class MessageBoundTestSuite : public CppUnit::TestFixture { - class MessageBoundConfiguration : public Hedwig::Configuration { - public: - MessageBoundConfiguration() : address("localhost:4081") {} + +class MessageBoundConfiguration : public Hedwig::Configuration { +public: + MessageBoundConfiguration() : address("localhost:4081") {} - virtual int getInt(const std::string& key, int defaultVal) const { - if (key == Configuration::SUBSCRIPTION_MESSAGE_BOUND) { - return 5; - } - return defaultVal; + virtual int getInt(const std::string& key, int defaultVal) const { + if (key == Configuration::SUBSCRIPTION_MESSAGE_BOUND) { + return 5; } + return defaultVal; + } - virtual const std::string get(const std::string& key, const std::string& defaultVal) const { - if (key == Configuration::DEFAULT_SERVER) { - return address; - } else { - return defaultVal; - } - } - - virtual bool getBool(const std::string& /*key*/, bool defaultVal) const { + virtual const std::string get(const std::string& key, const std::string& defaultVal) const { + if (key == Configuration::DEFAULT_SERVER) { + return address; + } else { return defaultVal; } - - protected: - const std::string address; - }; + } -private: - 
CPPUNIT_TEST_SUITE( MessageBoundTestSuite ); - CPPUNIT_TEST(testMessageBound); - CPPUNIT_TEST(testMultipleSubscribers); - CPPUNIT_TEST_SUITE_END(); + virtual bool getBool(const std::string& /*key*/, bool defaultVal) const { + return defaultVal; + } +protected: + const std::string address; +}; + +class MessageBoundOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback { public: - MessageBoundTestSuite() { + MessageBoundOrderCheckingMessageHandlerCallback(const int nextExpectedMsg) + : nextExpectedMsg(nextExpectedMsg) { } - ~MessageBoundTestSuite() { + virtual void consume(const std::string& topic, const std::string& subscriberId, + const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { + boost::lock_guard lock(mutex); + + int thisMsg = atoi(msg.body().c_str()); + LOG4CXX_DEBUG(logger, "received message " << thisMsg); + if (thisMsg == nextExpectedMsg) { + nextExpectedMsg++; + } + // checking msgId + callback->operationComplete(); } - void setUp() - { - } - - void tearDown() - { + int nextExpected() { + return nextExpectedMsg; } - class MyOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback { - public: - MyOrderCheckingMessageHandlerCallback(const int nextExpectedMsg) - : nextExpectedMsg(nextExpectedMsg) { - } - - virtual void consume(const std::string& topic, const std::string& subscriberId, - const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { - boost::lock_guard lock(mutex); - - int thisMsg = atoi(msg.body().c_str()); - LOG4CXX_DEBUG(logger, "received message " << thisMsg); - if (thisMsg == nextExpectedMsg) { - nextExpectedMsg++; - } - // checking msgId - callback->operationComplete(); - } - - int nextExpected() { - return nextExpectedMsg; - } +protected: + boost::mutex mutex; + int nextExpectedMsg; +}; - protected: - boost::mutex mutex; - int nextExpectedMsg; - }; - - void sendXExpectLastY(Hedwig::Publisher& pub, Hedwig::Subscriber& sub, const std::string& topic, - const std::string& 
subid, int X, int Y) { - for (int i = 0; i < X; i++) { - std::stringstream oss; - oss << i; - pub.publish(topic, oss.str()); - } +void sendXExpectLastY(Hedwig::Publisher& pub, Hedwig::Subscriber& sub, const std::string& topic, + const std::string& subid, int X, int Y) { + for (int i = 0; i < X; i++) { + std::stringstream oss; + oss << i; + pub.publish(topic, oss.str()); + } - sub.subscribe(topic, subid, Hedwig::SubscribeRequest::ATTACH); + sub.subscribe(topic, subid, Hedwig::SubscribeRequest::ATTACH); - MyOrderCheckingMessageHandlerCallback* cb = - new MyOrderCheckingMessageHandlerCallback(X - Y); + MessageBoundOrderCheckingMessageHandlerCallback* cb = + new MessageBoundOrderCheckingMessageHandlerCallback(X - Y); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(topic, subid, handler); + Hedwig::MessageHandlerCallbackPtr handler(cb); + sub.startDelivery(topic, subid, handler); - for (int i = 0; i < 100; i++) { - if (cb->nextExpected() == X) { - break; - } else { - sleep(1); - } + for (int i = 0; i < 100; i++) { + if (cb->nextExpected() == X) { + break; + } else { + sleep(1); } - CPPUNIT_ASSERT(cb->nextExpected() == X); - - sub.stopDelivery(topic, subid); - sub.closeSubscription(topic, subid); } + ASSERT_TRUE(cb->nextExpected() == X); - void testMessageBound() { - Hedwig::Configuration* conf = new MessageBoundConfiguration(); - std::auto_ptr confptr(conf); + sub.stopDelivery(topic, subid); + sub.closeSubscription(topic, subid); +} + +TEST(MessageBoundTest, testMessageBound) { + Hedwig::Configuration* conf = new MessageBoundConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = 
client->getPublisher(); - std::string topic = "testTopic"; - std::string subid = "testSubId"; - sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.closeSubscription(topic, subid); + std::string topic = "testTopic"; + std::string subid = "testSubId"; + sub.subscribe(topic, subid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.closeSubscription(topic, subid); - sendXExpectLastY(pub, sub, topic, subid, 100, 5); - } + sendXExpectLastY(pub, sub, topic, subid, 100, 5); +} - void testMultipleSubscribers() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); +TEST(MessageBoundTest, testMultipleSubscribers) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); - Hedwig::SubscriptionOptions options5; - options5.set_messagebound(5); - options5.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - Hedwig::SubscriptionOptions options20; - options20.set_messagebound(20); - options20.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - Hedwig::SubscriptionOptions optionsUnlimited; - optionsUnlimited.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - std::string topic = "testTopic"; - std::string subid5 = "testSubId5"; - std::string subid20 = "testSubId20"; - std::string subidUnlimited = "testSubIdUnlimited"; - - sub.subscribe(topic, subid5, options5); - sub.closeSubscription(topic, subid5); - - sendXExpectLastY(pub, sub, topic, subid5, 1000, 5); - - sub.subscribe(topic, subid20, options20); - sub.closeSubscription(topic, subid20); - 
sendXExpectLastY(pub, sub, topic, subid20, 1000, 20); - - sub.subscribe(topic, subidUnlimited, optionsUnlimited); - sub.closeSubscription(topic, subidUnlimited); - - sendXExpectLastY(pub, sub, topic, subidUnlimited, 1000, 1000); - sub.unsubscribe(topic, subidUnlimited); - - sendXExpectLastY(pub, sub, topic, subid20, 1000, 20); - sub.unsubscribe(topic, subid20); - - sendXExpectLastY(pub, sub, topic, subid5, 1000, 5); - sub.unsubscribe(topic, subid5); - } -}; + Hedwig::SubscriptionOptions options5; + options5.set_messagebound(5); + options5.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + Hedwig::SubscriptionOptions options20; + options20.set_messagebound(20); + options20.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + Hedwig::SubscriptionOptions optionsUnlimited; + optionsUnlimited.set_createorattach(Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + + std::string topic = "testTopic"; + std::string subid5 = "testSubId5"; + std::string subid20 = "testSubId20"; + std::string subidUnlimited = "testSubIdUnlimited"; + + sub.subscribe(topic, subid5, options5); + sub.closeSubscription(topic, subid5); + + sendXExpectLastY(pub, sub, topic, subid5, 1000, 5); + + sub.subscribe(topic, subid20, options20); + sub.closeSubscription(topic, subid20); + sendXExpectLastY(pub, sub, topic, subid20, 1000, 20); + + sub.subscribe(topic, subidUnlimited, optionsUnlimited); + sub.closeSubscription(topic, subidUnlimited); + + sendXExpectLastY(pub, sub, topic, subidUnlimited, 1000, 1000); + sub.unsubscribe(topic, subidUnlimited); + + sendXExpectLastY(pub, sub, topic, subid20, 1000, 20); + sub.unsubscribe(topic, subid20); + + sendXExpectLastY(pub, sub, topic, subid5, 1000, 5); + sub.unsubscribe(topic, subid5); +} -CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( MessageBoundTestSuite, "MessageBound" ); Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/publishtest.cpp Mon Jul 16 10:35:19 2012 @@ -19,9 +19,7 @@ #include #endif -#include -#include -#include +#include "gtest/gtest.h" #include "../lib/clientimpl.h" #include @@ -35,116 +33,87 @@ static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); -using namespace CppUnit; - -class PublishTestSuite : public CppUnit::TestFixture { -private: - CPPUNIT_TEST_SUITE( PublishTestSuite ); - CPPUNIT_TEST(testSyncPublish); - CPPUNIT_TEST(testAsyncPublish); - CPPUNIT_TEST(testPublishByMessage); - CPPUNIT_TEST(testMultipleAsyncPublish); - // CPPUNIT_TEST(simplePublish); - //CPPUNIT_TEST(simplePublishAndSubscribe); - //CPPUNIT_TEST(publishAndSubscribeWithRedirect); - CPPUNIT_TEST_SUITE_END(); - -public: - PublishTestSuite() { - } - - ~PublishTestSuite() { - } - - void setUp() - { - } - - void tearDown() - { - } - - void testPublishByMessage() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - Hedwig::Client* client = new Hedwig::Client(*conf); - Hedwig::Publisher& pub = client->getPublisher(); - - Hedwig::Message syncMsg; - syncMsg.set_body("sync publish by Message"); - pub.publish("testTopic", syncMsg); - - SimpleWaitCondition* cond = new SimpleWaitCondition(); - Hedwig::OperationCallbackPtr testcb(new TestCallback(cond)); - Hedwig::Message asyncMsg; - asyncMsg.set_body("async publish by Message"); - pub.asyncPublish("testTopic", asyncMsg, testcb); - cond->wait(); - CPPUNIT_ASSERT(cond->wasSuccess()); - delete cond; - - delete client; - delete conf; - } - - void testSyncPublish() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - - Hedwig::Client* 
client = new Hedwig::Client(*conf); - Hedwig::Publisher& pub = client->getPublisher(); - - pub.publish("testTopic", "testMessage 1"); - - delete client; - delete conf; - } - - void testAsyncPublish() { - SimpleWaitCondition* cond = new SimpleWaitCondition(); - - Hedwig::Configuration* conf = new TestServerConfiguration(); - Hedwig::Client* client = new Hedwig::Client(*conf); - Hedwig::Publisher& pub = client->getPublisher(); - - Hedwig::OperationCallbackPtr testcb(new TestCallback(cond)); - pub.asyncPublish("testTopic", "async test message", testcb); - - cond->wait(); - - CPPUNIT_ASSERT(cond->wasSuccess()); - - delete cond; - delete client; - delete conf; - } - - void testMultipleAsyncPublish() { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - SimpleWaitCondition* cond2 = new SimpleWaitCondition(); - SimpleWaitCondition* cond3 = new SimpleWaitCondition(); - - Hedwig::Configuration* conf = new TestServerConfiguration(); - Hedwig::Client* client = new Hedwig::Client(*conf); - Hedwig::Publisher& pub = client->getPublisher(); +TEST(PublishTest, testPublishByMessage) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + Hedwig::Client* client = new Hedwig::Client(*conf); + Hedwig::Publisher& pub = client->getPublisher(); + + Hedwig::Message syncMsg; + syncMsg.set_body("sync publish by Message"); + pub.publish("testTopic", syncMsg); + + SimpleWaitCondition* cond = new SimpleWaitCondition(); + Hedwig::OperationCallbackPtr testcb(new TestCallback(cond)); + Hedwig::Message asyncMsg; + asyncMsg.set_body("async publish by Message"); + pub.asyncPublish("testTopic", asyncMsg, testcb); + cond->wait(); + ASSERT_TRUE(cond->wasSuccess()); + delete cond; + + delete client; + delete conf; +} + +TEST(PublishTest, testSyncPublish) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + + Hedwig::Client* client = new Hedwig::Client(*conf); + Hedwig::Publisher& pub = client->getPublisher(); + + pub.publish("testTopic", "testMessage 1"); + + delete 
client; + delete conf; +} + +TEST(PublishTest, testAsyncPublish) { + SimpleWaitCondition* cond = new SimpleWaitCondition(); + + Hedwig::Configuration* conf = new TestServerConfiguration(); + Hedwig::Client* client = new Hedwig::Client(*conf); + Hedwig::Publisher& pub = client->getPublisher(); + + Hedwig::OperationCallbackPtr testcb(new TestCallback(cond)); + pub.asyncPublish("testTopic", "async test message", testcb); + + cond->wait(); + + ASSERT_TRUE(cond->wasSuccess()); + + delete cond; + delete client; + delete conf; +} + +TEST(PublishTest, testMultipleAsyncPublish) { + SimpleWaitCondition* cond1 = new SimpleWaitCondition(); + SimpleWaitCondition* cond2 = new SimpleWaitCondition(); + SimpleWaitCondition* cond3 = new SimpleWaitCondition(); + + Hedwig::Configuration* conf = new TestServerConfiguration(); + Hedwig::Client* client = new Hedwig::Client(*conf); + Hedwig::Publisher& pub = client->getPublisher(); - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); - Hedwig::OperationCallbackPtr testcb3(new TestCallback(cond3)); - - pub.asyncPublish("testTopic", "async test message #1", testcb1); - pub.asyncPublish("testTopic", "async test message #2", testcb2); - pub.asyncPublish("testTopic", "async test message #3", testcb3); - - cond3->wait(); - CPPUNIT_ASSERT(cond3->wasSuccess()); - cond2->wait(); - CPPUNIT_ASSERT(cond2->wasSuccess()); - cond1->wait(); - CPPUNIT_ASSERT(cond1->wasSuccess()); - - delete cond3; delete cond2; delete cond1; - delete client; - delete conf; - } + Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); + Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); + Hedwig::OperationCallbackPtr testcb3(new TestCallback(cond3)); + + pub.asyncPublish("testTopic", "async test message #1", testcb1); + pub.asyncPublish("testTopic", "async test message #2", testcb2); + pub.asyncPublish("testTopic", "async test message #3", testcb3); + + cond3->wait(); + 
ASSERT_TRUE(cond3->wasSuccess()); + cond2->wait(); + ASSERT_TRUE(cond2->wasSuccess()); + cond1->wait(); + ASSERT_TRUE(cond1->wasSuccess()); + + delete cond3; delete cond2; delete cond1; + delete client; + delete conf; +} /* void simplePublish() { LOG4CXX_DEBUG(logger, ">>> simplePublish"); SimpleWaitCondition* cond = new SimpleWaitCondition(); @@ -267,6 +236,3 @@ public: delete publishconf; delete subscribeconf; }*/ -}; - -CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PublishTestSuite, "Publish"); Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Mon Jul 16 10:35:19 2012 @@ -21,9 +21,7 @@ #include -#include -#include -#include +#include "gtest/gtest.h" #include #include "../lib/clientimpl.h" @@ -38,572 +36,540 @@ static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); -class PubSubTestSuite : public CppUnit::TestFixture { -private: - CPPUNIT_TEST_SUITE( PubSubTestSuite ); - CPPUNIT_TEST(testPubSubOrderChecking); - CPPUNIT_TEST(testRandomDelivery); - CPPUNIT_TEST(testPubSubContinuousOverClose); - // CPPUNIT_TEST(testPubSubContinuousOverServerDown); - CPPUNIT_TEST(testMultiTopic); - CPPUNIT_TEST(testBigMessage); - CPPUNIT_TEST(testMultiTopicMultiSubscriber); - CPPUNIT_TEST(testPubSubInMultiDispatchThreads); - CPPUNIT_TEST_SUITE_END(); - +class PubSubMessageHandlerCallback : public Hedwig::MessageHandlerCallback { public: - PubSubTestSuite() { - } - - ~PubSubTestSuite() { - } - - void setUp() - { - } - - void tearDown() - { + PubSubMessageHandlerCallback(const std::string& topic, const std::string& subscriberId) : 
messagesReceived(0), topic(topic), subscriberId(subscriberId) { } - class MyMessageHandlerCallback : public Hedwig::MessageHandlerCallback { - public: - MyMessageHandlerCallback(const std::string& topic, const std::string& subscriberId) : messagesReceived(0), topic(topic), subscriberId(subscriberId) { + virtual void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { + if (topic == this->topic && subscriberId == this->subscriberId) { + boost::lock_guard<boost::mutex> lock(mutex); + messagesReceived++; + lastMessage = msg.body(); + callback->operationComplete(); } - - virtual void consume(const std::string& topic, const std::string& subscriberId, const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { - if (topic == this->topic && subscriberId == this->subscriberId) { - boost::lock_guard<boost::mutex> lock(mutex); - - messagesReceived++; - lastMessage = msg.body(); - callback->operationComplete(); - } - } + } - std::string getLastMessage() { - boost::lock_guard<boost::mutex> lock(mutex); - std::string s = lastMessage; - return s; - } + std::string getLastMessage() { + boost::lock_guard<boost::mutex> lock(mutex); + std::string s = lastMessage; + return s; + } + + int numMessagesReceived() { + boost::lock_guard<boost::mutex> lock(mutex); + int i = messagesReceived; + return i; + } + +protected: + boost::mutex mutex; + int messagesReceived; + std::string lastMessage; + std::string topic; + std::string subscriberId; +}; - int numMessagesReceived() { - boost::lock_guard<boost::mutex> lock(mutex); - int i = messagesReceived; - return i; - } - - protected: - boost::mutex mutex; - int messagesReceived; - std::string lastMessage; - std::string topic; - std::string subscriberId; - }; - - // order checking callback - class MyOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback { - public: - MyOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume) - 
: messagesReceived(0), topic(topic), subscriberId(subscriberId), startMsgId(startMsgId), - isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) { - } +// order checking callback +class PubSubOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback { +public: + PubSubOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume) + : messagesReceived(0), topic(topic), subscriberId(subscriberId), startMsgId(startMsgId), + isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) { + } - virtual void consume(const std::string& topic, const std::string& subscriberId, - const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { - if (topic == this->topic && subscriberId == this->subscriberId) { - boost::lock_guard<boost::mutex> lock(mutex); + virtual void consume(const std::string& topic, const std::string& subscriberId, + const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) { + if (topic == this->topic && subscriberId == this->subscriberId) { + boost::lock_guard<boost::mutex> lock(mutex); - messagesReceived++; + messagesReceived++; - int newMsgId = atoi(msg.body().c_str()); - // checking msgId - LOG4CXX_DEBUG(logger, "received message " << newMsgId); - if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0 - if (isInOrder) { - if (newMsgId != startMsgId + 1) { - LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId); - isInOrder = false; - } else { - startMsgId = newMsgId; - } - } - } else { // we set first msg id as startMsgId when startMsgId is -1 - startMsgId = newMsgId; - } - callback->operationComplete(); - sleep(sleepTimeInConsume); + int newMsgId = atoi(msg.body().c_str()); + // checking msgId + LOG4CXX_DEBUG(logger, "received message " << newMsgId); + if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0 + if (isInOrder) { + if (newMsgId != 
startMsgId + 1) { + LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId); + isInOrder = false; + } else { + startMsgId = newMsgId; + } + } + } else { // we set first msg id as startMsgId when startMsgId is -1 + startMsgId = newMsgId; } + callback->operationComplete(); + sleep(sleepTimeInConsume); } + } - int numMessagesReceived() { - boost::lock_guard lock(mutex); - int i = messagesReceived; - return i; - } - - bool inOrder() { - boost::lock_guard lock(mutex); - return isInOrder; - } - - protected: - boost::mutex mutex; - int messagesReceived; - std::string topic; - std::string subscriberId; - int startMsgId; - bool isInOrder; - int sleepTimeInConsume; - }; - - // Publisher integer until finished - class IntegerPublisher { - public: - IntegerPublisher(const std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime) - : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub), running(true), runTime(runTime) { - } - - void operator()() { - int i = 1; - long beginTime = curTime(); - long elapsedTime = 0; - - while (running) { - try { - int msg = startMsgId + i; - std::stringstream ss; - ss << msg; - pub.publish(topic, ss.str()); - sleep(sleepTime); - if (numMsgs > 0 && i >= numMsgs) { - running = false; - } else { - if (i % 100 == 0 && - (elapsedTime = (curTime() - beginTime)) >= runTime) { - LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime); - running = false; - } - } - ++i; - } catch (std::exception &e) { - LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what()); - } - } - } + int numMessagesReceived() { + boost::lock_guard lock(mutex); + int i = messagesReceived; + return i; + } - long curTime() { - struct timeval tv; - long mtime; - gettimeofday(&tv, NULL); - mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5; - return mtime; - } - - private: - std::string topic; - int startMsgId; - int numMsgs; - int 
sleepTime; - Hedwig::Publisher& pub; - bool running; - long runTime; - }; - - // test startDelivery / stopDelivery randomly - void testRandomDelivery() { - std::string topic = "randomDeliveryTopic"; - std::string subscriber = "mysub-randomDelivery"; - - int nLoops = 300; - int sleepTimePerLoop = 1; - int syncTimeout = 10000; - - Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout); - std::auto_ptr confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - // subscribe topic - sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - // start thread to publish message - IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000); - boost::thread pubThread(intPublisher); - - // start random delivery - MyOrderCheckingMessageHandlerCallback* cb = - new MyOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - for (int i = 0; i < nLoops; i++) { - LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i); - sub.startDelivery(topic, subscriber, handler); - // sleep random time - sleep(sleepTimePerLoop); - sub.stopDelivery(topic, subscriber); - CPPUNIT_ASSERT(cb->inOrder()); - } - - pubThread.join(); + bool inOrder() { + boost::lock_guard lock(mutex); + return isInOrder; } - - // check message ordering - void testPubSubOrderChecking() { - std::string topic = "orderCheckingTopic"; - std::string sid = "mysub-0"; - - int numMessages = 5; - int sleepTimeInConsume = 1; - // sync timeout - int syncTimeout = 10000; - - // in order to guarantee message order, message queue should be locked - // so message received in io thread would be blocked, which also block - // sent operations (publish). 
because we have only one io thread now - // so increase sync timeout to 10s, which is more than numMessages * sleepTimeInConsume - Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout); - std::auto_ptr<Hedwig::Configuration> confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - // we don't start delivery first, so the message will be queued - // publish ${numMessages} messages, so the messages will be queued - for (int i=1; i<=numMessages; i++) { - std::stringstream ss; - ss << i; - pub.publish(topic, ss.str()); - } - - MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - // create a thread to publish another ${numMessages} messages - boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0)); +protected: + boost::mutex mutex; + int messagesReceived; + std::string topic; + std::string subscriberId; + int startMsgId; + bool isInOrder; + int sleepTimeInConsume; +}; - // start delivery will consumed the queued messages - // new message will recevied and the queued message should be consumed - // hedwig should ensure the message are received in order - sub.startDelivery(topic, sid, handler); - - // wait until message are all published - pubThread.join(); - - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() == 2 * numMessages) { - break; - } - } - CPPUNIT_ASSERT(cb->inOrder()); +// Publisher integer until finished +class IntegerPublisher { +public: + IntegerPublisher(const std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime) + : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), 
pub(pub), running(true), runTime(runTime) { } - // check message ordering - void testPubSubInMultiDispatchThreads() { - std::string topic = "PubSubInMultiDispatchThreadsTopic-"; - std::string sid = "mysub-0"; - - int syncTimeout = 10000; - int numDispatchThreads = 4; - int numMessages = 100; - int numTopics = 20; - - Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout, numDispatchThreads); - std::auto_ptr confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - std::vector callbacks; - - for (int i=0; i 0 && i >= numMsgs) { + running = false; + } else { + if (i % 100 == 0 && + (elapsedTime = (curTime() - beginTime)) >= runTime) { + LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime); + running = false; + } + } + ++i; + } catch (std::exception &e) { + LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what()); + } + } + } - MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(ss.str(), sid, 0, 0); - Hedwig::MessageHandlerCallbackPtr handler(cb); - sub.startDelivery(ss.str(), sid, handler); - callbacks.push_back(handler); - } + long curTime() { + struct timeval tv; + long mtime; + gettimeofday(&tv, NULL); + mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5; + return mtime; + } - std::vector > threads; +private: + std::string topic; + int startMsgId; + int numMsgs; + int sleepTime; + Hedwig::Publisher& pub; + bool running; + long runTime; +}; - for (int i=0; i t = boost::shared_ptr( - new boost::thread(IntegerPublisher(ss.str(), 0, numMessages, 0, pub, 0))); - threads.push_back(t); - } +// test startDelivery / stopDelivery randomly +TEST(PubSubTest, testRandomDelivery) { + std::string topic = "randomDeliveryTopic"; + std::string subscriber = "mysub-randomDelivery"; + + int nLoops = 300; + int sleepTimePerLoop = 1; + int syncTimeout = 10000; 
+ + Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout); + std::auto_ptr confptr(conf); + + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); + + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + + // subscribe topic + sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + + // start thread to publish message + IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000); + boost::thread pubThread(intPublisher); + + // start random delivery + PubSubOrderCheckingMessageHandlerCallback* cb = + new PubSubOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0); + Hedwig::MessageHandlerCallbackPtr handler(cb); + + for (int i = 0; i < nLoops; i++) { + LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i); + sub.startDelivery(topic, subscriber, handler); + // sleep random time + usleep(rand()%1000000); + sub.stopDelivery(topic, subscriber); + ASSERT_TRUE(cb->inOrder()); + } + + pubThread.join(); + } + + // check message ordering + TEST(PubSubTest, testPubSubOrderChecking) { + std::string topic = "orderCheckingTopic"; + std::string sid = "mysub-0"; + + int numMessages = 5; + int sleepTimeInConsume = 1; + // sync timeout + int syncTimeout = 10000; + + // in order to guarantee message order, message queue should be locked + // so message received in io thread would be blocked, which also block + // sent operations (publish). 
because we have only one io thread now + // so increase sync timeout to 10s, which is more than numMessages * sleepTimeInConsume + Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout); + std::auto_ptr confptr(conf); + + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); + + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + + // we don't start delivery first, so the message will be queued + // publish ${numMessages} messages, so the messages will be queued + for (int i=1; i<=numMessages; i++) { + std::stringstream ss; + ss << i; + pub.publish(topic, ss.str()); + } + + PubSubOrderCheckingMessageHandlerCallback* cb = new PubSubOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume); + Hedwig::MessageHandlerCallbackPtr handler(cb); + + // create a thread to publish another ${numMessages} messages + boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0)); + + // start delivery will consumed the queued messages + // new message will recevied and the queued message should be consumed + // hedwig should ensure the message are received in order + sub.startDelivery(topic, sid, handler); + + // wait until message are all published + pubThread.join(); + + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() == 2 * numMessages) { + break; + } + } + ASSERT_TRUE(cb->inOrder()); + } + + // check message ordering + TEST(PubSubTest, testPubSubInMultiDispatchThreads) { + std::string topic = "PubSubInMultiDispatchThreadsTopic-"; + std::string sid = "mysub-0"; + + int syncTimeout = 10000; + int numDispatchThreads = 4; + int numMessages = 100; + int numTopics = 20; + + Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout, numDispatchThreads); + std::auto_ptr confptr(conf); + + Hedwig::Client* client = new 
Hedwig::Client(*conf); + std::auto_ptr clientptr(client); + + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + + std::vector callbacks; + + for (int i=0; i > threads; + + for (int i=0; i t = boost::shared_ptr( + new boost::thread(IntegerPublisher(ss.str(), 0, numMessages, 0, pub, 0))); + threads.push_back(t); + } + + for (int i=0; ijoin(); + } + threads.clear(); + + for (int j=0; jnumMessagesReceived() == numMessages) { + break; + } + sleep(3); + } + ASSERT_TRUE(cb->inOrder()); + } + callbacks.clear(); + } + + + TEST(PubSubTest, testPubSubContinuousOverClose) { + std::string topic = "pubSubTopic"; + std::string sid = "MySubscriberid-1"; + + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); + + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); + + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); + Hedwig::MessageHandlerCallbackPtr handler(cb); + + sub.startDelivery(topic, sid, handler); + pub.publish(topic, "Test Message 1"); + bool pass = false; + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() > 0) { + if (cb->getLastMessage() == "Test Message 1") { + pass = true; + break; + } + } + } + ASSERT_TRUE(pass); + sub.closeSubscription(topic, sid); + + pub.publish(topic, "Test Message 2"); + + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.startDelivery(topic, sid, handler); + pass = false; + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() > 0) { + if (cb->getLastMessage() == "Test Message 2") { + pass = true; + break; + } + } + } + ASSERT_TRUE(pass); + } + + + /* void testPubSubContinuousOverServerDown() { + std::string topic = "pubSubTopic"; + 
std::string sid = "MySubscriberid-1"; + + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); + + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); + + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); + Hedwig::MessageHandlerCallbackPtr handler(cb); + + sub.startDelivery(topic, sid, handler); + pub.publish(topic, "Test Message 1"); + bool pass = false; + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() > 0) { + if (cb->getLastMessage() == "Test Message 1") { + pass = true; + break; + } + } + } + CPPUNIT_ASSERT(pass); + sub.closeSubscription(topic, sid); + + pub.publish(topic, "Test Message 2"); + + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.startDelivery(topic, sid, handler); + pass = false; + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() > 0) { + if (cb->getLastMessage() == "Test Message 2") { + pass = true; + break; + } + } + } + CPPUNIT_ASSERT(pass); + }*/ + + TEST(PubSubTest, testMultiTopic) { + std::string topicA = "pubSubTopicA"; + std::string topicB = "pubSubTopicB"; + std::string sid = "MySubscriberid-3"; + + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - for (int i=0; ijoin(); - } - threads.clear(); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - for (int j=0; jnumMessagesReceived() == numMessages) { - break; - } - sleep(3); - } - CPPUNIT_ASSERT(cb->inOrder()); - } - callbacks.clear(); - } + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); + sub.subscribe(topicA, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.subscribe(topicB, sid, 
Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + + PubSubMessageHandlerCallback* cbA = new PubSubMessageHandlerCallback(topicA, sid); + Hedwig::MessageHandlerCallbackPtr handlerA(cbA); + sub.startDelivery(topicA, sid, handlerA); - void testPubSubContinuousOverClose() { - std::string topic = "pubSubTopic"; - std::string sid = "MySubscriberid-1"; + PubSubMessageHandlerCallback* cbB = new PubSubMessageHandlerCallback(topicB, sid); + Hedwig::MessageHandlerCallbackPtr handlerB(cbB); + sub.startDelivery(topicB, sid, handlerB); - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); + pub.publish(topicA, "Test Message A"); + pub.publish(topicB, "Test Message B"); + int passA = false, passB = false; - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - sub.startDelivery(topic, sid, handler); - pub.publish(topic, "Test Message 1"); - bool pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 1") { - pass = true; - break; - } + for (int i = 0; i < 10; i++) { + sleep(3); + if (cbA->numMessagesReceived() > 0) { + if (cbA->getLastMessage() == "Test Message A") { + passA = true; } } - CPPUNIT_ASSERT(pass); - sub.closeSubscription(topic, sid); - - pub.publish(topic, "Test Message 2"); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.startDelivery(topic, sid, handler); - pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 2") { - pass = true; - break; - } + if (cbB->numMessagesReceived() > 0) { + if 
(cbB->getLastMessage() == "Test Message B") { + passB = true; } } - CPPUNIT_ASSERT(pass); + if (passA && passB) { + break; + } } + ASSERT_TRUE(passA && passB); +} +TEST(PubSubTest, testMultiTopicMultiSubscriber) { + std::string topicA = "pubSubTopicA"; + std::string topicB = "pubSubTopicB"; + std::string sidA = "MySubscriberid-4"; + std::string sidB = "MySubscriberid-5"; - /* void testPubSubContinuousOverServerDown() { - std::string topic = "pubSubTopic"; - std::string sid = "MySubscriberid-1"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - sub.startDelivery(topic, sid, handler); - pub.publish(topic, "Test Message 1"); - bool pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 1") { - pass = true; - break; - } - } - } - CPPUNIT_ASSERT(pass); - sub.closeSubscription(topic, sid); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - pub.publish(topic, "Test Message 2"); - - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.startDelivery(topic, sid, handler); - pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 2") { - pass = true; - break; - } - } - } - CPPUNIT_ASSERT(pass); - }*/ + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); - void 
testMultiTopic() { - std::string topicA = "pubSubTopicA"; - std::string topicB = "pubSubTopicB"; - std::string sid = "MySubscriberid-3"; - - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + sub.subscribe(topicA, sidA, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.subscribe(topicB, sidB, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + + PubSubMessageHandlerCallback* cbA = new PubSubMessageHandlerCallback(topicA, sidA); + Hedwig::MessageHandlerCallbackPtr handlerA(cbA); + sub.startDelivery(topicA, sidA, handlerA); - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); + PubSubMessageHandlerCallback* cbB = new PubSubMessageHandlerCallback(topicB, sidB); + Hedwig::MessageHandlerCallbackPtr handlerB(cbB); + sub.startDelivery(topicB, sidB, handlerB); - sub.subscribe(topicA, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.subscribe(topicB, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - MyMessageHandlerCallback* cbA = new MyMessageHandlerCallback(topicA, sid); - Hedwig::MessageHandlerCallbackPtr handlerA(cbA); - sub.startDelivery(topicA, sid, handlerA); - - MyMessageHandlerCallback* cbB = new MyMessageHandlerCallback(topicB, sid); - Hedwig::MessageHandlerCallbackPtr handlerB(cbB); - sub.startDelivery(topicB, sid, handlerB); - - pub.publish(topicA, "Test Message A"); - pub.publish(topicB, "Test Message B"); - int passA = false, passB = false; + pub.publish(topicA, "Test Message A"); + pub.publish(topicB, "Test Message B"); + int passA = false, passB = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cbA->numMessagesReceived() > 0) { - if (cbA->getLastMessage() == "Test Message A") { - passA = true; - } - } - if (cbB->numMessagesReceived() > 0) { - if (cbB->getLastMessage() == "Test Message B") { - passB = true; - } + for (int i = 0; i < 10; i++) { + sleep(3); + if 
(cbA->numMessagesReceived() > 0) { + if (cbA->getLastMessage() == "Test Message A") { + passA = true; } - if (passA && passB) { - break; + } + if (cbB->numMessagesReceived() > 0) { + if (cbB->getLastMessage() == "Test Message B") { + passB = true; } } - CPPUNIT_ASSERT(passA && passB); + if (passA && passB) { + break; + } } + ASSERT_TRUE(passA && passB); +} - void testMultiTopicMultiSubscriber() { - std::string topicA = "pubSubTopicA"; - std::string topicB = "pubSubTopicB"; - std::string sidA = "MySubscriberid-4"; - std::string sidB = "MySubscriberid-5"; +static const int BIG_MESSAGE_SIZE = 16436*2; // MTU to lo0 is 16436 by default on linux - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); +TEST(PubSubTest, testBigMessage) { + std::string topic = "pubSubTopic"; + std::string sid = "MySubscriberid-6"; - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); - - sub.subscribe(topicA, sidA, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.subscribe(topicB, sidB, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - - MyMessageHandlerCallback* cbA = new MyMessageHandlerCallback(topicA, sidA); - Hedwig::MessageHandlerCallbackPtr handlerA(cbA); - sub.startDelivery(topicA, sidA, handlerA); - - MyMessageHandlerCallback* cbB = new MyMessageHandlerCallback(topicB, sidB); - Hedwig::MessageHandlerCallbackPtr handlerB(cbB); - sub.startDelivery(topicB, sidB, handlerB); - - pub.publish(topicA, "Test Message A"); - pub.publish(topicB, "Test Message B"); - int passA = false, passB = false; + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - for (int i = 0; i < 10; i++) { - sleep(3); - if (cbA->numMessagesReceived() > 0) { - if (cbA->getLastMessage() == "Test Message A") { - passA = true; - } - } - if (cbB->numMessagesReceived() > 0) { - if (cbB->getLastMessage() == 
"Test Message B") { - passB = true; - } - } - if (passA && passB) { - break; - } - } - CPPUNIT_ASSERT(passA && passB); - } + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - static const int BIG_MESSAGE_SIZE = 16436*2; // MTU to lo0 is 16436 by default on linux + Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Publisher& pub = client->getPublisher(); - void testBigMessage() { - std::string topic = "pubSubTopic"; - std::string sid = "MySubscriberid-6"; + sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + PubSubMessageHandlerCallback* cb = new PubSubMessageHandlerCallback(topic, sid); + Hedwig::MessageHandlerCallbackPtr handler(cb); - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); - - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); - - Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::Publisher& pub = client->getPublisher(); + sub.startDelivery(topic, sid, handler); - sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - MyMessageHandlerCallback* cb = new MyMessageHandlerCallback(topic, sid); - Hedwig::MessageHandlerCallbackPtr handler(cb); - - sub.startDelivery(topic, sid, handler); - - char buf[BIG_MESSAGE_SIZE]; - std::string bigmessage(buf, BIG_MESSAGE_SIZE); - pub.publish(topic, bigmessage); - pub.publish(topic, "Test Message 1"); - bool pass = false; - for (int i = 0; i < 10; i++) { - sleep(3); - if (cb->numMessagesReceived() > 0) { - if (cb->getLastMessage() == "Test Message 1") { - pass = true; - break; - } + char buf[BIG_MESSAGE_SIZE]; + std::string bigmessage(buf, BIG_MESSAGE_SIZE); + pub.publish(topic, bigmessage); + pub.publish(topic, "Test Message 1"); + bool pass = false; + for (int i = 0; i < 10; i++) { + sleep(3); + if (cb->numMessagesReceived() > 0) { + if (cb->getLastMessage() == "Test Message 1") { + pass = true; + break; } } - CPPUNIT_ASSERT(pass); } -}; - 
-CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( PubSubTestSuite, "PubSub" ); + ASSERT_TRUE(pass); +} Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp Mon Jul 16 10:35:19 2012 @@ -19,9 +19,7 @@ #include #endif -#include -#include -#include +#include "gtest/gtest.h" #include "../lib/clientimpl.h" #include @@ -35,165 +33,134 @@ static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__)); -class SubscribeTestSuite : public CppUnit::TestFixture { -private: - CPPUNIT_TEST_SUITE( SubscribeTestSuite ); - CPPUNIT_TEST(testSyncSubscribe); - CPPUNIT_TEST(testSyncSubscribeAttach); - CPPUNIT_TEST(testAsyncSubscribe); - CPPUNIT_TEST(testAsyncSubcribeAndUnsubscribe); - CPPUNIT_TEST(testAsyncSubcribeAndSyncUnsubscribe); - CPPUNIT_TEST(testAsyncSubcribeCloseSubscriptionAndThenResubscribe); - CPPUNIT_TEST(testUnsubscribeWithoutSubscribe); - CPPUNIT_TEST(testSubscribeTwice); - CPPUNIT_TEST_SUITE_END(); - -public: - SubscribeTestSuite() { - - } - - ~SubscribeTestSuite() { - } - - void setUp() - { - } - - void tearDown() - { - } - - void testSyncSubscribe() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); +TEST(SubscribeTest, testSyncSubscribe) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub 
= client->getSubscriber(); - sub.subscribe("testTopic", "mySubscriberId-1", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - } + sub.subscribe("testTopic", "mySubscriberId-1", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); +} - void testSyncSubscribeAttach() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); +TEST(SubscribeTest, testSyncSubscribeAttach) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr<Hedwig::Configuration> confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr<Hedwig::Client> clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - CPPUNIT_ASSERT_THROW(sub.subscribe("iAmATopicWhoDoesNotExist", "mySubscriberId-2", Hedwig::SubscribeRequest::ATTACH), Hedwig::ClientException); - } + ASSERT_THROW(sub.subscribe("iAmATopicWhoDoesNotExist", "mySubscriberId-2", Hedwig::SubscribeRequest::ATTACH), Hedwig::ClientException); +} - void testAsyncSubscribe() { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); +TEST(SubscribeTest, testAsyncSubscribe) { + SimpleWaitCondition* cond1 = new SimpleWaitCondition(); + std::auto_ptr<SimpleWaitCondition> cond1ptr(cond1); - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr<Hedwig::Configuration> confptr(conf); + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr<Hedwig::Configuration> confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr<Hedwig::Client> clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr<Hedwig::Client> clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); + Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - sub.asyncSubscribe("testTopic", "mySubscriberId-3", 
Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); + sub.asyncSubscribe("testTopic", "mySubscriberId-3", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - cond1->wait(); - CPPUNIT_ASSERT(cond1->wasSuccess()); - } + cond1->wait(); + ASSERT_TRUE(cond1->wasSuccess()); +} - void testAsyncSubcribeAndUnsubscribe() { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr cond1ptr(cond1); - SimpleWaitCondition* cond2 = new SimpleWaitCondition(); - std::auto_ptr cond2ptr(cond2); +TEST(SubscribeTest, testAsyncSubcribeAndUnsubscribe) { + SimpleWaitCondition* cond1 = new SimpleWaitCondition(); + std::auto_ptr cond1ptr(cond1); + SimpleWaitCondition* cond2 = new SimpleWaitCondition(); + std::auto_ptr cond2ptr(cond2); - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); + Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); + Hedwig::OperationCallbackPtr testcb2(new TestCallback(cond2)); - sub.asyncSubscribe("testTopic", "mySubscriberId-4", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - cond1->wait(); - CPPUNIT_ASSERT(cond1->wasSuccess()); + sub.asyncSubscribe("testTopic", "mySubscriberId-4", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); + cond1->wait(); + ASSERT_TRUE(cond1->wasSuccess()); - sub.asyncUnsubscribe("testTopic", "mySubscriberId-4", testcb2); - cond2->wait(); - CPPUNIT_ASSERT(cond2->wasSuccess()); - } + sub.asyncUnsubscribe("testTopic", "mySubscriberId-4", testcb2); + 
cond2->wait(); + ASSERT_TRUE(cond2->wasSuccess()); +} - void testAsyncSubcribeAndSyncUnsubscribe() { - SimpleWaitCondition* cond1 = new SimpleWaitCondition(); - std::auto_ptr cond1ptr(cond1); +TEST(SubscribeTest, testAsyncSubcribeAndSyncUnsubscribe) { + SimpleWaitCondition* cond1 = new SimpleWaitCondition(); + std::auto_ptr cond1ptr(cond1); - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); + Hedwig::OperationCallbackPtr testcb1(new TestCallback(cond1)); - sub.asyncSubscribe("testTopic", "mySubscriberId-5", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); - cond1->wait(); - CPPUNIT_ASSERT(cond1->wasSuccess()); + sub.asyncSubscribe("testTopic", "mySubscriberId-5", Hedwig::SubscribeRequest::CREATE_OR_ATTACH, testcb1); + cond1->wait(); + ASSERT_TRUE(cond1->wasSuccess()); - sub.unsubscribe("testTopic", "mySubscriberId-5"); - } + sub.unsubscribe("testTopic", "mySubscriberId-5"); +} - void testAsyncSubcribeCloseSubscriptionAndThenResubscribe() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); +TEST(SubscribeTest, testAsyncSubcribeCloseSubscriptionAndThenResubscribe) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - 
sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.closeSubscription("testTopic", "mySubscriberId-6"); - sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - sub.unsubscribe("testTopic", "mySubscriberId-6"); - } + sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.closeSubscription("testTopic", "mySubscriberId-6"); + sub.subscribe("testTopic", "mySubscriberId-6", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + sub.unsubscribe("testTopic", "mySubscriberId-6"); +} - void testUnsubscribeWithoutSubscribe() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); +TEST(SubscribeTest, testUnsubscribeWithoutSubscribe) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - CPPUNIT_ASSERT_THROW(sub.unsubscribe("testTopic", "mySubscriberId-7"), Hedwig::NotSubscribedException); - } + ASSERT_THROW(sub.unsubscribe("testTopic", "mySubscriberId-7"), Hedwig::NotSubscribedException); +} - void testSubscribeTwice() { - Hedwig::Configuration* conf = new TestServerConfiguration(); - std::auto_ptr confptr(conf); +TEST(SubscribeTest, testSubscribeTwice) { + Hedwig::Configuration* conf = new TestServerConfiguration(); + std::auto_ptr confptr(conf); - Hedwig::Client* client = new Hedwig::Client(*conf); - std::auto_ptr clientptr(client); + Hedwig::Client* client = new Hedwig::Client(*conf); + std::auto_ptr clientptr(client); - Hedwig::Subscriber& sub = client->getSubscriber(); + Hedwig::Subscriber& sub = client->getSubscriber(); - sub.subscribe("testTopic", "mySubscriberId-8", 
Hedwig::SubscribeRequest::CREATE_OR_ATTACH); - CPPUNIT_ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException); - } -}; + sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH); + ASSERT_THROW(sub.subscribe("testTopic", "mySubscriberId-8", Hedwig::SubscribeRequest::CREATE_OR_ATTACH), Hedwig::AlreadySubscribedException); +} -CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( SubscribeTestSuite, "Subscribe" ); Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Mon Jul 16 10:35:19 2012 @@ -23,11 +23,6 @@ #include #include - -#include -#include -#include - #include static log4cxx::LoggerPtr utillogger(log4cxx::Logger::getLogger("hedwig."__FILE__)); @@ -126,28 +121,3 @@ private: const int numThreads; }; - -class HedwigCppTextTestProgressListener : public CppUnit::TextTestProgressListener -{ - public: - void startTest( CppUnit::Test *test ) { - std::cout << "\n****\n\nStarting " << test->getName() << "\n\n****" << std::endl; - current_test = test->getName(); - } - - void addFailure( const CppUnit::TestFailure &failure ) { - std::cout << "\n!!!!!\n\nFailed\n\n!!!!!" 
<< std::endl; - - } - - void endTestRun( CppUnit::Test *test, - CppUnit::TestResult *eventManager ) { - } - - std::string& getTestName() { - return current_test; - } - -private: - std::string current_test; -}; Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/utiltest.cpp URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/utiltest.cpp?rev=1361968&r1=1361967&r2=1361968&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/utiltest.cpp (original) +++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/utiltest.cpp Mon Jul 16 10:35:19 2012 @@ -19,76 +19,56 @@ #include #endif -#include -#include -#include +#include "gtest/gtest.h" #include "../lib/util.h" #include #include -using namespace CppUnit; +TEST(UtilTest, testHostAddress) { + // good address (no ports) + Hedwig::HostAddress a1 = Hedwig::HostAddress::fromString("www.yahoo.com"); + ASSERT_TRUE(a1.port() == 4080); + + // good address with ip (no ports) + Hedwig::HostAddress a2 = Hedwig::HostAddress::fromString("127.0.0.1"); + ASSERT_TRUE(a2.port() == 4080); + ASSERT_TRUE(a2.ip() == ((127 << 24) | 1)); + + // good address + Hedwig::HostAddress a3 = Hedwig::HostAddress::fromString("www.yahoo.com:80"); + ASSERT_TRUE(a3.port() == 80); + + // good address with ip + Hedwig::HostAddress a4 = Hedwig::HostAddress::fromString("127.0.0.1:80"); + ASSERT_TRUE(a4.port() == 80); + ASSERT_TRUE(a4.ip() == ((127 << 24) | 1)); + + // good address (with ssl) + Hedwig::HostAddress a5 = Hedwig::HostAddress::fromString("www.yahoo.com:80:443"); + ASSERT_TRUE(a5.port() == 80); + + // good address with ip + Hedwig::HostAddress a6 = Hedwig::HostAddress::fromString("127.0.0.1:80:443"); + ASSERT_TRUE(a6.port() == 80); + ASSERT_TRUE(a6.ip() == ((127 << 24) | 1)); -class UtilTestSuite : public CppUnit::TestFixture { - CPPUNIT_TEST_SUITE( UtilTestSuite ); - 
CPPUNIT_TEST(testHostAddress); - CPPUNIT_TEST_SUITE_END(); - -public: - void setUp() - { - } - - void tearDown() - { - } - - void testHostAddress() { - // good address (no ports) - Hedwig::HostAddress a1 = Hedwig::HostAddress::fromString("www.yahoo.com"); - CPPUNIT_ASSERT(a1.port() == 4080); - - // good address with ip (no ports) - Hedwig::HostAddress a2 = Hedwig::HostAddress::fromString("127.0.0.1"); - CPPUNIT_ASSERT(a2.port() == 4080); - CPPUNIT_ASSERT(a2.ip() == ((127 << 24) | 1)); - - // good address - Hedwig::HostAddress a3 = Hedwig::HostAddress::fromString("www.yahoo.com:80"); - CPPUNIT_ASSERT(a3.port() == 80); - - // good address with ip - Hedwig::HostAddress a4 = Hedwig::HostAddress::fromString("127.0.0.1:80"); - CPPUNIT_ASSERT(a4.port() == 80); - CPPUNIT_ASSERT(a4.ip() == ((127 << 24) | 1)); - - // good address (with ssl) - Hedwig::HostAddress a5 = Hedwig::HostAddress::fromString("www.yahoo.com:80:443"); - CPPUNIT_ASSERT(a5.port() == 80); - - // good address with ip - Hedwig::HostAddress a6 = Hedwig::HostAddress::fromString("127.0.0.1:80:443"); - CPPUNIT_ASSERT(a6.port() == 80); - CPPUNIT_ASSERT(a6.ip() == ((127 << 24) | 1)); - - // nothing - CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString(""), Hedwig::HostResolutionException); + // nothing + ASSERT_THROW(Hedwig::HostAddress::fromString(""), Hedwig::HostResolutionException); - // nothing but colons - CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString("::::::::::::::::"), Hedwig::ConfigurationException); + // nothing but colons + ASSERT_THROW(Hedwig::HostAddress::fromString("::::::::::::::::"), Hedwig::ConfigurationException); - // only port number - CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString(":80"), Hedwig::HostResolutionException); + // only port number + ASSERT_THROW(Hedwig::HostAddress::fromString(":80"), Hedwig::HostResolutionException); - // text after colon (isn't supported) - CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString("www.yahoo.com:http"), Hedwig::ConfigurationException); 
+ // text after colon (isn't supported) + ASSERT_THROW(Hedwig::HostAddress::fromString("www.yahoo.com:http"), Hedwig::ConfigurationException); - // invalid hostname - CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString("com.oohay.www:80"), Hedwig::HostResolutionException); + // invalid hostname + ASSERT_THROW(Hedwig::HostAddress::fromString("com.oohay.www:80"), Hedwig::HostResolutionException); - // null - CPPUNIT_ASSERT_THROW(Hedwig::HostAddress::fromString(NULL), std::logic_error); - } -}; + // null + ASSERT_THROW(Hedwig::HostAddress::fromString(NULL), std::logic_error); +} -CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( UtilTestSuite, "Util" );