zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [01/10] zookeeper git commit: ZOOKEEPER-3033: MAVEN MIGRATION - fix directory structure
Date Fri, 17 Aug 2018 09:56:17 GMT
Repository: zookeeper
Updated Branches:
  refs/heads/master b5d6786b9 -> 2584625cd


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestClient.cc
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestClient.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestClient.cc
new file mode 100644
index 0000000..5446d9b
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestClient.cc
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <pthread.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/select.h>
+#include <cppunit/TestAssert.h>
+
+
+using namespace std;
+
+#include <cstring>
+#include <list>
+
+#include <zookeeper.h>
+#include <zoo_queue.h>
+
+static void yield(zhandle_t *zh, int i)
+{
+    sleep(i);
+}
+
+typedef struct evt {
+    string path;
+    int type;
+} evt_t;
+
+typedef struct watchCtx {
+private:
+    list<evt_t> events;
+public:
+    bool connected;
+    zhandle_t *zh;
+    
+    watchCtx() {
+        connected = false;
+        zh = 0;
+    }
+    ~watchCtx() {
+        if (zh) {
+            zookeeper_close(zh);
+            zh = 0;
+        }
+    }
+
+    evt_t getEvent() {
+        evt_t evt;
+        evt = events.front();
+        events.pop_front();
+        return evt;
+    }
+
+    int countEvents() {
+        int count;
+        count = events.size();
+        return count;
+    }
+
+    void putEvent(evt_t evt) {
+        events.push_back(evt);
+    }
+
+    bool waitForConnected(zhandle_t *zh) {
+        time_t expires = time(0) + 10;
+        while(!connected && time(0) < expires) {
+            yield(zh, 1);
+        }
+        return connected;
+    }
+    bool waitForDisconnected(zhandle_t *zh) {
+        time_t expires = time(0) + 15;
+        while(connected && time(0) < expires) {
+            yield(zh, 1);
+        }
+        return !connected;
+    }
+} watchctx_t; 
+
+extern "C" {
+    
+    const char *thread_test_string="Hello World!";
+   
+    void *offer_thread_shared_queue(void *queue_handle){
+        zkr_queue_t *queue = (zkr_queue_t *) queue_handle;
+
+        int test_string_buffer_length = strlen(thread_test_string) + 1;
+        int offer_rc = zkr_queue_offer(queue, thread_test_string, test_string_buffer_length);
+        pthread_exit(NULL);
+    }
+    
+    void *take_thread_shared_queue(void *queue_handle){
+        zkr_queue_t *queue = (zkr_queue_t *) queue_handle;
+
+        int test_string_buffer_length = strlen(thread_test_string) + 1;
+        int receive_buffer_capacity = test_string_buffer_length;
+        int receive_buffer_length = receive_buffer_capacity;
+        char *receive_buffer = (char *) malloc(sizeof(char) * receive_buffer_capacity);
+
+        int remove_rc = zkr_queue_take(queue, receive_buffer, &receive_buffer_length);
+        switch(remove_rc){
+        case ZOK:
+            pthread_exit(receive_buffer);
+        default:
+            free(receive_buffer);
+            pthread_exit(NULL);
+        }
+    }
+    
+    int valid_test_string(void *result){
+        char *result_string = (char *) result;
+        return !strncmp(result_string, thread_test_string, strlen(thread_test_string));
+    }
+}
+
+class Zookeeper_queuetest : public CPPUNIT_NS::TestFixture
+{
+    CPPUNIT_TEST_SUITE(Zookeeper_queuetest);
+    CPPUNIT_TEST(testInitDestroy);
+    CPPUNIT_TEST(testOffer1);
+    CPPUNIT_TEST(testOfferRemove1);
+    CPPUNIT_TEST(testOfferRemove2);
+    CPPUNIT_TEST(testOfferRemove3);
+    CPPUNIT_TEST(testOfferRemove4);
+    CPPUNIT_TEST(testOfferRemove5);
+    CPPUNIT_TEST(testOfferRemove6);
+    CPPUNIT_TEST(testOfferTake1);
+    CPPUNIT_TEST(testOfferTake2);
+    CPPUNIT_TEST(testOfferTake3);
+    CPPUNIT_TEST(testOfferTake4);
+    CPPUNIT_TEST(testOfferTake5);
+    CPPUNIT_TEST(testOfferTake6);
+    CPPUNIT_TEST_SUITE_END();
+
+    static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
+        watchctx_t *ctx = (watchctx_t*)v;
+
+        if (state == ZOO_CONNECTED_STATE) {
+            ctx->connected = true;
+        } else {
+            ctx->connected = false;
+        }
+        if (type != ZOO_SESSION_EVENT) {
+            evt_t evt;
+            evt.path = path;
+            evt.type = type;
+            ctx->putEvent(evt);
+        }
+    }
+
+    static const char hostPorts[];
+
+    const char *getHostPorts() {
+        return hostPorts;
+    }
+
+    zhandle_t *createClient(watchctx_t *ctx) {
+        zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0,
+                                       ctx, 0);
+        ctx->zh = zk;
+        sleep(1);
+        return zk;
+    }
+    
+public:
+
+#define ZKSERVER_CMD "./tests/zkServer.sh"
+
+    void setUp()
+        {
+            char cmd[1024];
+            sprintf(cmd, "%s startClean %s", ZKSERVER_CMD, getHostPorts());
+            CPPUNIT_ASSERT(system(cmd) == 0);
+        }
+    
+
+    void startServer() {
+        char cmd[1024];
+        sprintf(cmd, "%s start %s", ZKSERVER_CMD, getHostPorts());
+        CPPUNIT_ASSERT(system(cmd) == 0);
+    }
+
+    void stopServer() {
+        tearDown();
+    }
+
+    void tearDown()
+        {
+            char cmd[1024];
+            sprintf(cmd, "%s stop %s", ZKSERVER_CMD, getHostPorts());
+            CPPUNIT_ASSERT(system(cmd) == 0);
+        }
+
+    void initializeQueuesAndHandles(int num_clients, zhandle_t *zoohandles[], 
+                                    watchctx_t ctxs[], zkr_queue_t queues[], char *path){
+        int i;
+        for(i=0; i< num_clients; i++){
+            zoohandles[i] = createClient(&ctxs[i]);
+            zkr_queue_init(&queues[i], zoohandles[i], path, &ZOO_OPEN_ACL_UNSAFE);
+        }
+    }
+
+    void cleanUpQueues(int num_clients, zkr_queue_t queues[]){
+        int i;
+        for(i=0; i < num_clients; i++){
+            zkr_queue_destroy(&queues[i]);
+        }
+    }
+
+    void testInitDestroy(){
+        int num_clients = 1;
+        watchctx_t ctxs[num_clients];
+        zhandle_t *zoohandles[num_clients];
+        zkr_queue_t queues[num_clients];
+        char *path= (char *)"/testInitDestroy";
+
+        int i;
+        for(i=0; i< num_clients; i++){
+            zoohandles[i] = createClient(&ctxs[i]);
+            zkr_queue_init(&queues[i], zoohandles[i], path, &ZOO_OPEN_ACL_UNSAFE);
+        }
+    
+        for(i=0; i< num_clients; i++){
+            zkr_queue_destroy(&queues[i]);
+        }
+    
+    }
+
+    void testOffer1(){
+        int num_clients = 1;
+        watchctx_t ctxs[num_clients];
+        zhandle_t *zoohandles[num_clients];
+        zkr_queue_t queues[num_clients];
+        char *path= (char *)"/testOffer1";
+
+        initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+        const char *test_string="Hello World!";
+        int test_string_length = strlen(test_string);
+        int test_string_buffer_length = test_string_length + 1;
+        char buffer[test_string_buffer_length];
+
+        int offer_rc = zkr_queue_offer(&queues[0], test_string, test_string_buffer_length);
+        CPPUNIT_ASSERT(offer_rc == ZOK);
+
+        int removed_element_buffer_length = test_string_buffer_length;
+        int remove_rc = zkr_queue_remove(&queues[0], buffer, &removed_element_buffer_length);
+        CPPUNIT_ASSERT(remove_rc == ZOK);
+        CPPUNIT_ASSERT(removed_element_buffer_length == test_string_buffer_length);
+        CPPUNIT_ASSERT(strncmp(test_string,buffer,test_string_length)==0);
+
+        cleanUpQueues(num_clients,queues);
+    }
+
+    void create_n_remove_m(char *path, int n, int m){
+        int num_clients = 2;
+        watchctx_t ctxs[num_clients];
+        zhandle_t *zoohandles[num_clients];
+        zkr_queue_t queues[num_clients];
+    
+        initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+        int i;
+        int max_digits = sizeof(int)*3;
+        const char *test_string = "Hello World!";
+        int buffer_length = strlen(test_string) + max_digits + 1;
+        char correct_buffer[buffer_length];
+        char receive_buffer[buffer_length];
+
+        for(i = 0; i < n; i++){
+            snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+            int offer_rc = zkr_queue_offer(&queues[0], correct_buffer, buffer_length);
+            CPPUNIT_ASSERT(offer_rc == ZOK);
+        }
+        printf("Offers\n");
+        for(i=0; i<m ;i++){
+            snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+            int receive_buffer_length=buffer_length;
+            int remove_rc = zkr_queue_remove(&queues[1], receive_buffer, &receive_buffer_length);
+            CPPUNIT_ASSERT(remove_rc == ZOK);
+            if(i >=n){
+                CPPUNIT_ASSERT(receive_buffer_length == -1);
+            }else{
+                CPPUNIT_ASSERT(strncmp(correct_buffer,receive_buffer, buffer_length)==0);
+            }
+        }
+
+        cleanUpQueues(num_clients,queues);
+    }
+
+    void testOfferRemove1(){
+        create_n_remove_m((char *)"/testOfferRemove1", 0,1);
+    }
+
+    void testOfferRemove2(){
+        create_n_remove_m((char *)"/testOfferRemove2", 1,1);
+    }
+
+    void testOfferRemove3(){
+        create_n_remove_m((char *)"/testOfferRemove3", 10,1);
+    }
+    
+    void testOfferRemove4(){
+        create_n_remove_m((char *)"/testOfferRemove4", 10,10);
+    }
+
+    void testOfferRemove5(){
+        create_n_remove_m((char *)"/testOfferRemove5", 10,5);
+    }
+
+    void testOfferRemove6(){
+        create_n_remove_m((char *)"/testOfferRemove6", 10,11);
+    }
+
+    void create_n_take_m(char *path, int n, int m){
+        CPPUNIT_ASSERT(m<=n);
+        int num_clients = 2;
+        watchctx_t ctxs[num_clients];
+        zhandle_t *zoohandles[num_clients];
+        zkr_queue_t queues[num_clients];
+    
+        initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+        int i;
+        int max_digits = sizeof(int)*3;
+        const char *test_string = "Hello World!";
+        int buffer_length = strlen(test_string) + max_digits + 1;
+        char correct_buffer[buffer_length];
+        char receive_buffer[buffer_length];
+
+        for(i = 0; i < n; i++){
+            snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+            int offer_rc = zkr_queue_offer(&queues[0], correct_buffer, buffer_length);
+            CPPUNIT_ASSERT(offer_rc == ZOK);
+        }
+        printf("Offers\n");
+        for(i=0; i<m ;i++){
+            snprintf(correct_buffer, buffer_length, "%s%d", test_string,i);
+            int receive_buffer_length=buffer_length;
+            int remove_rc = zkr_queue_take(&queues[1], receive_buffer, &receive_buffer_length);
+            CPPUNIT_ASSERT(remove_rc == ZOK);
+            if(i >=n){
+                CPPUNIT_ASSERT(receive_buffer_length == -1);
+            }else{
+                CPPUNIT_ASSERT(strncmp(correct_buffer,receive_buffer, buffer_length)==0);
+            }
+        }
+
+        cleanUpQueues(num_clients,queues);
+    }
+
+    void testOfferTake1(){
+        create_n_take_m((char *)"/testOfferTake1", 2,1);
+    }
+
+    void testOfferTake2(){
+        create_n_take_m((char *)"/testOfferTake2", 1,1);
+    }
+
+    void testOfferTake3(){
+        create_n_take_m((char *)"/testOfferTake3", 10,1);
+    }
+    
+    void testOfferTake4(){
+        create_n_take_m((char *)"/testOfferTake4", 10,10);
+    }
+
+    void testOfferTake5(){
+        create_n_take_m((char *)"/testOfferTake5", 10,5);
+    }
+
+    void testOfferTake6(){
+        create_n_take_m((char *)"/testOfferTake6", 12,11);
+    }
+
+    void testTakeThreaded(){
+        int num_clients = 1;
+        watchctx_t ctxs[num_clients];
+        zhandle_t *zoohandles[num_clients];
+        zkr_queue_t queues[num_clients];
+        char *path=(char *)"/testTakeThreaded";
+    
+        initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+        pthread_t take_thread;
+
+        pthread_create(&take_thread, NULL, take_thread_shared_queue, (void *) &queues[0]);
+
+        usleep(1000);
+
+        pthread_t offer_thread;
+        pthread_create(&offer_thread, NULL, offer_thread_shared_queue, (void *) &queues[0]);
+        pthread_join(offer_thread, NULL);
+
+        void *take_thread_result;
+        pthread_join(take_thread, &take_thread_result);
+        CPPUNIT_ASSERT(take_thread_result != NULL);
+        CPPUNIT_ASSERT(valid_test_string(take_thread_result));
+
+        cleanUpQueues(num_clients,queues);
+    }
+
+    void testTakeThreaded2(){
+        int num_clients = 1;
+        watchctx_t ctxs[num_clients];
+        zhandle_t *zoohandles[num_clients];
+        zkr_queue_t queues[num_clients];
+        char *path=(char *)"/testTakeThreaded2";
+    
+        initializeQueuesAndHandles(num_clients, zoohandles, ctxs, queues, path);
+
+        int take_attempts;
+        int num_take_attempts = 2;
+        for(take_attempts=0; take_attempts < num_take_attempts; take_attempts++){ 
+            pthread_t take_thread;
+    
+            pthread_create(&take_thread, NULL, take_thread_shared_queue, (void *) &queues[0]);
+    
+            usleep(1000);
+    
+            pthread_t offer_thread;
+            pthread_create(&offer_thread, NULL, offer_thread_shared_queue, (void *) &queues[0]);
+            pthread_join(offer_thread, NULL);
+    
+            void *take_thread_result;
+            pthread_join(take_thread, &take_thread_result);
+            CPPUNIT_ASSERT(take_thread_result != NULL);
+            CPPUNIT_ASSERT(valid_test_string(take_thread_result));
+
+        }
+        cleanUpQueues(num_clients,queues);
+    }
+};
+
+const char Zookeeper_queuetest::hostPorts[] = "127.0.0.1:22181";
+CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_queuetest);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestDriver.cc
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestDriver.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestDriver.cc
new file mode 100644
index 0000000..2b818f4
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/TestDriver.cc
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <cppunit/TestRunner.h>
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <stdexcept>
+#include <cppunit/Exception.h>
+#include <cppunit/TestFailure.h>
+#include <cppunit/XmlOutputter.h>
+#include <fstream>
+
+#include "Util.h"
+
+using namespace std;
+
+CPPUNIT_NS_BEGIN
+
+class EclipseOutputter: public CompilerOutputter
+{
+public:
+  EclipseOutputter(TestResultCollector *result,ostream &stream):
+        CompilerOutputter(result,stream,"%p:%l: "),stream_(stream)
+    {
+    }
+    virtual void printFailedTestName( TestFailure *failure ){}
+    virtual void printFailureMessage( TestFailure *failure )
+    {
+      stream_<<": ";
+      Message msg = failure->thrownException()->message();
+      stream_<< msg.shortDescription();
+
+      string text;
+      for(int i=0; i<msg.detailCount();i++){
+          text+=msg.detailAt(i);
+          if(i+1!=msg.detailCount())
+              text+=", ";
+      }
+      if(text.length()!=0)
+          stream_ <<" ["<<text<<"]";
+      stream_<<"\n";
+    }
+    ostream& stream_;
+};
+
+CPPUNIT_NS_END
+
+int main( int argc, char* argv[] ) { 
+   // if command line contains "-ide" then this is the post build check
+   // => the output must be in the compiler error format.
+   //bool selfTest = (argc > 1) && (std::string("-ide") == argv[1]);
+   globalTestConfig.addConfigFromCmdLine(argc,argv);
+
+   // Create the event manager and test controller
+   CPPUNIT_NS::TestResult controller;
+   // Add a listener that colllects test result
+   CPPUNIT_NS::TestResultCollector result;
+   controller.addListener( &result );
+   
+   // Add a listener that print dots as tests run.
+   // CPPUNIT_NS::TextTestProgressListener progress;
+   CPPUNIT_NS::BriefTestProgressListener progress;
+   controller.addListener( &progress );
+ 
+   CPPUNIT_NS::TestRunner runner;
+   runner.addTest( CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest() );
+ 
+   try
+   {
+     cout << "Running "  <<  globalTestConfig.getTestName();
+     runner.run( controller, globalTestConfig.getTestName());
+     cout<<endl;
+
+     // Print test in a compiler compatible format.
+     CPPUNIT_NS::EclipseOutputter outputter( &result,cout);
+     outputter.write(); 
+
+ // Uncomment this for XML output
+#ifdef ENABLE_XML_OUTPUT
+     std::ofstream file( "tests.xml" );
+     CPPUNIT_NS::XmlOutputter xml( &result, file );
+     xml.setStyleSheet( "report.xsl" );
+     xml.write();
+     file.close();
+#endif
+   }
+   catch ( std::invalid_argument &e )  // Test path not resolved
+   {
+     cout<<"\nERROR: "<<e.what()<<endl;
+     return 0;
+   }
+
+   return result.wasSuccessful() ? 0 : 1;
+ }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.cc
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.cc b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.cc
new file mode 100644
index 0000000..26a9a09
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.cc
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Util.h"
+
+const std::string EMPTY_STRING;
+
+TestConfig globalTestConfig;
+
+void millisleep(int ms){
+    timespec ts;
+    ts.tv_sec=ms/1000;
+    ts.tv_nsec=(ms%1000)*1000000; // to nanoseconds
+    nanosleep(&ts,0);
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.h
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.h b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.h
new file mode 100644
index 0000000..95f5420
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/Util.h
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef UTIL_H_
+#define UTIL_H_
+
+#include <map>
+#include <vector>
+#include <string>
+
+// number of elements in array
+#define COUNTOF(array) sizeof(array)/sizeof(array[0])
+
+#define DECLARE_WRAPPER(ret,sym,sig) \
+    extern "C" ret __real_##sym sig; \
+    extern "C" ret __wrap_##sym sig
+
+#define CALL_REAL(sym,params) \
+    __real_##sym params
+
+// must include "src/zookeeper_log.h" to be able to use this macro
+#define TEST_TRACE(x) \
+    log_message(3,__LINE__,__func__,format_log_message x)
+
+extern const std::string EMPTY_STRING;
+
+// *****************************************************************************
+// A bit of wizardry to get to the bare type from a reference or a pointer 
+// to the type
+template <class T>
+struct TypeOp {
+    typedef T BareT;
+    typedef T ArgT;
+};
+
+// partial specialization for reference types
+template <class T>
+struct TypeOp<T&>{
+    typedef T& ArgT;
+    typedef typename TypeOp<T>::BareT BareT;
+};
+
+// partial specialization for pointers
+template <class T>
+struct TypeOp<T*>{
+    typedef T* ArgT;
+    typedef typename TypeOp<T>::BareT BareT;
+};
+
+// *****************************************************************************
+// Container utilities
+
+template <class K, class V>
+void putValue(std::map<K,V>& map,const K& k, const V& v){
+    typedef std::map<K,V> Map;
+    typename Map::const_iterator it=map.find(k);
+    if(it==map.end())
+        map.insert(typename Map::value_type(k,v));
+    else
+        map[k]=v;
+}
+
+template <class K, class V>
+bool getValue(const std::map<K,V>& map,const K& k,V& v){
+    typedef std::map<K,V> Map;
+    typename Map::const_iterator it=map.find(k);
+    if(it==map.end())
+        return false;
+    v=it->second;
+    return true;
+}
+
+// *****************************************************************************
+// misc utils
+
+// millisecond sleep
+void millisleep(int ms);
+// evaluate given predicate until it returns true or the timeout 
+// (in millis) has expired
+template<class Predicate>
+int ensureCondition(const Predicate& p,int timeout){
+    int elapsed=0;
+    while(!p() && elapsed<timeout){
+        millisleep(2);
+        elapsed+=2;
+    }
+    return elapsed;
+};
+
+// *****************************************************************************
+// test global configuration data 
+class TestConfig{
+    typedef std::vector<std::string> CmdLineOptList;
+public:
+    typedef CmdLineOptList::const_iterator const_iterator;
+    TestConfig(){}
+    ~TestConfig(){}
+    void addConfigFromCmdLine(int argc, char* argv[]){
+        if(argc>=2)
+            testName_=argv[1];
+        for(int i=2; i<argc;++i)
+            cmdOpts_.push_back(argv[i]);
+    }
+    const_iterator getExtraOptBegin() const {return cmdOpts_.begin();}
+    const_iterator getExtraOptEnd() const {return cmdOpts_.end();}
+    size_t getExtraOptCount() const {
+        return cmdOpts_.size();
+    }
+    const std::string& getTestName() const {
+        return testName_=="all"?EMPTY_STRING:testName_;
+    }
+private:
+    CmdLineOptList cmdOpts_;
+    std::string testName_;
+};
+
+extern TestConfig globalTestConfig;
+
+#endif /*UTIL_H_*/

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/zkServer.sh
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/zkServer.sh b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/zkServer.sh
new file mode 100755
index 0000000..a22fd30
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/c/tests/zkServer.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+if [ "x$1" == "x" ]
+then
+	echo "USAGE: $0 startClean|start|stop hostPorts"
+	exit 2
+fi
+
+if [ "x$1" == "xstartClean" ]
+then
+	rm -rf /tmp/zkdata
+fi
+
+# Make sure nothing is left over from before
+if [ -r "/tmp/zk.pid" ]
+then
+pid=`cat /tmp/zk.pid`
+kill -9 $pid
+rm -f /tmp/zk.pid
+fi
+
+base_dir="../../../../.."
+
+CLASSPATH="$CLASSPATH:${base_dir}/build/classes"
+CLASSPATH="$CLASSPATH:${base_dir}/conf"
+
+for f in "${base_dir}"/zookeeper-*.jar
+do
+    CLASSPATH="$CLASSPATH:$f"
+done
+
+for i in "${base_dir}"/build/lib/*.jar
+do
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+for i in "${base_dir}"/src/java/lib/*.jar
+do
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+CLASSPATH="$CLASSPATH:${CLOVER_HOME}/lib/clover.jar"
+
+case $1 in
+start|startClean)
+	mkdir -p /tmp/zkdata
+	java -cp $CLASSPATH org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log &
+        echo $! > /tmp/zk.pid
+        sleep 5
+	;;
+stop)
+	# Already killed above
+	;;
+*)
+	echo "Unknown command " + $1
+	exit 2
+esac
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java b/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
new file mode 100644
index 0000000..c35c332
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/main/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
@@ -0,0 +1,314 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * 
+ * A <a href="package.html">protocol to implement a distributed queue</a>.
+ * 
+ */
+
+public class DistributedQueue {
+    private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
+
+    private final String dir;
+
+    private ZooKeeper zookeeper;
+    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+
+    private final String prefix = "qn-";
+
+
+    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
+        this.dir = dir;
+
+        if(acl != null){
+            this.acl = acl;
+        }
+        this.zookeeper = zookeeper;
+
+    }
+
+
+
+    /**
+     * Returns a Map of the children, ordered by id.
+     * @param watcher optional watcher on getChildren() operation.
+     * @return map from id to child name for all children
+     */
+    private Map<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
+        Map<Long,String> orderedChildren = new TreeMap<Long,String>();
+
+        List<String> childNames = null;
+        try{
+            childNames = zookeeper.getChildren(dir, watcher);
+        }catch (KeeperException.NoNodeException e){
+            throw e;
+        }
+
+        for(String childName : childNames){
+            try{
+                //Check format
+                if(!childName.regionMatches(0, prefix, 0, prefix.length())){
+                    LOG.warn("Found child node with improper name: " + childName);
+                    continue;
+                }
+                String suffix = childName.substring(prefix.length());
+                Long childId = new Long(suffix);
+                orderedChildren.put(childId,childName);
+            }catch(NumberFormatException e){
+                LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+            }
+        }
+
+        return orderedChildren;
+    }
+
+    /**
+     * Find the smallest child node.
+     * @return The name of the smallest child node.
+     */
+    private String smallestChildName() throws KeeperException, InterruptedException {
+        long minId = Long.MAX_VALUE;
+        String minName = "";
+
+        List<String> childNames = null;
+
+        try{
+            childNames = zookeeper.getChildren(dir, false);
+        }catch(KeeperException.NoNodeException e){
+            LOG.warn("Caught: " +e,e);
+            return null;
+        }
+
+        for(String childName : childNames){
+            try{
+                //Check format
+                if(!childName.regionMatches(0, prefix, 0, prefix.length())){
+                    LOG.warn("Found child node with improper name: " + childName);
+                    continue;
+                }
+                String suffix = childName.substring(prefix.length());
+                long childId = Long.parseLong(suffix);
+                if(childId < minId){
+                    minId = childId;
+                    minName = childName;
+                }
+            }catch(NumberFormatException e){
+                LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+            }
+        }
+
+
+        if(minId < Long.MAX_VALUE){
+            return minName;
+        }else{
+            return null;
+        }
+    }
+
+    /**
+     * Return the head of the queue without modifying the queue.
+     * @return the data at the head of the queue.
+     * @throws NoSuchElementException
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
+        Map<Long,String> orderedChildren;
+
+        // element, take, and remove follow the same pattern.
+        // We want to return the child node with the smallest sequence number.
+        // Since other clients are remove()ing and take()ing nodes concurrently, 
+        // the child with the smallest sequence number in orderedChildren might be gone by the time we check.
+        // We don't call getChildren again until we have tried the rest of the nodes in sequence order.
+        while(true){
+            try{
+                orderedChildren = orderedChildren(null);
+            }catch(KeeperException.NoNodeException e){
+                throw new NoSuchElementException();
+            }
+            if(orderedChildren.size() == 0 ) throw new NoSuchElementException();
+
+            for(String headNode : orderedChildren.values()){
+                if(headNode != null){
+                    try{
+                        return zookeeper.getData(dir+"/"+headNode, false, null);
+                    }catch(KeeperException.NoNodeException e){
+                        //Another client removed the node first, try next
+                    }
+                }
+            }
+
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it.
+     * @return The former head of the queue
+     * @throws NoSuchElementException
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+        Map<Long,String> orderedChildren;
+        // Same as for element.  Should refactor this.
+        while(true){
+            try{
+                orderedChildren = orderedChildren(null);
+            }catch(KeeperException.NoNodeException e){
+                throw new NoSuchElementException();
+            }
+            if(orderedChildren.size() == 0) throw new NoSuchElementException();
+
+            for(String headNode : orderedChildren.values()){
+                String path = dir +"/"+headNode;
+                try{
+                    byte[] data = zookeeper.getData(path, false, null);
+                    zookeeper.delete(path, -1);
+                    return data;
+                }catch(KeeperException.NoNodeException e){
+                    // Another client deleted the node first.
+                }
+            }
+
+        }
+    }
+
+    private class LatchChildWatcher implements Watcher {
+
+        CountDownLatch latch;
+
+        public LatchChildWatcher(){
+            latch = new CountDownLatch(1);
+        }
+
+        public void process(WatchedEvent event){
+            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + 
+                    event.getState() + " type " + event.getType());
+            latch.countDown();
+        }
+        public void await() throws InterruptedException {
+            latch.await();
+        }
+    }
+
+    /**
+     * Removes the head of the queue and returns it, blocks until it succeeds.
+     * @return The former head of the queue
+     * @throws NoSuchElementException
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] take() throws KeeperException, InterruptedException {
+        Map<Long,String> orderedChildren;
+        // Same as for element.  Should refactor this.
+        while(true){
+            LatchChildWatcher childWatcher = new LatchChildWatcher();
+            try{
+                orderedChildren = orderedChildren(childWatcher);
+            }catch(KeeperException.NoNodeException e){
+                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+                continue;
+            }
+            if(orderedChildren.size() == 0){
+                childWatcher.await();
+                continue;
+            }
+
+            for(String headNode : orderedChildren.values()){
+                String path = dir +"/"+headNode;
+                try{
+                    byte[] data = zookeeper.getData(path, false, null);
+                    zookeeper.delete(path, -1);
+                    return data;
+                }catch(KeeperException.NoNodeException e){
+                    // Another client deleted the node first.
+                }
+            }
+        }
+    }
+
+    /**
+     * Inserts data into queue.
+     * @param data
+     * @return true if data was successfully added
+     */
+    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
+        for(;;){
+            try{
+                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+                return true;
+            }catch(KeeperException.NoNodeException e){
+                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+            }
+        }
+
+    }
+
+    /**
+     * Returns the data at the first element of the queue, or null if the queue is empty.
+     * @return data at the first element of the queue, or null.
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] peek() throws KeeperException, InterruptedException{
+        try{
+            return element();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.
+     * @return Head of the queue or null.
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] poll() throws KeeperException, InterruptedException {
+        try{
+            return remove();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/src/test/java/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/src/test/java/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java b/zookeeper-recipes/zookeeper-recipes-queue/src/test/java/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
new file mode 100644
index 0000000..c6cfae2
--- /dev/null
+++ b/zookeeper-recipes/zookeeper-recipes-queue/src/test/java/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
@@ -0,0 +1,286 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+public class DistributedQueueTest extends ClientBase {
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        LOG.info("FINISHED " + getTestName());
+    }
+
+
+    @Test
+    public void testOffer1() throws Exception {
+        String dir = "/testOffer1";
+        String testString = "Hello World";
+        final int num_clients = 1;
+        ZooKeeper clients[] = new ZooKeeper[num_clients];
+        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        queueHandles[0].offer(testString.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[0].remove();
+        Assert.assertEquals(new String(dequeuedBytes), testString);
+    }
+
+    @Test
+    public void testOffer2() throws Exception {
+        String dir = "/testOffer2";
+        String testString = "Hello World";
+        final int num_clients = 2;
+        ZooKeeper clients[] = new ZooKeeper[num_clients];
+        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        queueHandles[0].offer(testString.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[1].remove();
+        Assert.assertEquals(new String(dequeuedBytes), testString);
+    }
+
+    @Test
+    public void testTake1() throws Exception {
+        String dir = "/testTake1";
+        String testString = "Hello World";
+        final int num_clients = 1;
+        ZooKeeper clients[] = new ZooKeeper[num_clients];
+        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        queueHandles[0].offer(testString.getBytes());
+
+        byte dequeuedBytes[] = queueHandles[0].take();
+        Assert.assertEquals(new String(dequeuedBytes), testString);
+    }
+
+
+
+    @Test
+    public void testRemove1() throws Exception{
+        String dir = "/testRemove1";
+        String testString = "Hello World";
+        final int num_clients = 1;
+        ZooKeeper clients[] = new ZooKeeper[num_clients];
+        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        try{
+            queueHandles[0].remove();
+        }catch(NoSuchElementException e){
+            return;
+        }
+        Assert.assertTrue(false);
+    }
+
+    public void createNremoveMtest(String dir,int n,int m) throws Exception{
+        String testString = "Hello World";
+        final int num_clients = 2;
+        ZooKeeper clients[] = new ZooKeeper[num_clients];
+        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        for(int i=0; i< n; i++){
+            String offerString = testString + i;
+            queueHandles[0].offer(offerString.getBytes());
+        }
+
+        byte data[] = null;
+        for(int i=0; i<m; i++){
+            data=queueHandles[1].remove();
+        }
+        Assert.assertEquals(new String(data), testString+(m-1));
+    }
+
+    @Test
+    public void testRemove2() throws Exception{
+        createNremoveMtest("/testRemove2",10,2);
+    }
+    @Test
+    public void testRemove3() throws Exception{
+        createNremoveMtest("/testRemove3",1000,1000);
+    }
+
+    public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
+        String testString = "Hello World";
+        final int num_clients = 2;
+        ZooKeeper clients[] = new ZooKeeper[num_clients];
+        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        for(int i=0; i< n; i++){
+            String offerString = testString + i;
+            queueHandles[0].offer(offerString.getBytes());
+        }
+
+        byte data[] = null;
+        for(int i=0; i<m; i++){
+            data=queueHandles[1].remove();
+        }
+        Assert.assertEquals(new String(queueHandles[1].element()), testString+m);
+    }
+
+    @Test
+    public void testElement1() throws Exception {
+        createNremoveMelementTest("/testElement1",1,0);
+    }
+
+    @Test
+    public void testElement2() throws Exception {
+        createNremoveMelementTest("/testElement2",10,2);
+    }
+
+    @Test
+    public void testElement3() throws Exception {
+        createNremoveMelementTest("/testElement3",1000,500);
+    }
+
+    @Test
+    public void testElement4() throws Exception {
+        createNremoveMelementTest("/testElement4",1000,1000-1);
+    }
+
+    @Test
+    public void testTakeWait1() throws Exception{
+        String dir = "/testTakeWait1";
+        final String testString = "Hello World";
+        final int num_clients = 1;
+        final ZooKeeper clients[] = new ZooKeeper[num_clients];
+        final DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+
+        final byte[] takeResult[] = new byte[1][];
+        Thread takeThread = new Thread(){
+            public void run(){
+                try{
+                    takeResult[0] = queueHandles[0].take();
+                }catch(KeeperException e){
+
+                }catch(InterruptedException e){
+
+                }
+            }
+        };
+        takeThread.start();
+
+        Thread.sleep(1000);
+        Thread offerThread= new Thread() {
+            public void run(){
+                try {
+                    queueHandles[0].offer(testString.getBytes());
+                } catch (KeeperException e) {
+
+                } catch (InterruptedException e) {
+
+                }
+            }
+        };
+        offerThread.start();
+        offerThread.join();
+
+        takeThread.join();
+
+        Assert.assertTrue(takeResult[0] != null);
+        Assert.assertEquals(new String(takeResult[0]), testString);
+    }
+
+    @Test
+    public void testTakeWait2() throws Exception{
+        String dir = "/testTakeWait2";
+        final String testString = "Hello World";
+        final int num_clients = 1;
+        final ZooKeeper clients[] = new ZooKeeper[num_clients];
+        final DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+        for(int i=0; i < clients.length; i++){
+            clients[i] = createClient();
+            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+        }
+        int num_attempts =2;
+        for(int i=0; i< num_attempts; i++){
+            final byte[] takeResult[] = new byte[1][];
+            final String threadTestString = testString + i;
+            Thread takeThread = new Thread(){
+                public void run(){
+                    try{
+                        takeResult[0] = queueHandles[0].take();
+                    }catch(KeeperException e){
+
+                    }catch(InterruptedException e){
+
+                    }
+                }
+            };
+            takeThread.start();
+
+            Thread.sleep(1000);
+            Thread offerThread= new Thread() {
+                public void run(){
+                    try {
+                        queueHandles[0].offer(threadTestString.getBytes());
+                    } catch (KeeperException e) {
+
+                    } catch (InterruptedException e) {
+
+                    }
+                }
+            };
+            offerThread.start();
+            offerThread.join();
+
+            takeThread.join();
+
+            Assert.assertTrue(takeResult[0] != null);
+            Assert.assertEquals(new String(takeResult[0]), threadTestString);
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/2584625c/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java b/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
deleted file mode 100644
index c6cfae2..0000000
--- a/zookeeper-recipes/zookeeper-recipes-queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.zookeeper.recipes.queue;
-
-import java.util.NoSuchElementException;
-
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.test.ClientBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-
-public class DistributedQueueTest extends ClientBase {
-
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        LOG.info("FINISHED " + getTestName());
-    }
-
-
-    @Test
-    public void testOffer1() throws Exception {
-        String dir = "/testOffer1";
-        String testString = "Hello World";
-        final int num_clients = 1;
-        ZooKeeper clients[] = new ZooKeeper[num_clients];
-        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        queueHandles[0].offer(testString.getBytes());
-
-        byte dequeuedBytes[] = queueHandles[0].remove();
-        Assert.assertEquals(new String(dequeuedBytes), testString);
-    }
-
-    @Test
-    public void testOffer2() throws Exception {
-        String dir = "/testOffer2";
-        String testString = "Hello World";
-        final int num_clients = 2;
-        ZooKeeper clients[] = new ZooKeeper[num_clients];
-        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        queueHandles[0].offer(testString.getBytes());
-
-        byte dequeuedBytes[] = queueHandles[1].remove();
-        Assert.assertEquals(new String(dequeuedBytes), testString);
-    }
-
-    @Test
-    public void testTake1() throws Exception {
-        String dir = "/testTake1";
-        String testString = "Hello World";
-        final int num_clients = 1;
-        ZooKeeper clients[] = new ZooKeeper[num_clients];
-        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        queueHandles[0].offer(testString.getBytes());
-
-        byte dequeuedBytes[] = queueHandles[0].take();
-        Assert.assertEquals(new String(dequeuedBytes), testString);
-    }
-
-
-
-    @Test
-    public void testRemove1() throws Exception{
-        String dir = "/testRemove1";
-        String testString = "Hello World";
-        final int num_clients = 1;
-        ZooKeeper clients[] = new ZooKeeper[num_clients];
-        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        try{
-            queueHandles[0].remove();
-        }catch(NoSuchElementException e){
-            return;
-        }
-        Assert.assertTrue(false);
-    }
-
-    public void createNremoveMtest(String dir,int n,int m) throws Exception{
-        String testString = "Hello World";
-        final int num_clients = 2;
-        ZooKeeper clients[] = new ZooKeeper[num_clients];
-        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        for(int i=0; i< n; i++){
-            String offerString = testString + i;
-            queueHandles[0].offer(offerString.getBytes());
-        }
-
-        byte data[] = null;
-        for(int i=0; i<m; i++){
-            data=queueHandles[1].remove();
-        }
-        Assert.assertEquals(new String(data), testString+(m-1));
-    }
-
-    @Test
-    public void testRemove2() throws Exception{
-        createNremoveMtest("/testRemove2",10,2);
-    }
-    @Test
-    public void testRemove3() throws Exception{
-        createNremoveMtest("/testRemove3",1000,1000);
-    }
-
-    public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
-        String testString = "Hello World";
-        final int num_clients = 2;
-        ZooKeeper clients[] = new ZooKeeper[num_clients];
-        DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        for(int i=0; i< n; i++){
-            String offerString = testString + i;
-            queueHandles[0].offer(offerString.getBytes());
-        }
-
-        byte data[] = null;
-        for(int i=0; i<m; i++){
-            data=queueHandles[1].remove();
-        }
-        Assert.assertEquals(new String(queueHandles[1].element()), testString+m);
-    }
-
-    @Test
-    public void testElement1() throws Exception {
-        createNremoveMelementTest("/testElement1",1,0);
-    }
-
-    @Test
-    public void testElement2() throws Exception {
-        createNremoveMelementTest("/testElement2",10,2);
-    }
-
-    @Test
-    public void testElement3() throws Exception {
-        createNremoveMelementTest("/testElement3",1000,500);
-    }
-
-    @Test
-    public void testElement4() throws Exception {
-        createNremoveMelementTest("/testElement4",1000,1000-1);
-    }
-
-    @Test
-    public void testTakeWait1() throws Exception{
-        String dir = "/testTakeWait1";
-        final String testString = "Hello World";
-        final int num_clients = 1;
-        final ZooKeeper clients[] = new ZooKeeper[num_clients];
-        final DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-
-        final byte[] takeResult[] = new byte[1][];
-        Thread takeThread = new Thread(){
-            public void run(){
-                try{
-                    takeResult[0] = queueHandles[0].take();
-                }catch(KeeperException e){
-
-                }catch(InterruptedException e){
-
-                }
-            }
-        };
-        takeThread.start();
-
-        Thread.sleep(1000);
-        Thread offerThread= new Thread() {
-            public void run(){
-                try {
-                    queueHandles[0].offer(testString.getBytes());
-                } catch (KeeperException e) {
-
-                } catch (InterruptedException e) {
-
-                }
-            }
-        };
-        offerThread.start();
-        offerThread.join();
-
-        takeThread.join();
-
-        Assert.assertTrue(takeResult[0] != null);
-        Assert.assertEquals(new String(takeResult[0]), testString);
-    }
-
-    @Test
-    public void testTakeWait2() throws Exception{
-        String dir = "/testTakeWait2";
-        final String testString = "Hello World";
-        final int num_clients = 1;
-        final ZooKeeper clients[] = new ZooKeeper[num_clients];
-        final DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
-        for(int i=0; i < clients.length; i++){
-            clients[i] = createClient();
-            queueHandles[i] = new DistributedQueue(clients[i], dir, null);
-        }
-        int num_attempts =2;
-        for(int i=0; i< num_attempts; i++){
-            final byte[] takeResult[] = new byte[1][];
-            final String threadTestString = testString + i;
-            Thread takeThread = new Thread(){
-                public void run(){
-                    try{
-                        takeResult[0] = queueHandles[0].take();
-                    }catch(KeeperException e){
-
-                    }catch(InterruptedException e){
-
-                    }
-                }
-            };
-            takeThread.start();
-
-            Thread.sleep(1000);
-            Thread offerThread= new Thread() {
-                public void run(){
-                    try {
-                        queueHandles[0].offer(threadTestString.getBytes());
-                    } catch (KeeperException e) {
-
-                    } catch (InterruptedException e) {
-
-                    }
-                }
-            };
-            offerThread.start();
-            offerThread.join();
-
-            takeThread.join();
-
-            Assert.assertTrue(takeResult[0] != null);
-            Assert.assertEquals(new String(takeResult[0]), threadTestString);
-        }
-    }
-}
-


Mime
View raw message