Repository: zookeeper
Updated Branches:
refs/heads/master 536dda2c4 -> 28de451aa
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc
new file mode 100644
index 0000000..5446d9b
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestClient.cc
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <pthread.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/select.h>
+#include <cppunit/TestAssert.h>
+
+
+using namespace std;
+
+#include <cstring>
+#include <list>
+
+#include <zookeeper.h>
+#include <zoo_queue.h>
+
+// Suspends the calling thread for i seconds. The zh argument is accepted but not used.
+static void yield(zhandle_t *zh, int i)
+{
+    (void) zh;
+    sleep(i);
+}
+
+// One watch notification as recorded by the watcher callback: the path it
+// was delivered for and the event type.
+struct evt {
+    string path;
+    int type;
+};
+typedef struct evt evt_t;
+
+/**
+ * Per-client context handed to the watcher callback. It holds the
+ * connection flag the callback updates, the client handle, and a FIFO of
+ * the non-session events the callback has queued. The destructor closes
+ * the handle if one was stored.
+ */
+typedef struct watchCtx {
+private:
+    list<evt_t> events;
+
+    // The destructor closes zh, so a copied context would close the same
+    // handle a second time. Declared but never defined to forbid copying.
+    watchCtx(const watchCtx &);
+    watchCtx &operator=(const watchCtx &);
+public:
+    bool connected;
+    zhandle_t *zh;
+
+    watchCtx() : connected(false), zh(0) {}
+
+    ~watchCtx() {
+        if (zh) {
+            zookeeper_close(zh);
+            zh = 0;
+        }
+    }
+
+    // Removes and returns the oldest queued event. The queue must not be
+    // empty; callers should check countEvents() first.
+    evt_t getEvent() {
+        evt_t evt = events.front();
+        events.pop_front();
+        return evt;
+    }
+
+    // Number of events currently queued.
+    int countEvents() {
+        return events.size();
+    }
+
+    // Appends an event to the back of the queue.
+    void putEvent(evt_t evt) {
+        events.push_back(evt);
+    }
+
+    // Polls once per second, for up to 10 seconds, until connected is true.
+    // Returns the final value of connected.
+    bool waitForConnected(zhandle_t *zh) {
+        time_t expires = time(0) + 10;
+        while(!connected && time(0) < expires) {
+            yield(zh, 1);
+        }
+        return connected;
+    }
+
+    // Polls once per second, for up to 15 seconds, until connected is false.
+    // Returns true when the context ended up disconnected.
+    bool waitForDisconnected(zhandle_t *zh) {
+        time_t expires = time(0) + 15;
+        while(connected && time(0) < expires) {
+            yield(zh, 1);
+        }
+        return !connected;
+    }
+} watchctx_t;
+
+extern "C" {
+
+ const char *thread_test_string="Hello World!";
+
+ // Thread entry point: offers the shared test string (including its
+ // terminating NUL) to the queue passed in queue_handle, then exits.
+ void *offer_thread_shared_queue(void *queue_handle){
+     const int payload_length = strlen(thread_test_string) + 1;
+     zkr_queue_offer((zkr_queue_t *) queue_handle, thread_test_string, payload_length);
+     pthread_exit(NULL);
+ }
+
+ // Thread entry point: takes one element from the queue passed in
+ // queue_handle. On ZOK the thread exits with a malloc'ed buffer holding
+ // the element (the joiner is responsible for it); on any other result
+ // the buffer is freed and the thread exits with NULL.
+ void *take_thread_shared_queue(void *queue_handle){
+     zkr_queue_t *shared_queue = (zkr_queue_t *) queue_handle;
+
+     // Room for the test string plus its terminating NUL.
+     int capacity = strlen(thread_test_string) + 1;
+     int received_length = capacity;
+     char *received = (char *) malloc(sizeof(char) * capacity);
+
+     if (zkr_queue_take(shared_queue, received, &received_length) == ZOK) {
+         pthread_exit(received);
+     }
+     free(received);
+     pthread_exit(NULL);
+ }
+
+ // Returns non-zero when the buffer at result starts with the shared test string.
+ int valid_test_string(void *result){
+     const char *candidate = (const char *) result;
+     size_t expected_length = strlen(thread_test_string);
+     return strncmp(candidate, thread_test_string, expected_length) == 0;
+ }
+}
+
+class Zookeeper_queuetest : public CPPUNIT_NS::TestFixture
+{
+ CPPUNIT_TEST_SUITE(Zookeeper_queuetest);
+ CPPUNIT_TEST(testInitDestroy);
+ CPPUNIT_TEST(testOffer1);
+ CPPUNIT_TEST(testOfferRemove1);
+ CPPUNIT_TEST(testOfferRemove2);
+ CPPUNIT_TEST(testOfferRemove3);
+ CPPUNIT_TEST(testOfferRemove4);
+ CPPUNIT_TEST(testOfferRemove5);
+ CPPUNIT_TEST(testOfferRemove6);
+ CPPUNIT_TEST(testOfferTake1);
+ CPPUNIT_TEST(testOfferTake2);
+ CPPUNIT_TEST(testOfferTake3);
+ CPPUNIT_TEST(testOfferTake4);
+ CPPUNIT_TEST(testOfferTake5);
+ CPPUNIT_TEST(testOfferTake6);
+ CPPUNIT_TEST_SUITE_END();
+
+ // Watcher callback: v is the watchctx_t registered with the handle.
+ // Updates the context's connected flag from the reported state and queues
+ // every event that is not a session event.
+ static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
+     watchctx_t *context = (watchctx_t*)v;
+     context->connected = (state == ZOO_CONNECTED_STATE);
+
+     if (type == ZOO_SESSION_EVENT) {
+         return;
+     }
+     evt_t event;
+     event.path = path;
+     event.type = type;
+     context->putEvent(event);
+ }
+
+ static const char hostPorts[];
+
+ // Returns the host:port string this fixture uses for the server.
+ const char *getHostPorts() {
+     const char *address = hostPorts;
+     return address;
+ }
+
+ // Opens a client handle with ctx as the watcher context, stores the handle
+ // in ctx->zh, sleeps for one second, and returns the handle.
+ zhandle_t *createClient(watchctx_t *ctx) {
+     ctx->zh = zookeeper_init(hostPorts, watcher, 10000, 0, ctx, 0);
+     sleep(1);
+     return ctx->zh;
+ }
+
+public:
+
+#define ZKSERVER_CMD "./tests/zkServer.sh"
+
+ // Fixture set-up: runs the server script with "startClean" and requires
+ // it to exit with status 0.
+ void setUp()
+ {
+     char cmd[1024];
+     // Bounded formatting so the command can never overrun the buffer.
+     snprintf(cmd, sizeof(cmd), "%s startClean %s", ZKSERVER_CMD, getHostPorts());
+     CPPUNIT_ASSERT(system(cmd) == 0);
+ }
+
+
+ // Runs the server script with "start" and requires it to exit with status 0.
+ void startServer() {
+     char cmd[1024];
+     // Bounded formatting so the command can never overrun the buffer.
+     snprintf(cmd, sizeof(cmd), "%s start %s", ZKSERVER_CMD, getHostPorts());
+     CPPUNIT_ASSERT(system(cmd) == 0);
+ }
+
+ // Stopping the server is exactly what tearDown() does, so delegate to it.
+ void stopServer() {
+     this->tearDown();
+ }
+
+ // Fixture tear-down: runs the server script with "stop" and requires it
+ // to exit with status 0.
+ void tearDown()
+ {
+     char cmd[1024];
+     // Bounded formatting so the command can never overrun the buffer.
+     snprintf(cmd, sizeof(cmd), "%s stop %s", ZKSERVER_CMD, getHostPorts());
+     CPPUNIT_ASSERT(system(cmd) == 0);
+ }
+
+ // For each of the num_clients slots: creates a client bound to ctxs[i],
+ // stores its handle in zoohandles[i], and initializes queues[i] on that
+ // handle at the given path with the open ACL.
+ void initializeQueuesAndHandles(int num_clients, zhandle_t *zoohandles[],
+         watchctx_t ctxs[], zkr_queue_t queues[], char *path){
+     for(int client = 0; client < num_clients; ++client){
+         zhandle_t *handle = createClient(&ctxs[client]);
+         zoohandles[client] = handle;
+         zkr_queue_init(&queues[client], handle, path, &ZOO_OPEN_ACL_UNSAFE);
+     }
+ }
+
+ // Destroys the first num_clients queues in the array, in index order.
+ void cleanUpQueues(int num_clients, zkr_queue_t queues[]){
+     zkr_queue_t *current = queues;
+     zkr_queue_t *const end = queues + num_clients;
+     while(current < end){
+         zkr_queue_destroy(current);
+         ++current;
+     }
+ }
+
+ // Initializes a single queue on a fresh client and destroys it again.
+ void testInitDestroy(){
+     int num_clients = 1;
+     watchctx_t ctxs[num_clients];
+     zhandle_t *zoohandles[num_clients];
+     zkr_queue_t queues[num_clients];
+     char *path= (char *)"/testInitDestroy";
+
+     // The fixture helpers perform the same per-client init/destroy loops.
+     initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+     cleanUpQueues(num_clients, queues);
+ }
+
+ // Offers one string on a single-client queue and removes it again,
+ // checking the return codes, the reported length and the content.
+ void testOffer1(){
+     int num_clients = 1;
+     watchctx_t ctxs[num_clients];
+     zhandle_t *zoohandles[num_clients];
+     zkr_queue_t queues[num_clients];
+     char *path= (char *)"/testOffer1";
+
+     initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+     const char *payload = "Hello World!";
+     int payload_chars = strlen(payload);
+     int payload_bytes = payload_chars + 1;   // the NUL is sent as well
+     char received[payload_bytes];
+
+     int offer_rc = zkr_queue_offer(&queues[0], payload, payload_bytes);
+     CPPUNIT_ASSERT(offer_rc == ZOK);
+
+     int received_bytes = payload_bytes;
+     int remove_rc = zkr_queue_remove(&queues[0], received, &received_bytes);
+     CPPUNIT_ASSERT(remove_rc == ZOK);
+     CPPUNIT_ASSERT(received_bytes == payload_bytes);
+     CPPUNIT_ASSERT(strncmp(payload,received,payload_chars)==0);
+
+     cleanUpQueues(num_clients,queues);
+ }
+
+ // Offers n numbered strings through one client, then performs m removes
+ // through a second client. The first n removes must return the strings in
+ // offer order; any remove beyond n must succeed with a length of -1.
+ void create_n_remove_m(char *path, int n, int m){
+     int num_clients = 2;
+     watchctx_t ctxs[num_clients];
+     zhandle_t *zoohandles[num_clients];
+     zkr_queue_t queues[num_clients];
+
+     initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+     int i;
+     int max_digits = sizeof(int)*3;
+     const char *test_string = "Hello World!";
+     int buffer_length = strlen(test_string) + max_digits + 1;
+     char correct_buffer[buffer_length];
+     char receive_buffer[buffer_length];
+
+     for(i = 0; i < n; i++){
+         // The whole buffer is offered, not just the formatted prefix, so
+         // zero it first instead of sending uninitialized stack bytes.
+         memset(correct_buffer, 0, buffer_length);
+         snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+         int offer_rc = zkr_queue_offer(&queues[0], correct_buffer, buffer_length);
+         CPPUNIT_ASSERT(offer_rc == ZOK);
+     }
+     printf("Offers\n");
+     for(i=0; i<m ;i++){
+         snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+         int receive_buffer_length=buffer_length;
+         int remove_rc = zkr_queue_remove(&queues[1], receive_buffer, &receive_buffer_length);
+         CPPUNIT_ASSERT(remove_rc == ZOK);
+         if(i >=n){
+             // More removes than offers: the queue is empty by now.
+             CPPUNIT_ASSERT(receive_buffer_length == -1);
+         }else{
+             CPPUNIT_ASSERT(strncmp(correct_buffer,receive_buffer, buffer_length)==0);
+         }
+     }
+
+     cleanUpQueues(num_clients,queues);
+ }
+
+ // Offer 0 elements, remove 1.
+ void testOfferRemove1(){
+     char *queue_path = (char *)"/testOfferRemove1";
+     create_n_remove_m(queue_path, 0, 1);
+ }
+
+ // Offer 1 element, remove 1.
+ void testOfferRemove2(){
+     char *queue_path = (char *)"/testOfferRemove2";
+     create_n_remove_m(queue_path, 1, 1);
+ }
+
+ // Offer 10 elements, remove 1.
+ void testOfferRemove3(){
+     char *queue_path = (char *)"/testOfferRemove3";
+     create_n_remove_m(queue_path, 10, 1);
+ }
+
+ // Offer 10 elements, remove 10.
+ void testOfferRemove4(){
+     char *queue_path = (char *)"/testOfferRemove4";
+     create_n_remove_m(queue_path, 10, 10);
+ }
+
+ // Offer 10 elements, remove 5.
+ void testOfferRemove5(){
+     char *queue_path = (char *)"/testOfferRemove5";
+     create_n_remove_m(queue_path, 10, 5);
+ }
+
+ // Offer 10 elements, remove 11.
+ void testOfferRemove6(){
+     char *queue_path = (char *)"/testOfferRemove6";
+     create_n_remove_m(queue_path, 10, 11);
+ }
+
+ // Offers n numbered strings through one client, then takes m of them
+ // through a second client and checks they arrive in offer order.
+ // Requires m <= n.
+ void create_n_take_m(char *path, int n, int m){
+     CPPUNIT_ASSERT(m<=n);
+     int num_clients = 2;
+     watchctx_t ctxs[num_clients];
+     zhandle_t *zoohandles[num_clients];
+     zkr_queue_t queues[num_clients];
+
+     initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+     int i;
+     int max_digits = sizeof(int)*3;
+     const char *test_string = "Hello World!";
+     int buffer_length = strlen(test_string) + max_digits + 1;
+     char correct_buffer[buffer_length];
+     char receive_buffer[buffer_length];
+
+     for(i = 0; i < n; i++){
+         // The whole buffer is offered, not just the formatted prefix, so
+         // zero it first instead of sending uninitialized stack bytes.
+         memset(correct_buffer, 0, buffer_length);
+         snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+         int offer_rc = zkr_queue_offer(&queues[0], correct_buffer, buffer_length);
+         CPPUNIT_ASSERT(offer_rc == ZOK);
+     }
+     printf("Offers\n");
+     // Because m <= n was asserted above, every take has an element waiting;
+     // there is no "queue empty" case to check here.
+     for(i=0; i<m ;i++){
+         snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+         int receive_buffer_length=buffer_length;
+         int remove_rc = zkr_queue_take(&queues[1], receive_buffer, &receive_buffer_length);
+         CPPUNIT_ASSERT(remove_rc == ZOK);
+         CPPUNIT_ASSERT(strncmp(correct_buffer,receive_buffer, buffer_length)==0);
+     }
+
+     cleanUpQueues(num_clients,queues);
+ }
+
+ // Offer 2 elements, take 1.
+ void testOfferTake1(){
+     char *queue_path = (char *)"/testOfferTake1";
+     create_n_take_m(queue_path, 2, 1);
+ }
+
+ // Offer 1 element, take 1.
+ void testOfferTake2(){
+     char *queue_path = (char *)"/testOfferTake2";
+     create_n_take_m(queue_path, 1, 1);
+ }
+
+ // Offer 10 elements, take 1.
+ void testOfferTake3(){
+     char *queue_path = (char *)"/testOfferTake3";
+     create_n_take_m(queue_path, 10, 1);
+ }
+
+ // Offer 10 elements, take 10.
+ void testOfferTake4(){
+     char *queue_path = (char *)"/testOfferTake4";
+     create_n_take_m(queue_path, 10, 10);
+ }
+
+ // Offer 10 elements, take 5.
+ void testOfferTake5(){
+     char *queue_path = (char *)"/testOfferTake5";
+     create_n_take_m(queue_path, 10, 5);
+ }
+
+ // Offer 12 elements, take 11.
+ void testOfferTake6(){
+     char *queue_path = (char *)"/testOfferTake6";
+     create_n_take_m(queue_path, 12, 11);
+ }
+
+ // Starts a taking thread on an empty queue, then an offering thread on the
+ // same queue, and checks that the taker ends up with the offered string.
+ void testTakeThreaded(){
+     int num_clients = 1;
+     watchctx_t ctxs[num_clients];
+     zhandle_t *zoohandles[num_clients];
+     zkr_queue_t queues[num_clients];
+     char *path=(char *)"/testTakeThreaded";
+
+     initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+     pthread_t take_thread;
+
+     pthread_create(&take_thread, NULL, take_thread_shared_queue, (void *) &queues[0]);
+
+     // Give the taker a head start before the offer is made.
+     usleep(1000);
+
+     pthread_t offer_thread;
+     pthread_create(&offer_thread, NULL, offer_thread_shared_queue, (void *) &queues[0]);
+     pthread_join(offer_thread, NULL);
+
+     void *take_thread_result;
+     pthread_join(take_thread, &take_thread_result);
+
+     // The take thread hands back a malloc'ed buffer (or NULL). Evaluate it,
+     // release it, and only then assert, so a failing assertion cannot leak it.
+     bool got_result = (take_thread_result != NULL);
+     bool result_valid = got_result && valid_test_string(take_thread_result);
+     free(take_thread_result);
+     CPPUNIT_ASSERT(got_result);
+     CPPUNIT_ASSERT(result_valid);
+
+     cleanUpQueues(num_clients,queues);
+ }
+
+ // Same scenario as testTakeThreaded, repeated twice on the same queue.
+ void testTakeThreaded2(){
+     int num_clients = 1;
+     watchctx_t ctxs[num_clients];
+     zhandle_t *zoohandles[num_clients];
+     zkr_queue_t queues[num_clients];
+     char *path=(char *)"/testTakeThreaded2";
+
+     initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+     int take_attempts;
+     int num_take_attempts = 2;
+     for(take_attempts=0; take_attempts < num_take_attempts; take_attempts++){
+         pthread_t take_thread;
+
+         pthread_create(&take_thread, NULL, take_thread_shared_queue, (void *) &queues[0]);
+
+         // Give the taker a head start before the offer is made.
+         usleep(1000);
+
+         pthread_t offer_thread;
+         pthread_create(&offer_thread, NULL, offer_thread_shared_queue, (void *) &queues[0]);
+         pthread_join(offer_thread, NULL);
+
+         void *take_thread_result;
+         pthread_join(take_thread, &take_thread_result);
+
+         // The take thread hands back a malloc'ed buffer (or NULL). Evaluate
+         // it, release it, and only then assert, so nothing leaks per round.
+         bool got_result = (take_thread_result != NULL);
+         bool result_valid = got_result && valid_test_string(take_thread_result);
+         free(take_thread_result);
+         CPPUNIT_ASSERT(got_result);
+         CPPUNIT_ASSERT(result_valid);
+
+     }
+     cleanUpQueues(num_clients,queues);
+ }
+};
+
+const char Zookeeper_queuetest::hostPorts[] = "127.0.0.1:22181";
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_queuetest);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc
new file mode 100644
index 0000000..2b818f4
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/TestDriver.cc
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <cppunit/TestRunner.h>
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <stdexcept>
+#include <cppunit/Exception.h>
+#include <cppunit/TestFailure.h>
+#include <cppunit/XmlOutputter.h>
+#include <fstream>
+
+#include "Util.h"
+
+using namespace std;
+
+CPPUNIT_NS_BEGIN
+
+// CompilerOutputter variant that uses the "%p:%l: " location format, prints
+// no test name, and prints each failure as ": <short description>" followed
+// by the comma-separated details in square brackets when there are any.
+class EclipseOutputter: public CompilerOutputter
+{
+public:
+    EclipseOutputter(TestResultCollector *result,ostream &stream):
+        CompilerOutputter(result,stream,"%p:%l: "),stream_(stream)
+    {
+    }
+    // Intentionally prints nothing.
+    virtual void printFailedTestName( TestFailure *failure ){}
+    virtual void printFailureMessage( TestFailure *failure )
+    {
+        stream_<<": ";
+        Message msg = failure->thrownException()->message();
+        stream_<< msg.shortDescription();
+
+        // Join all detail lines with ", ".
+        const int detailTotal = msg.detailCount();
+        string details;
+        for(int idx=0; idx<detailTotal; ++idx){
+            if(idx>0)
+                details+=", ";
+            details+=msg.detailAt(idx);
+        }
+        if(!details.empty())
+            stream_ <<" ["<<details<<"]";
+        stream_<<"\n";
+    }
+    ostream& stream_;
+};
+
+CPPUNIT_NS_END
+
+/**
+ * Test driver entry point. The first command-line argument selects the test
+ * to run (see TestConfig); the rest are kept as extra options.
+ *
+ * Returns 0 when every selected test passed, and 1 when a test failed or
+ * the requested test name could not be resolved.
+ */
+int main( int argc, char* argv[] ) {
+    // if command line contains "-ide" then this is the post build check
+    // => the output must be in the compiler error format.
+    //bool selfTest = (argc > 1) && (std::string("-ide") == argv[1]);
+    globalTestConfig.addConfigFromCmdLine(argc,argv);
+
+    // Create the event manager and test controller
+    CPPUNIT_NS::TestResult controller;
+    // Add a listener that collects test results
+    CPPUNIT_NS::TestResultCollector result;
+    controller.addListener( &result );
+
+    // Add a listener that prints each test's name as it runs.
+    // CPPUNIT_NS::TextTestProgressListener progress;
+    CPPUNIT_NS::BriefTestProgressListener progress;
+    controller.addListener( &progress );
+
+    CPPUNIT_NS::TestRunner runner;
+    runner.addTest( CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest() );
+
+    try
+    {
+        cout << "Running " << globalTestConfig.getTestName();
+        runner.run( controller, globalTestConfig.getTestName());
+        cout<<endl;
+
+        // Print test in a compiler compatible format.
+        CPPUNIT_NS::EclipseOutputter outputter( &result,cout);
+        outputter.write();
+
+        // Define ENABLE_XML_OUTPUT for XML output
+#ifdef ENABLE_XML_OUTPUT
+        std::ofstream file( "tests.xml" );
+        CPPUNIT_NS::XmlOutputter xml( &result, file );
+        xml.setStyleSheet( "report.xsl" );
+        xml.write();
+        file.close();
+#endif
+    }
+    catch ( const std::invalid_argument &e ) // Test path not resolved
+    {
+        cout<<"\nERROR: "<<e.what()<<endl;
+        // No test ran, so this must not be reported as success.
+        return 1;
+    }
+
+    return result.wasSuccessful() ? 0 : 1;
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc
new file mode 100644
index 0000000..26a9a09
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.cc
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Util.h"
+
+const std::string EMPTY_STRING;
+
+TestConfig globalTestConfig;
+
+// Sleeps for ms milliseconds with a single nanosleep() call.
+void millisleep(int ms){
+    const int whole_seconds=ms/1000;
+    const int leftover_ms=ms%1000;
+
+    timespec ts;
+    ts.tv_sec=whole_seconds;
+    ts.tv_nsec=leftover_ms*1000000; // milliseconds to nanoseconds
+    nanosleep(&ts,0);
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h
new file mode 100644
index 0000000..95f5420
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/Util.h
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef UTIL_H_
+#define UTIL_H_
+
+#include <map>
+#include <vector>
+#include <string>
+
+// number of elements in array; fully parenthesized so the result can be
+// used inside a larger expression (for example as a divisor) safely
+#define COUNTOF(array) (sizeof(array)/sizeof((array)[0]))
+
+#define DECLARE_WRAPPER(ret,sym,sig) \
+ extern "C" ret __real_##sym sig; \
+ extern "C" ret __wrap_##sym sig
+
+#define CALL_REAL(sym,params) \
+ __real_##sym params
+
+// must include "src/zookeeper_log.h" to be able to use this macro
+#define TEST_TRACE(x) \
+ log_message(3,__LINE__,__func__,format_log_message x)
+
+extern const std::string EMPTY_STRING;
+
+// *****************************************************************************
+// TypeOp<T> maps a type to two typedefs:
+//   ArgT  - T itself, unchanged
+//   BareT - T with every level of reference and pointer removed
+template <typename T>
+struct TypeOp {
+    typedef T ArgT;
+    typedef T BareT;
+};
+
+// references: strip the '&' and recurse for BareT
+template <typename T>
+struct TypeOp<T&>{
+    typedef typename TypeOp<T>::BareT BareT;
+    typedef T& ArgT;
+};
+
+// pointers: strip the '*' and recurse for BareT
+template <typename T>
+struct TypeOp<T*>{
+    typedef typename TypeOp<T>::BareT BareT;
+    typedef T* ArgT;
+};
+
+// *****************************************************************************
+// Container utilities
+
+// Stores v under key k, replacing any value already stored for k.
+template <class K, class V>
+void putValue(std::map<K,V>& map,const K& k, const V& v){
+    typedef std::map<K,V> Map;
+    // insert() reports whether the key was new. When it was not, overwrite
+    // the existing entry in place: one lookup instead of find + operator[].
+    std::pair<typename Map::iterator,bool> result=
+        map.insert(typename Map::value_type(k,v));
+    if(!result.second)
+        result.first->second=v;
+}
+
+// Looks up k; when present copies the mapped value into v and returns true,
+// otherwise leaves v untouched and returns false.
+template <class K, class V>
+bool getValue(const std::map<K,V>& map,const K& k,V& v){
+    typename std::map<K,V>::const_iterator pos=map.find(k);
+    const bool found=(pos!=map.end());
+    if(found)
+        v=pos->second;
+    return found;
+}
+
+// *****************************************************************************
+// misc utils
+
+// millisecond sleep
+void millisleep(int ms);
+// Polls predicate p, sleeping 2 ms between evaluations, until it returns
+// true or the accumulated sleep time reaches timeout (in millis).
+// Returns the accumulated sleep time in millis.
+template<class Predicate>
+int ensureCondition(const Predicate& p,int timeout){
+    int elapsed=0;
+    for(;;){
+        if(p() || elapsed>=timeout)
+            break;
+        millisleep(2);
+        elapsed+=2;
+    }
+    return elapsed;
+}
+
+// *****************************************************************************
+// test global configuration data
+// Holds what the test driver was given on the command line: argv[1] as the
+// test name and every later argument as an extra option.
+class TestConfig{
+    typedef std::vector<std::string> CmdLineOptList;
+public:
+    typedef CmdLineOptList::const_iterator const_iterator;
+    TestConfig(){}
+    ~TestConfig(){}
+    // Records argv[1] as the test name and appends argv[2..argc-1] to the
+    // extra options. Does nothing when there are fewer than two arguments.
+    void addConfigFromCmdLine(int argc, char* argv[]){
+        if(argc<2)
+            return;
+        testName_=argv[1];
+        cmdOpts_.insert(cmdOpts_.end(),argv+2,argv+argc);
+    }
+    const_iterator getExtraOptBegin() const {return cmdOpts_.begin();}
+    const_iterator getExtraOptEnd() const {return cmdOpts_.end();}
+    size_t getExtraOptCount() const {
+        return cmdOpts_.size();
+    }
+    // The test name, with the special name "all" mapped to the empty string.
+    const std::string& getTestName() const {
+        if(testName_=="all")
+            return EMPTY_STRING;
+        return testName_;
+    }
+private:
+    CmdLineOptList cmdOpts_;
+    std::string testName_;
+};
+
+extern TestConfig globalTestConfig;
+
+#endif /*UTIL_H_*/
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh
new file mode 100755
index 0000000..a22fd30
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/c/tests/zkServer.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Usage: zkServer.sh startClean|start|stop hostPorts
+#
+# Every invocation first kills the server recorded in /tmp/zk.pid, if any.
+# "start" and "startClean" then launch a new server in the background;
+# "startClean" also wipes /tmp/zkdata beforehand.
+# NOTE: the hostPorts argument is never read; the port is fixed at 22181.
+
+if [ "x$1" == "x" ]
+then
+    echo "USAGE: $0 startClean|start|stop hostPorts"
+    exit 2
+fi
+
+if [ "x$1" == "xstartClean" ]
+then
+    rm -rf /tmp/zkdata
+fi
+
+# Make sure nothing is left over from before
+if [ -r "/tmp/zk.pid" ]
+then
+    pid=`cat /tmp/zk.pid`
+    kill -9 "$pid"
+    rm -f /tmp/zk.pid
+fi
+
+base_dir="../../../../.."
+
+CLASSPATH="$CLASSPATH:${base_dir}/build/classes"
+CLASSPATH="$CLASSPATH:${base_dir}/conf"
+
+for f in "${base_dir}"/zookeeper-*.jar
+do
+    CLASSPATH="$CLASSPATH:$f"
+done
+
+for i in "${base_dir}"/build/lib/*.jar
+do
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+for i in "${base_dir}"/src/java/lib/*.jar
+do
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+CLASSPATH="$CLASSPATH:${CLOVER_HOME}/lib/clover.jar"
+
+case $1 in
+start|startClean)
+    mkdir -p /tmp/zkdata
+    # Quoted so a classpath entry containing spaces stays one argument.
+    java -cp "$CLASSPATH" org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log &
+    echo $! > /tmp/zk.pid
+    # Give the server time to come up before the caller connects.
+    sleep 5
+    ;;
+stop)
+    # Already killed above
+    ;;
+*)
+    echo "Unknown command $1"
+    exit 2
+    ;;
+esac
+
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java b/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
new file mode 100644
index 0000000..c35c332
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
@@ -0,0 +1,314 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ * A <a href="package.html">protocol to implement a distributed queue</a>.
+ *
+ */
+
+public class DistributedQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
+
+ private final String dir;
+
+ private ZooKeeper zookeeper;
+ private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+ private final String prefix = "qn-";
+
+
+ /**
+  * Creates a queue handle for the given directory node.
+  * @param zookeeper client used for all queue operations
+  * @param dir path of the node whose children are the queue elements
+  * @param acl ACL used when creating nodes; when null the default open ACL is kept
+  */
+ public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
+     this.zookeeper = zookeeper;
+     this.dir = dir;
+     if(acl != null){
+         this.acl = acl;
+     }
+ }
+
+
+
+ /**
+  * Returns a Map of the children, ordered by id.
+  * Children whose name does not start with the queue prefix, or whose
+  * suffix is not a number, are logged and left out.
+  * @param watcher optional watcher on getChildren() operation.
+  * @return map from id to child name for all children
+  * @throws KeeperException if getChildren() fails, including
+  *         KeeperException.NoNodeException when the queue node is missing
+  * @throws InterruptedException
+  */
+ private Map<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
+     Map<Long,String> orderedChildren = new TreeMap<Long,String>();
+
+     // A missing queue node is left to propagate; callers decide what it means.
+     List<String> childNames = zookeeper.getChildren(dir, watcher);
+
+     for(String childName : childNames){
+         //Check format
+         if(!childName.regionMatches(0, prefix, 0, prefix.length())){
+             LOG.warn("Found child node with improper name: " + childName);
+             continue;
+         }
+         try{
+             String suffix = childName.substring(prefix.length());
+             // Long.valueOf replaces the deprecated Long(String) constructor.
+             orderedChildren.put(Long.valueOf(suffix), childName);
+         }catch(NumberFormatException e){
+             LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+         }
+     }
+
+     return orderedChildren;
+ }
+
+ /**
+  * Find the smallest child node.
+  * Children with an improper prefix or a non-numeric suffix are logged and ignored.
+  * @return The name of the child with the smallest id, or null when the
+  *         queue node does not exist or no child qualifies.
+  */
+ private String smallestChildName() throws KeeperException, InterruptedException {
+     List<String> childNames;
+     try{
+         childNames = zookeeper.getChildren(dir, false);
+     }catch(KeeperException.NoNodeException e){
+         LOG.warn("Caught: " +e,e);
+         return null;
+     }
+
+     // smallestName stays null until a child with an id below smallestId is seen.
+     long smallestId = Long.MAX_VALUE;
+     String smallestName = null;
+
+     for(String childName : childNames){
+         //Check format
+         if(!childName.regionMatches(0, prefix, 0, prefix.length())){
+             LOG.warn("Found child node with improper name: " + childName);
+             continue;
+         }
+         try{
+             long childId = Long.parseLong(childName.substring(prefix.length()));
+             if(childId < smallestId){
+                 smallestId = childId;
+                 smallestName = childName;
+             }
+         }catch(NumberFormatException e){
+             LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+         }
+     }
+
+     return smallestName;
+ }
+
+ /**
+  * Return the head of the queue without modifying the queue.
+  * @return the data at the head of the queue.
+  * @throws NoSuchElementException if the queue node is missing or has no valid children
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
+     Map<Long,String> orderedChildren;
+
+     // element, take, and remove follow the same pattern.
+     // We want to return the child node with the smallest sequence number.
+     // Since other clients are remove()ing and take()ing nodes concurrently,
+     // the child with the smallest sequence number in orderedChildren might
+     // be gone by the time we check.
+     // We don't call getChildren again until we have tried the rest of the
+     // nodes in sequence order.
+     while(true){
+         try{
+             orderedChildren = orderedChildren(null);
+         }catch(KeeperException.NoNodeException e){
+             throw new NoSuchElementException();
+         }
+         if(orderedChildren.size() == 0 ) throw new NoSuchElementException();
+
+         for(String headNode : orderedChildren.values()){
+             if(headNode != null){
+                 try{
+                     return zookeeper.getData(dir+"/"+headNode, false, null);
+                 }catch(KeeperException.NoNodeException e){
+                     //Another client removed the node first, try next
+                 }
+             }
+         }
+
+     }
+ }
+
+
+ /**
+  * Attempts to remove the head of the queue and return it.
+  * @return The former head of the queue
+  * @throws NoSuchElementException if the queue node is missing or has no valid children
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+     Map<Long,String> orderedChildren;
+     // Same as for element. Should refactor this.
+     while(true){
+         try{
+             orderedChildren = orderedChildren(null);
+         }catch(KeeperException.NoNodeException e){
+             throw new NoSuchElementException();
+         }
+         if(orderedChildren.size() == 0) throw new NoSuchElementException();
+
+         for(String headNode : orderedChildren.values()){
+             String path = dir +"/"+headNode;
+             try{
+                 byte[] data = zookeeper.getData(path, false, null);
+                 zookeeper.delete(path, -1);
+                 return data;
+             }catch(KeeperException.NoNodeException e){
+                 // Another client deleted the node first; try the next one.
+             }
+         }
+
+     }
+ }
+
+ /**
+  * Watcher backed by a one-shot latch: the first event it receives
+  * releases every thread blocked in await().
+  */
+ private class LatchChildWatcher implements Watcher {
+
+     final CountDownLatch latch = new CountDownLatch(1);
+
+     public LatchChildWatcher(){
+     }
+
+     public void process(WatchedEvent event){
+         LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
+                 event.getState() + " type " + event.getType());
+         latch.countDown();
+     }
+
+     public void await() throws InterruptedException {
+         latch.await();
+     }
+ }
+
+ /**
+  * Removes the head of the queue and returns it, blocks until it succeeds.
+  * Creates the queue node when it does not exist yet.
+  * @return The former head of the queue
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public byte[] take() throws KeeperException, InterruptedException {
+     Map<Long,String> orderedChildren;
+     // Same as for element. Should refactor this.
+     while(true){
+         LatchChildWatcher childWatcher = new LatchChildWatcher();
+         try{
+             orderedChildren = orderedChildren(childWatcher);
+         }catch(KeeperException.NoNodeException e){
+             try{
+                 zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+             }catch(KeeperException.NodeExistsException raced){
+                 // Another client created the queue node between our
+                 // getChildren and create; the node exists, which is all we need.
+             }
+             continue;
+         }
+         if(orderedChildren.size() == 0){
+             // Nothing queued: wait for the child watch to fire, then re-read.
+             childWatcher.await();
+             continue;
+         }
+
+         for(String headNode : orderedChildren.values()){
+             String path = dir +"/"+headNode;
+             try{
+                 byte[] data = zookeeper.getData(path, false, null);
+                 zookeeper.delete(path, -1);
+                 return data;
+             }catch(KeeperException.NoNodeException e){
+                 // Another client deleted the node first.
+             }
+         }
+     }
+ }
+
+ /**
+  * Inserts data into queue.
+  * Creates the queue node when it does not exist yet, then retries.
+  * @param data payload stored in the new sequential child node
+  * @return true if data was successfully added
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public boolean offer(byte[] data) throws KeeperException, InterruptedException{
+     for(;;){
+         try{
+             zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+             return true;
+         }catch(KeeperException.NoNodeException e){
+             try{
+                 zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+             }catch(KeeperException.NodeExistsException raced){
+                 // Another client created the queue node first; just retry the offer.
+             }
+         }
+     }
+
+ }
+
+ /**
+  * Non-throwing variant of element(): the data at the head of the queue,
+  * or null when element() reports that there is none.
+  * @return data at the first element of the queue, or null.
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public byte[] peek() throws KeeperException, InterruptedException{
+     byte[] head;
+     try{
+         head = element();
+     }catch(NoSuchElementException e){
+         head = null;
+     }
+     return head;
+ }
+
+
+ /**
+  * Attempts to remove the head of the queue and return it.
+  * Returns null if the queue is empty.
+  * @return Head of the queue or null.
+  * @throws KeeperException
+  * @throws InterruptedException
+  */
+ public byte[] poll() throws KeeperException, InterruptedException {
+     try{
+         return remove();
+     }catch(NoSuchElementException e){
+         // remove() signals an empty queue with this exception.
+         return null;
+     }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/28de451a/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java b/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
new file mode 100644
index 0000000..c6cfae2
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
@@ -0,0 +1,286 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+/**
+ * Tests for {@link DistributedQueue}: offering, removing, peeking with element(), and
+ * blocking take() calls, using one or two ZooKeeper clients that share a queue directory.
+ */
+public class DistributedQueueTest extends ClientBase {
+
+    /** Payload (or payload prefix) offered to the queue by every test. */
+    private static final String TEST_STRING = "Hello World";
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        LOG.info("FINISHED " + getTestName());
+    }
+
+    /**
+     * Creates {@code numClients} queue handles on the same directory, each backed by its own
+     * client obtained from {@code createClient()}.
+     *
+     * @param dir znode path used as the queue directory
+     * @param numClients number of independent handles to create
+     * @return the handles, in creation order
+     */
+    private DistributedQueue[] createQueueHandles(String dir, int numClients) throws Exception {
+        DistributedQueue[] queueHandles = new DistributedQueue[numClients];
+        for (int i = 0; i < numClients; i++) {
+            ZooKeeper client = createClient();
+            queueHandles[i] = new DistributedQueue(client, dir, null);
+        }
+        return queueHandles;
+    }
+
+    /**
+     * Offers {@code count} elements whose payloads are {@code TEST_STRING + 0},
+     * {@code TEST_STRING + 1}, and so on, in that order.
+     */
+    private void offerNumbered(DistributedQueue queue, int count) throws Exception {
+        for (int i = 0; i < count; i++) {
+            String offerString = TEST_STRING + i;
+            queue.offer(offerString.getBytes());
+        }
+    }
+
+    /**
+     * Starts a take() on {@code queue} in one thread, offers {@code payload} from a second
+     * thread about a second later, and asserts that the take() returned that payload.
+     * <p>
+     * Exceptions raised inside either thread are recorded and rethrown here instead of being
+     * swallowed, so a failure is reported with its real cause.
+     */
+    private void assertTakeReceivesLaterOffer(final DistributedQueue queue, final String payload)
+            throws Exception {
+        // Single-element arrays let the anonymous Thread classes hand results back.
+        final byte[][] takeResult = new byte[1][];
+        final Exception[] takeFailure = new Exception[1];
+        final Exception[] offerFailure = new Exception[1];
+
+        Thread takeThread = new Thread() {
+            public void run() {
+                try {
+                    takeResult[0] = queue.take();
+                } catch (KeeperException e) {
+                    takeFailure[0] = e;
+                } catch (InterruptedException e) {
+                    takeFailure[0] = e;
+                }
+            }
+        };
+        takeThread.start();
+
+        // Presumably gives the taker time to start waiting on the still-empty queue.
+        Thread.sleep(1000);
+        Thread offerThread = new Thread() {
+            public void run() {
+                try {
+                    queue.offer(payload.getBytes());
+                } catch (KeeperException e) {
+                    offerFailure[0] = e;
+                } catch (InterruptedException e) {
+                    offerFailure[0] = e;
+                }
+            }
+        };
+        offerThread.start();
+        offerThread.join();
+
+        if (offerFailure[0] != null) {
+            // Nothing was offered, so the taker would wait forever. take() declares
+            // InterruptedException, so interrupting it is expected to unblock the join below.
+            takeThread.interrupt();
+        }
+        takeThread.join();
+
+        if (offerFailure[0] != null) {
+            throw offerFailure[0];
+        }
+        if (takeFailure[0] != null) {
+            throw takeFailure[0];
+        }
+        Assert.assertNotNull("take() returned no data", takeResult[0]);
+        Assert.assertEquals(payload, new String(takeResult[0]));
+    }
+
+    /** One client offers an element and removes it again. */
+    @Test
+    public void testOffer1() throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles("/testOffer1", 1);
+
+        queueHandles[0].offer(TEST_STRING.getBytes());
+
+        byte[] dequeuedBytes = queueHandles[0].remove();
+        Assert.assertEquals(TEST_STRING, new String(dequeuedBytes));
+    }
+
+    /** One client offers an element and a second client removes it. */
+    @Test
+    public void testOffer2() throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles("/testOffer2", 2);
+
+        queueHandles[0].offer(TEST_STRING.getBytes());
+
+        byte[] dequeuedBytes = queueHandles[1].remove();
+        Assert.assertEquals(TEST_STRING, new String(dequeuedBytes));
+    }
+
+    /** take() returns an element that is already in the queue. */
+    @Test
+    public void testTake1() throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles("/testTake1", 1);
+
+        queueHandles[0].offer(TEST_STRING.getBytes());
+
+        byte[] dequeuedBytes = queueHandles[0].take();
+        Assert.assertEquals(TEST_STRING, new String(dequeuedBytes));
+    }
+
+    /** remove() on a queue that has never been offered to throws NoSuchElementException. */
+    @Test
+    public void testRemove1() throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles("/testRemove1", 1);
+
+        try {
+            queueHandles[0].remove();
+            Assert.fail("remove() on an empty queue should throw NoSuchElementException");
+        } catch (NoSuchElementException e) {
+            // Expected.
+        }
+    }
+
+    /**
+     * Offers {@code n} numbered elements through one client, removes {@code m} of them through
+     * a second client, and checks that the last element removed carries index {@code m - 1}.
+     *
+     * @param dir queue directory
+     * @param n number of elements to offer
+     * @param m number of elements to remove; must be at least 1, otherwise there is no removed
+     *          element to inspect
+     */
+    public void createNremoveMtest(String dir, int n, int m) throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles(dir, 2);
+
+        offerNumbered(queueHandles[0], n);
+
+        byte[] data = null;
+        for (int i = 0; i < m; i++) {
+            data = queueHandles[1].remove();
+        }
+        Assert.assertEquals(TEST_STRING + (m - 1), new String(data));
+    }
+
+    @Test
+    public void testRemove2() throws Exception {
+        createNremoveMtest("/testRemove2", 10, 2);
+    }
+
+    @Test
+    public void testRemove3() throws Exception {
+        createNremoveMtest("/testRemove3", 1000, 1000);
+    }
+
+    /**
+     * Offers {@code n} numbered elements through one client, removes {@code m} of them through
+     * a second client, and checks that element() then reports the element with index {@code m}.
+     *
+     * @param dir queue directory
+     * @param n number of elements to offer
+     * @param m number of elements to remove before peeking
+     */
+    public void createNremoveMelementTest(String dir, int n, int m) throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles(dir, 2);
+
+        offerNumbered(queueHandles[0], n);
+
+        for (int i = 0; i < m; i++) {
+            queueHandles[1].remove();
+        }
+        Assert.assertEquals(TEST_STRING + m, new String(queueHandles[1].element()));
+    }
+
+    @Test
+    public void testElement1() throws Exception {
+        createNremoveMelementTest("/testElement1", 1, 0);
+    }
+
+    @Test
+    public void testElement2() throws Exception {
+        createNremoveMelementTest("/testElement2", 10, 2);
+    }
+
+    @Test
+    public void testElement3() throws Exception {
+        createNremoveMelementTest("/testElement3", 1000, 500);
+    }
+
+    @Test
+    public void testElement4() throws Exception {
+        createNremoveMelementTest("/testElement4", 1000, 1000 - 1);
+    }
+
+    /** A take() started on an empty queue returns the element offered afterwards. */
+    @Test
+    public void testTakeWait1() throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles("/testTakeWait1", 1);
+
+        assertTakeReceivesLaterOffer(queueHandles[0], TEST_STRING);
+    }
+
+    /** Same as testTakeWait1, repeated twice on the same queue with distinct payloads. */
+    @Test
+    public void testTakeWait2() throws Exception {
+        DistributedQueue[] queueHandles = createQueueHandles("/testTakeWait2", 1);
+
+        int num_attempts = 2;
+        for (int i = 0; i < num_attempts; i++) {
+            assertTakeReceivesLaterOffer(queueHandles[0], TEST_STRING + i);
+        }
+    }
+}
+
|