Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/TestDriver.cc Thu Nov 12 21:27:47 2009
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+#include <cppunit/TestRunner.h>
+#include <cppunit/CompilerOutputter.h>
+#include <cppunit/TestResult.h>
+#include <cppunit/TestResultCollector.h>
+#include <cppunit/TextTestProgressListener.h>
+#include <cppunit/BriefTestProgressListener.h>
+#include <cppunit/extensions/TestFactoryRegistry.h>
+#include <stdexcept>
+#include <cppunit/Exception.h>
+#include <cppunit/TestFailure.h>
+#include <cppunit/XmlOutputter.h>
+#include <fstream>
+
+#include "Util.h"
+
+using namespace std;
+
+CPPUNIT_NS_BEGIN
+
+// Compiler-style outputter constructed with the location format "%p:%l: ".
+// After the location it prints ": ", the short description of the failure
+// message and, when there are any, the message details joined by ", " inside
+// square brackets.
+class EclipseOutputter: public CompilerOutputter
+{
+public:
+    EclipseOutputter(TestResultCollector *result,ostream &stream)
+        : CompilerOutputter(result,stream,"%p:%l: ")
+        , stream_(stream)
+    {}
+
+    // Deliberately empty: the name of the failed test is not printed.
+    virtual void printFailedTestName( TestFailure *failure ){}
+
+    // Writes one line describing the exception carried by the failure.
+    virtual void printFailureMessage( TestFailure *failure )
+    {
+        stream_ << ": ";
+        Message message = failure->thrownException()->message();
+        stream_ << message.shortDescription();
+
+        // Collect the details first so the brackets are only emitted when
+        // there is something to put between them.
+        const int detailTotal = message.detailCount();
+        string joined;
+        for(int idx=0; idx<detailTotal; ++idx){
+            if(idx>0)
+                joined+=", ";
+            joined+=message.detailAt(idx);
+        }
+        if(!joined.empty())
+            stream_ << " [" << joined << "]";
+        stream_ << "\n";
+    }
+
+    // Stream all failure text is written to.
+    ostream& stream_;
+};
+
+CPPUNIT_NS_END
+
+/**
+ * Entry point of the test driver.
+ *
+ * The command line is stored in globalTestConfig and the name returned by
+ * getTestName() is passed to the runner as the test path. Results are written
+ * to stdout through EclipseOutputter and, when ENABLE_XML_OUTPUT is defined,
+ * additionally to tests.xml.
+ *
+ * @return 0 if all executed tests were successful; 1 if any test failed or
+ *         the requested test path could not be resolved.
+ */
+int main( int argc, char* argv[] ) {
+    globalTestConfig.addConfigFromCmdLine(argc,argv);
+
+    // Create the event manager and test controller
+    CPPUNIT_NS::TestResult controller;
+    // Add a listener that collects test results
+    CPPUNIT_NS::TestResultCollector result;
+    controller.addListener( &result );
+
+    // Add a listener that reports progress while the tests run
+    CPPUNIT_NS::BriefTestProgressListener progress;
+    controller.addListener( &progress );
+
+    CPPUNIT_NS::TestRunner runner;
+    runner.addTest( CPPUNIT_NS::TestFactoryRegistry::getRegistry().makeTest() );
+
+    try
+    {
+        cout << "Running " << globalTestConfig.getTestName();
+        runner.run( controller, globalTestConfig.getTestName());
+        cout<<endl;
+
+        // Print test in a compiler compatible format.
+        CPPUNIT_NS::EclipseOutputter outputter( &result,cout);
+        outputter.write();
+
+        // Define ENABLE_XML_OUTPUT to also get an XML report
+#ifdef ENABLE_XML_OUTPUT
+        std::ofstream file( "tests.xml" );
+        CPPUNIT_NS::XmlOutputter xml( &result, file );
+        xml.setStyleSheet( "report.xsl" );
+        xml.write();
+        file.close();
+#endif
+    }
+    catch ( const std::invalid_argument &e ) // Test path not resolved
+    {
+        cout<<"\nERROR: "<<e.what()<<endl;
+        // Nothing was run, so this must not look like a successful test run
+        // to whoever inspects the exit status.
+        return 1;
+    }
+
+    return result.wasSuccessful() ? 0 : 1;
+}
Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.cc Thu Nov 12 21:27:47 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Util.h"
+
+#include <errno.h>
+#include <time.h>
+
+const std::string EMPTY_STRING;
+
+TestConfig globalTestConfig;
+
+/**
+ * Suspends the calling thread for the given number of milliseconds.
+ *
+ * Zero and negative durations return immediately. If nanosleep() is
+ * interrupted (EINTR) the sleep is resumed for the time that was still left,
+ * so the full duration is honoured.
+ *
+ * @param ms  duration in milliseconds
+ */
+void millisleep(int ms){
+    if(ms<=0)
+        return; // nothing to wait for; also keeps tv_nsec from going negative
+    timespec remaining;
+    remaining.tv_sec=ms/1000;
+    remaining.tv_nsec=(ms%1000)*1000000; // to nanoseconds
+    // On interruption nanosleep() stores the unslept time in its second
+    // argument; handing it the same struct resumes where it left off.
+    while(nanosleep(&remaining,&remaining)==-1 && errno==EINTR){
+    }
+}
Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/Util.h Thu Nov 12 21:27:47 2009
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef UTIL_H_
+#define UTIL_H_
+
+#include <map>
+#include <vector>
+#include <string>
+
+// Number of elements in an array. Must be given a real array, not a pointer.
+// The expansion is fully parenthesized so it survives inside larger
+// expressions such as n % COUNTOF(a).
+#define COUNTOF(array) (sizeof(array)/sizeof((array)[0]))
+
+// Declares the two C-linkage functions __real_<sym> and __wrap_<sym>, both
+// with return type `ret` and the parenthesized parameter list `sig`.
+// The expansion ends without a semicolon; the use site supplies it.
+// NOTE(review): the names match the GNU ld --wrap convention - confirm
+// against the link flags of the test build.
+#define DECLARE_WRAPPER(ret,sym,sig) \
+    extern "C" ret __real_##sym sig; \
+    extern "C" ret __wrap_##sym sig
+
+// Calls __real_<sym> with the parenthesized argument list `params`.
+#define CALL_REAL(sym,params) \
+    __real_##sym params
+
+// Logs a message tagged with the current line and function.
+// `x` is a parenthesized argument list for format_log_message.
+// must include "src/zookeeper_log.h" to be able to use this macro
+#define TEST_TRACE(x) \
+    log_message(3,__LINE__,__func__,format_log_message x)
+
+extern const std::string EMPTY_STRING;
+
+// *****************************************************************************
+// Type trait exposing two nested typedefs for a type T:
+//   ArgT  - T exactly as it was supplied
+//   BareT - T with its reference/pointer levels peeled off, one level per
+//           specialization, recursing on what remains
+template <typename T>
+struct TypeOp {
+    typedef T ArgT;
+    typedef T BareT;
+};
+
+// T& : ArgT keeps the reference, BareT comes from the referenced type
+template <typename T>
+struct TypeOp<T&> {
+    typedef typename TypeOp<T>::BareT BareT;
+    typedef T& ArgT;
+};
+
+// T* : ArgT keeps the pointer, BareT comes from the pointee type
+template <typename T>
+struct TypeOp<T*> {
+    typedef typename TypeOp<T>::BareT BareT;
+    typedef T* ArgT;
+};
+
+// *****************************************************************************
+// Container utilities
+
+// Stores v under key k, replacing any value that is already there.
+// A single insert() does the lookup: it reports whether the key existed, and
+// only in that case is the mapped value overwritten. Unlike operator[], this
+// does not require V to be default-constructible.
+template <class K, class V>
+void putValue(std::map<K,V>& map,const K& k, const V& v){
+    typedef std::map<K,V> Map;
+    std::pair<typename Map::iterator,bool> res=
+        map.insert(typename Map::value_type(k,v));
+    if(!res.second)
+        res.first->second=v;
+}
+
+// Looks up k. On a hit the mapped value is copied into v and true is
+// returned; on a miss v is left untouched and false is returned.
+template <class K, class V>
+bool getValue(const std::map<K,V>& map,const K& k,V& v){
+    typename std::map<K,V>::const_iterator pos=map.find(k);
+    const bool found=(pos!=map.end());
+    if(found)
+        v=pos->second;
+    return found;
+}
+
+// *****************************************************************************
+// misc utils
+
+// millisecond sleep
+void millisleep(int ms);
+
+// Polls predicate p, sleeping 2 ms between checks, until p() returns true or
+// the requested sleep time has reached `timeout` (in millis).
+// Returns the sleep time accumulated so far in millis. It is counted in 2 ms
+// steps rather than measured with a clock; 0 means p() held on the first check.
+template<class Predicate>
+int ensureCondition(const Predicate& p,int timeout){
+    int elapsed=0;
+    while(!p() && elapsed<timeout){
+        millisleep(2);
+        elapsed+=2;
+    }
+    return elapsed;
+}
+
+// *****************************************************************************
+// test global configuration data
+// Command-line configuration of the test driver: argv[1] is the test name,
+// everything from argv[2] on is kept, in order, as extra options.
+class TestConfig{
+    typedef std::vector<std::string> CmdLineOptList;
+public:
+    typedef CmdLineOptList::const_iterator const_iterator;
+
+    TestConfig(){}
+    ~TestConfig(){}
+
+    // Records argv[1] as the test name and appends argv[2..argc-1] to the
+    // extra options. Does nothing when there is no argv[1].
+    void addConfigFromCmdLine(int argc, char* argv[]){
+        if(argc<2)
+            return;
+        testName_=argv[1];
+        for(int pos=2; pos<argc; ++pos)
+            cmdOpts_.push_back(argv[pos]);
+    }
+
+    // Iteration over / number of the extra options.
+    const_iterator getExtraOptBegin() const { return cmdOpts_.begin(); }
+    const_iterator getExtraOptEnd() const { return cmdOpts_.end(); }
+    size_t getExtraOptCount() const { return cmdOpts_.size(); }
+
+    // The recorded test name, except that "all" is reported as EMPTY_STRING.
+    const std::string& getTestName() const {
+        if(testName_=="all")
+            return EMPTY_STRING;
+        return testName_;
+    }
+private:
+    CmdLineOptList cmdOpts_;
+    std::string testName_;
+};
+
+extern TestConfig globalTestConfig;
+
+#endif /*UTIL_H_*/
Added: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh Thu Nov 12 21:27:47 2009
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Control script for the ZooKeeper server used by these tests.
+#   startClean  remove /tmp/zkdata, then start a server
+#   start       start a server on top of the existing /tmp/zkdata
+#   stop        only kill the server recorded in /tmp/zk.pid
+# Whatever the command, a server recorded in /tmp/zk.pid is killed first.
+
+if [ -z "$1" ]
+then
+    echo "USAGE: $0 startClean|start|stop hostPorts"
+    exit 2
+fi
+
+if [ "$1" = "startClean" ]
+then
+    rm -rf /tmp/zkdata
+fi
+
+# Make sure nothing is left over from before
+if [ -r "/tmp/zk.pid" ]
+then
+    pid=$(cat /tmp/zk.pid)
+    # An empty pid file would otherwise turn into a bare "kill -9".
+    if [ -n "$pid" ]
+    then
+        kill -9 "$pid"
+    fi
+    rm -f /tmp/zk.pid
+fi
+
+base_dir="../../../../.."
+
+CLASSPATH="$CLASSPATH:${base_dir}/build/classes"
+CLASSPATH="$CLASSPATH:${base_dir}/conf"
+
+for f in "${base_dir}"/zookeeper-*.jar
+do
+    CLASSPATH="$CLASSPATH:$f"
+done
+
+for i in "${base_dir}"/build/lib/*.jar
+do
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+for i in "${base_dir}"/src/java/lib/*.jar
+do
+    CLASSPATH="$CLASSPATH:$i"
+done
+
+CLASSPATH="$CLASSPATH:${CLOVER_HOME}/lib/clover.jar"
+
+case "$1" in
+start|startClean)
+    mkdir -p /tmp/zkdata
+    # Quoted so a classpath entry containing spaces stays a single argument.
+    java -cp "$CLASSPATH" org.apache.zookeeper.server.ZooKeeperServerMain 22181 /tmp/zkdata &> /tmp/zk.log &
+    echo $! > /tmp/zk.pid
+    # Give the server a moment to come up before the caller continues.
+    sleep 5
+    ;;
+stop)
+    # Already killed above
+    ;;
+*)
+    echo "Unknown command $1"
+    exit 2
+    ;;
+esac
+
Propchange: hadoop/zookeeper/trunk/src/recipes/queue/src/c/tests/zkServer.sh
------------------------------------------------------------------------------
svn:executable = *
Added: hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/src/java/org/apache/zookeeper/recipes/queue/DistributedQueue.java Thu Nov 12 21:27:47 2009
@@ -0,0 +1,312 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ *
+ * A <a href="package.html">protocol to implement a distributed queue</a>.
+ *
+ * Every element is a child of the queue directory, created in
+ * PERSISTENT_SEQUENTIAL mode with the name prefix "qn-". The child whose
+ * numeric suffix is smallest is treated as the head of the queue.
+ *
+ */
+
+public class DistributedQueue {
+    private static final Logger LOG = Logger.getLogger(DistributedQueue.class);
+
+    private final String dir;
+
+    private final ZooKeeper zookeeper;
+    private final List<ACL> acl;
+
+    private final String prefix = "qn-";
+
+
+    /**
+     * @param zookeeper client used for all queue operations
+     * @param dir path of the node whose children are the queue elements
+     * @param acl ACL applied to created nodes; null selects
+     *            ZooDefs.Ids.OPEN_ACL_UNSAFE
+     */
+    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){
+        this.dir = dir;
+        this.acl = (acl != null) ? acl : ZooDefs.Ids.OPEN_ACL_UNSAFE;
+        this.zookeeper = zookeeper;
+    }
+
+
+    /**
+     * Extracts the numeric id that follows the prefix in a child name.
+     * @param childName name of a child of the queue directory
+     * @return the id, or null (after logging a warning) if the name does not
+     *         start with the prefix or the remainder is not a number
+     */
+    private Long parseChildId(String childName){
+        if(!childName.startsWith(prefix)){
+            LOG.warn("Found child node with improper name: " + childName);
+            return null;
+        }
+        try{
+            return Long.valueOf(childName.substring(prefix.length()));
+        }catch(NumberFormatException e){
+            LOG.warn("Found child node with improper format : " + childName + " " + e,e);
+            return null;
+        }
+    }
+
+    /**
+     * Creates the queue directory. A NodeExistsException is ignored: it only
+     * means another client created the directory between our failed
+     * operation and this call.
+     */
+    private void createQueueDir() throws KeeperException, InterruptedException {
+        try{
+            zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);
+        }catch(KeeperException.NodeExistsException e){
+            // Lost the race to create the directory; it exists, which is all we need.
+        }
+    }
+
+    /**
+     * Returns a Map of the children, ordered by id.
+     * Children with a malformed name are logged and left out.
+     * @param watcher optional watcher on getChildren() operation.
+     * @return map from id to child name for all children
+     * @throws KeeperException NoNodeException if the queue directory does not exist
+     */
+    private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {
+        TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();
+
+        for(String childName : zookeeper.getChildren(dir, watcher)){
+            Long childId = parseChildId(childName);
+            if(childId != null){
+                orderedChildren.put(childId,childName);
+            }
+        }
+
+        return orderedChildren;
+    }
+
+    /**
+     * Find the smallest child node. Currently not called from within this class.
+     * @return The name of the smallest child node, or null if the queue
+     *         directory does not exist or has no well-formed child.
+     */
+    private String smallestChildName() throws KeeperException, InterruptedException {
+        List<String> childNames;
+        try{
+            childNames = zookeeper.getChildren(dir, false);
+        }catch(KeeperException.NoNodeException e){
+            LOG.warn("Caught: " +e,e);
+            return null;
+        }
+
+        long minId = Long.MAX_VALUE;
+        String minName = null;
+        for(String childName : childNames){
+            Long childId = parseChildId(childName);
+            if(childId != null && childId < minId){
+                minId = childId;
+                minName = childName;
+            }
+        }
+        return minName;
+    }
+
+    /**
+     * Return the head of the queue without modifying the queue.
+     * @return the data at the head of the queue.
+     * @throws NoSuchElementException if the queue directory is missing or empty
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {
+        // element, take, and remove follow the same pattern.
+        // We want to return the child node with the smallest sequence number.
+        // Since other clients are remove()ing and take()ing nodes concurrently,
+        // the child with the smallest sequence number in orderedChildren might
+        // be gone by the time we check.
+        // We don't call getChildren again until we have tried the rest of the
+        // nodes in sequence order.
+        while(true){
+            TreeMap<Long,String> orderedChildren;
+            try{
+                orderedChildren = orderedChildren(null);
+            }catch(KeeperException.NoNodeException e){
+                throw new NoSuchElementException();
+            }
+            if(orderedChildren.isEmpty()) throw new NoSuchElementException();
+
+            for(String headNode : orderedChildren.values()){
+                try{
+                    return zookeeper.getData(dir+"/"+headNode, false, null);
+                }catch(KeeperException.NoNodeException e){
+                    //Another client removed the node first, try next
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it.
+     * @return The former head of the queue
+     * @throws NoSuchElementException if the queue directory is missing or empty
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+        // Same pattern as element().
+        while(true){
+            TreeMap<Long,String> orderedChildren;
+            try{
+                orderedChildren = orderedChildren(null);
+            }catch(KeeperException.NoNodeException e){
+                throw new NoSuchElementException();
+            }
+            if(orderedChildren.isEmpty()) throw new NoSuchElementException();
+
+            for(String headNode : orderedChildren.values()){
+                String path = dir +"/"+headNode;
+                try{
+                    byte[] data = zookeeper.getData(path, false, null);
+                    zookeeper.delete(path, -1);
+                    return data;
+                }catch(KeeperException.NoNodeException e){
+                    // Another client deleted the node first.
+                }
+            }
+        }
+    }
+
+    /** Watcher that releases a latch the first time it is notified. */
+    private static class LatchChildWatcher implements Watcher {
+
+        private final CountDownLatch latch;
+
+        public LatchChildWatcher(){
+            latch = new CountDownLatch(1);
+        }
+
+        public void process(WatchedEvent event){
+            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
+                    event.getState() + " type " + event.getType());
+            latch.countDown();
+        }
+        public void await() throws InterruptedException {
+            latch.await();
+        }
+    }
+
+    /**
+     * Removes the head of the queue and returns it, blocks until it succeeds.
+     * A missing queue directory is created; an empty queue is waited on
+     * through a child watcher.
+     * @return The former head of the queue
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] take() throws KeeperException, InterruptedException {
+        // Same pattern as element().
+        while(true){
+            LatchChildWatcher childWatcher = new LatchChildWatcher();
+            TreeMap<Long,String> orderedChildren;
+            try{
+                orderedChildren = orderedChildren(childWatcher);
+            }catch(KeeperException.NoNodeException e){
+                createQueueDir();
+                continue;
+            }
+            if(orderedChildren.isEmpty()){
+                childWatcher.await();
+                continue;
+            }
+
+            for(String headNode : orderedChildren.values()){
+                String path = dir +"/"+headNode;
+                try{
+                    byte[] data = zookeeper.getData(path, false, null);
+                    zookeeper.delete(path, -1);
+                    return data;
+                }catch(KeeperException.NoNodeException e){
+                    // Another client deleted the node first.
+                }
+            }
+        }
+    }
+
+    /**
+     * Inserts data into queue. The queue directory is created on demand.
+     * @param data
+     * @return true if data was successfully added
+     */
+    public boolean offer(byte[] data) throws KeeperException, InterruptedException{
+        for(;;){
+            try{
+                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+                return true;
+            }catch(KeeperException.NoNodeException e){
+                createQueueDir();
+            }
+        }
+    }
+
+    /**
+     * Returns the data at the first element of the queue, or null if the queue is empty.
+     * @return data at the first element of the queue, or null.
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] peek() throws KeeperException, InterruptedException{
+        try{
+            return element();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+
+    /**
+     * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.
+     * @return Head of the queue or null.
+     * @throws KeeperException
+     * @throws InterruptedException
+     */
+    public byte[] poll() throws KeeperException, InterruptedException {
+        try{
+            return remove();
+        }catch(NoSuchElementException e){
+            return null;
+        }
+    }
+
+}
Added: hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java?rev=835560&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java (added)
+++ hadoop/zookeeper/trunk/src/recipes/queue/test/org/apache/zookeeper/recipes/queue/DistributedQueueTest.java Thu Nov 12 21:27:47 2009
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.recipes.queue;
+
+import java.util.Calendar;
+import java.util.NoSuchElementException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.recipes.queue.DistributedQueue;
+import org.apache.zookeeper.test.ClientBase;
+
+
+
+public class DistributedQueueTest extends ClientBase {
+ @Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ LOG.info("FINISHED " + getName());
+ }
+
+
+
+ public void testOffer1() throws Exception {
+ String dir = "/testOffer1";
+ String testString = "Hello World";
+ final int num_clients = 1;
+ ZooKeeper clients[] = new ZooKeeper[num_clients];
+ DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ queueHandles[0].offer(testString.getBytes());
+
+ byte dequeuedBytes[] = queueHandles[0].remove();
+ assertEquals(new String(dequeuedBytes), testString);
+ }
+
+ public void testOffer2() throws Exception {
+ String dir = "/testOffer2";
+ String testString = "Hello World";
+ final int num_clients = 2;
+ ZooKeeper clients[] = new ZooKeeper[num_clients];
+ DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ queueHandles[0].offer(testString.getBytes());
+
+ byte dequeuedBytes[] = queueHandles[1].remove();
+ assertEquals(new String(dequeuedBytes), testString);
+ }
+
+ public void testTake1() throws Exception {
+ String dir = "/testTake1";
+ String testString = "Hello World";
+ final int num_clients = 1;
+ ZooKeeper clients[] = new ZooKeeper[num_clients];
+ DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ queueHandles[0].offer(testString.getBytes());
+
+ byte dequeuedBytes[] = queueHandles[0].take();
+ assertEquals(new String(dequeuedBytes), testString);
+ }
+
+
+
+ public void testRemove1() throws Exception{
+ String dir = "/testRemove1";
+ String testString = "Hello World";
+ final int num_clients = 1;
+ ZooKeeper clients[] = new ZooKeeper[num_clients];
+ DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ try{
+ queueHandles[0].remove();
+ }catch(NoSuchElementException e){
+ return;
+ }
+ assertTrue(false);
+ }
+
+ public void createNremoveMtest(String dir,int n,int m) throws Exception{
+ String testString = "Hello World";
+ final int num_clients = 2;
+ ZooKeeper clients[] = new ZooKeeper[num_clients];
+ DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ for(int i=0; i< n; i++){
+ String offerString = testString + i;
+ queueHandles[0].offer(offerString.getBytes());
+ }
+
+ byte data[] = null;
+ for(int i=0; i<m; i++){
+ data=queueHandles[1].remove();
+ }
+ assertEquals(new String(data), testString+(m-1));
+ }
+
+ public void testRemove2() throws Exception{
+ createNremoveMtest("/testRemove2",10,2);
+ }
+ public void testRemove3() throws Exception{
+ createNremoveMtest("/testRemove3",1000,1000);
+ }
+
+ public void createNremoveMelementTest(String dir,int n,int m) throws Exception{
+ String testString = "Hello World";
+ final int num_clients = 2;
+ ZooKeeper clients[] = new ZooKeeper[num_clients];
+ DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ for(int i=0; i< n; i++){
+ String offerString = testString + i;
+ queueHandles[0].offer(offerString.getBytes());
+ }
+
+ byte data[] = null;
+ for(int i=0; i<m; i++){
+ data=queueHandles[1].remove();
+ }
+ assertEquals(new String(queueHandles[1].element()), testString+m);
+ }
+
+ public void testElement1() throws Exception {
+ createNremoveMelementTest("/testElement1",1,0);
+ }
+
+ public void testElement2() throws Exception {
+ createNremoveMelementTest("/testElement2",10,2);
+ }
+
+ public void testElement3() throws Exception {
+ createNremoveMelementTest("/testElement3",1000,500);
+ }
+
+ public void testElement4() throws Exception {
+ createNremoveMelementTest("/testElement4",1000,1000-1);
+ }
+
+ public void testTakeWait1() throws Exception{
+ String dir = "/testTakeWait1";
+ final String testString = "Hello World";
+ final int num_clients = 1;
+ final ZooKeeper clients[] = new ZooKeeper[num_clients];
+ final DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+
+ final byte[] takeResult[] = new byte[1][];
+ Thread takeThread = new Thread(){
+ public void run(){
+ try{
+ takeResult[0] = queueHandles[0].take();
+ }catch(KeeperException e){
+
+ }catch(InterruptedException e){
+
+ }
+ }
+ };
+ takeThread.start();
+
+ Thread.sleep(1000);
+ Thread offerThread= new Thread() {
+ public void run(){
+ try {
+ queueHandles[0].offer(testString.getBytes());
+ } catch (KeeperException e) {
+
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+ offerThread.start();
+ offerThread.join();
+
+ takeThread.join();
+
+ assertTrue(takeResult[0] != null);
+ assertEquals(new String(takeResult[0]), testString);
+ }
+
+ public void testTakeWait2() throws Exception{
+ String dir = "/testTakeWait2";
+ final String testString = "Hello World";
+ final int num_clients = 1;
+ final ZooKeeper clients[] = new ZooKeeper[num_clients];
+ final DistributedQueue queueHandles[] = new DistributedQueue[num_clients];
+ for(int i=0; i < clients.length; i++){
+ clients[i] = createClient();
+ queueHandles[i] = new DistributedQueue(clients[i], dir, null);
+ }
+ int num_attempts =2;
+ for(int i=0; i< num_attempts; i++){
+ final byte[] takeResult[] = new byte[1][];
+ final String threadTestString = testString + i;
+ Thread takeThread = new Thread(){
+ public void run(){
+ try{
+ takeResult[0] = queueHandles[0].take();
+ }catch(KeeperException e){
+
+ }catch(InterruptedException e){
+
+ }
+ }
+ };
+ takeThread.start();
+
+ Thread.sleep(1000);
+ Thread offerThread= new Thread() {
+ public void run(){
+ try {
+ queueHandles[0].offer(threadTestString.getBytes());
+ } catch (KeeperException e) {
+
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+ offerThread.start();
+ offerThread.join();
+
+ takeThread.join();
+
+ assertTrue(takeResult[0] != null);
+ assertEquals(new String(takeResult[0]), threadTestString);
+ }
+ }
+}
+
|