Author: phunt
Date: Sat Jan 31 01:04:12 2009
New Revision: 739475
URL: http://svn.apache.org/viewvc?rev=739475&view=rev
Log:
ZOOKEEPER-215. expand system test environment
Added:
hadoop/zookeeper/trunk/src/java/systest/
hadoop/zookeeper/trunk/src/java/systest/README.txt (with props)
hadoop/zookeeper/trunk/src/java/systest/org/
hadoop/zookeeper/trunk/src/java/systest/org/apache/
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/DuplicateNameException.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/Instance.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAssignmentException.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAvailableContainers.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java (with props)
hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java (with props)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/build.xml
hadoop/zookeeper/trunk/src/contrib/fatjar/build.xml
hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=739475&r1=739474&r2=739475&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Sat Jan 31 01:04:12 2009
@@ -136,6 +136,8 @@
ZOOKEEPER-260. document the recommended values for server id's
(mahadev via phunt)
+ ZOOKEEPER-215. expand system test environment (breed via phunt)
+
NEW FEATURES:
ZOOKEEPER-276. Bookkeeper contribution (Flavio and Luca Telloli via mahadev)
Modified: hadoop/zookeeper/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/build.xml?rev=739475&r1=739474&r2=739475&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/build.xml (original)
+++ hadoop/zookeeper/trunk/build.xml Sat Jan 31 01:04:12 2009
@@ -51,6 +51,7 @@
<property name="build.testclasses" value="${build.dir}/testclasses"/>
<property name="test.build.dir" value="${build.dir}/test" />
<property name="test.src.dir" value="${src.dir}/java/test"/>
+ <property name="systest.src.dir" value="${src.dir}/java/systest"/>
<property name="test.log.dir" value="${test.build.dir}/logs" />
<property name="test.data.dir" value="${test.build.dir}/data" />
<property name="test.data.upgrade.dir" value="${test.data.dir}/upgrade" />
@@ -252,6 +253,10 @@
target="${target.jdk}" debug="on">
<classpath refid="test.classpath"/>
</javac>
+ <javac srcdir="${systest.src.dir}" destdir="${build.testclasses}"
+ target="${target.jdk}" debug="on">
+ <classpath refid="test.classpath"/>
+ </javac>
</target>
<!-- ====================================================== -->
Modified: hadoop/zookeeper/trunk/src/contrib/fatjar/build.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/fatjar/build.xml?rev=739475&r1=739474&r2=739475&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/fatjar/build.xml (original)
+++ hadoop/zookeeper/trunk/src/contrib/fatjar/build.xml Sat Jan 31 01:04:12 2009
@@ -43,7 +43,6 @@
<!-- Override jar target to specify main class -->
<target name="jar" depends="checkMainCompiled, setjarname, compile">
<echo message="contrib: ${name}"/>
- <echo message="build classes: ${build.test}" />
<jar jarfile="${jarname}">
<manifest>
<attribute name="Main-Class" value="org.apache.zookeeper.util.FatJarMain" />
@@ -54,14 +53,14 @@
<attribute name="Implementation-Version" value="${revision}"/>
<attribute name="Implementation-Vendor" value="The Apache Software Foundation"/>
</manifest>
- <fileset file="${zk.root}/LICENSE.txt" />
- <!--fileset dir="${zk.root}/build/classes" excludes="**/.generated"/-->
- <!--fileset dir="${zk.root}/build/test"/-->
- <zipgroupfileset dir="${zk.root}/src/java/lib" includes="*.jar" />
+ <fileset file="${basedir}/conf/mainClasses" />
<fileset dir="${build.classes}"/>
<fileset dir="${build.test}"/>
+ <fileset file="${zk.root}/LICENSE.txt" />
<fileset file="${zk.root}/conf/log4j.properties" />
- <fileset file="${basedir}/conf/mainClasses" />
+ <fileset dir="${zk.root}/build/classes" excludes="**/.generated"/>
+ <fileset dir="${zk.root}/build/testclasses"/>
+ <zipgroupfileset dir="${zk.root}/src/java/lib" includes="*.jar" />
</jar>
</target>
Modified: hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses?rev=739475&r1=739474&r2=739475&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses (original)
+++ hadoop/zookeeper/trunk/src/contrib/fatjar/conf/mainClasses Sat Jan 31 01:04:12 2009
@@ -6,3 +6,5 @@
generateLoad:org.apache.zookeeper.test.GenerateLoad:A distributed load generator for testing
quorumBench:org.apache.zookeeper.server.QuorumBenchmark:A benchmark of just the quorum protocol
abBench:org.apache.zookeeper.server.quorum.AtomicBroadcastBenchmark:A benchmark of just the atomic broadcast
+ic:org.apache.zookeeper.test.system.InstanceContainer:A container that will instantiate classes as directed by an instance manager
+systest:org.apache.zookeeper.test.system.BaseSysTest:Start system test
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=739475&r1=739474&r2=739475&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Sat Jan 31 01:04:12 2009
@@ -530,6 +530,7 @@
connectTimeout = sessionTimeout / serverAddrs.size();
sessionId = conRsp.getSessionId();
sessionPasswd = conRsp.getPasswd();
+ zooKeeper.state = States.CONNECTED;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null));
}
@@ -750,7 +751,6 @@
synchronized (this) {
k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
- zooKeeper.state = States.CONNECTED;
}
private void sendPing() {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=739475&r1=739474&r2=739475&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Sat Jan 31 01:04:12 2009
@@ -205,6 +205,11 @@
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception during shutdown", e);
}
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.warn("Selector closing", e);
+ }
if (zks != null) {
zks.shutdown();
}
Added: hadoop/zookeeper/trunk/src/java/systest/README.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/README.txt?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/README.txt (added)
+++ hadoop/zookeeper/trunk/src/java/systest/README.txt Sat Jan 31 01:04:12 2009
@@ -0,0 +1,42 @@
+To run the system test we need to create processing containers that we can
+spawn tasks, called Instances, in. (how is that for a dangling preposition!?!)
+Start up InstanceContainers first. Then run the system test. The system test
+finds the InstanceContainers and directs them through ZooKeeper, so you are
+going to need an instance of ZooKeeper running that they can all access.
+The easiest way to do all of this is to use the zookeeper fat jar.
+
+Steps to run system test
+------------------------
+1) transfer the fatjar from the release directory to all systems
+ participating in the test. fatjar is in contrib/fatjar directory.
+
+ (developers can generate it by running the "ant jar compile-test"
+ targets in trunk, then running "ant jar" in src/contrib/fatjar)
+
+2) run a zookeeper standalone instance (cluster is ok too)
+
+e.g. java -jar zookeeper-<version>-fatjar.jar server <zoo.cfg>
+
+Note: you must provide zoo.cfg, see sample in conf directory
+
+3) on each host start the system test container
+
+e.g. java -jar zookeeper-<version>-fatjar.jar ic <name> <zkHostPort> /sysTest
+
+name : name of the test container - must be unique
+ typically it's a good idea to name after the host to aid debugging
+
+zkHostPort : the host:port of the server from step 2)
+
+4) initiate the system test using the fatjar:
+
+java -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar systest org.apache.zookeeper.test.system.SimpleSysTest
+
+by default it will access the zk server started in 2) on localhost:2181
+
+or you can specify a remote host:port using
+ -DsysTest.zkHostPort=<host>:<port>,<host>:<port>,...
+
+java -DsysTest.zkHostPort=hostA:2181 -jar build/contrib/fatjar/zookeeper-dev-fatjar.jar systest org.apache.zookeeper.test.system.SimpleSysTest
+
+where hostA is running the zk server started in step 2) above
Propchange: hadoop/zookeeper/trunk/src/java/systest/README.txt
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.security.MessageDigest;
+import java.util.HashMap;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumStats;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.junit.runner.JUnitCore;
+
+/**
+ * Base class for ZooKeeper system tests.
+ * <p>
+ * It configures, starts and stops a set of servers and clients. Unless the
+ * system property <code>baseSysTest.fakeMachines</code> is "yes", the work is
+ * delegated to an {@link InstanceManager} connected to the ZooKeeper service
+ * named by <code>sysTest.zkHostPort</code>; otherwise QuorumPeers and client
+ * Instances are created directly inside this JVM.
+ */
+public class BaseSysTest extends TestCase {
+    private static int fakeBasePort = 33222;
+    private static String zkHostPort;
+    protected String prefix = "/sysTest";
+    ZooKeeper zk;
+    static {
+        // Only resolve the local host name when no explicit host:port was
+        // supplied, and never leave zkHostPort null if resolution fails.
+        zkHostPort = System.getProperty("sysTest.zkHostPort");
+        if (zkHostPort == null) {
+            try {
+                zkHostPort = InetAddress.getLocalHost().getCanonicalHostName() + ":2181";
+            } catch (UnknownHostException e) {
+                e.printStackTrace();
+                zkHostPort = "localhost:2181";
+            }
+        }
+    }
+    InstanceManager im;
+
+    /**
+     * Connects to ZooKeeper and creates the InstanceManager. Nothing is
+     * created when fake machines are in use.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        if (!fakeMachines) {
+            zk = new ZooKeeper(zkHostPort, 15000, new Watcher() {public void process(WatchedEvent e){}});
+            im = new InstanceManager(zk, prefix);
+        }
+    }
+
+    /**
+     * Closes the InstanceManager if setUp() created one.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        // im stays null in fake-machine mode (and if setUp() failed early)
+        if (im != null) {
+            im.close();
+        }
+    }
+
+    int serverCount = defaultServerCount;
+    int clientCount = defaultClientCount;
+    static int defaultServerCount = 5;
+    static int defaultClientCount = 7;
+    static {
+        defaultServerCount = Integer.parseInt(System.getProperty("simpleSysTest.defaultServerCount", Integer.toString(defaultServerCount)));
+        defaultClientCount = Integer.parseInt(System.getProperty("simpleSysTest.defaultClientCount", Integer.toString(defaultClientCount)));
+    }
+
+    String serverHostPort;
+    String quorumHostPort;
+
+    /** @return the comma separated client host:port list built by configureServers() */
+    public String getHostPort() {
+        return serverHostPort;
+    }
+    /** @return the number of servers this test is configured for */
+    public int getServerCount() {
+        return serverCount;
+    }
+    /** @return the number of clients this test is configured for */
+    public int getClientCount() {
+        return clientCount;
+    }
+
+    /** Starts servers 0 to serverCount-1. */
+    public void startServers() throws IOException {
+        for (int i = 0; i < serverCount; i++) {
+            startServer(i);
+        }
+    }
+    /** Stops servers 0 to serverCount-1. */
+    public void stopServers() throws IOException {
+        for (int i = 0; i < serverCount; i++) {
+            stopServer(i);
+        }
+    }
+    /** Starts clients 0 to clientCount-1. */
+    public void startClients() throws IOException {
+        for (int i = 0; i < clientCount; i++) {
+            startClient(i);
+        }
+    }
+    /** Stops clients 0 to clientCount-1. */
+    public void stopClients() throws IOException {
+        for (int i = 0; i < clientCount; i++) {
+            stopClient(i);
+        }
+    }
+
+    private static boolean fakeMachines = System.getProperty("baseSysTest.fakeMachines", "no").equals("yes");
+
+    /**
+     * Wraps an exception in an IOException without losing the original as
+     * the cause.
+     */
+    private static IOException toIOException(String message, Exception cause) {
+        IOException ioe = new IOException(message);
+        ioe.initCause(cause);
+        return ioe;
+    }
+
+    /**
+     * Prepares count servers, either locally or through the InstanceManager.
+     *
+     * @param count the number of servers; also becomes the server count
+     */
+    public void configureServers(int count) throws Exception {
+        serverCount = count;
+        if (fakeMachines) {
+            fakeConfigureServers(count);
+        } else {
+            distributedConfigureServers(count);
+        }
+    }
+
+    /**
+     * Creates count servers through QuorumPeerInstance and records the
+     * client and quorum address lists they report.
+     */
+    private void distributedConfigureServers(int count) throws IOException {
+        StringBuilder sbClient = new StringBuilder();
+        StringBuilder sbServer = new StringBuilder();
+        try {
+            for (int i = 0; i < count; i++) {
+                String r = QuorumPeerInstance.createServer(im, i);
+                if (i > 0) {
+                    sbClient.append(',');
+                    sbServer.append(',');
+                }
+                // the part before the comma goes to the client list, the
+                // part after it to the quorum list
+                String[] parts = r.split(",");
+                sbClient.append(parts[0]);
+                sbServer.append(parts[1]);
+            }
+            serverHostPort = sbClient.toString();
+            quorumHostPort = sbServer.toString();
+        } catch (Exception e) {
+            throw toIOException(e.getMessage(), e);
+        }
+    }
+
+    private QuorumPeer qps[];
+    private File qpsDirs[];
+    HashMap<Long,QuorumServer> peers;
+
+    /**
+     * Lays out count local quorum peers on 127.0.0.1: peer ids start at 1,
+     * quorum ports at fakeBasePort+1 and client ports at fakeBasePort+10.
+     */
+    private void fakeConfigureServers(int count) throws IOException {
+        peers = new HashMap<Long,QuorumServer>();
+        qps = new QuorumPeer[count];
+        qpsDirs = new File[count];
+        for (int i = 1; i <= count; i++) {
+            peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress("127.0.0.1", fakeBasePort + i)));
+        }
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < count; i++) {
+            // createTempFile only reserves a unique name; swap the file for
+            // a directory of the same name
+            File dir = File.createTempFile("sysTest", ".tmp");
+            if (!dir.delete() || !dir.mkdir()) {
+                throw new IOException("Could not create data directory " + dir);
+            }
+            qpsDirs[i] = dir;
+            int port = fakeBasePort + 10 + i;
+            if (sb.length() > 0) {
+                sb.append(',');
+            }
+            sb.append("localhost:");
+            sb.append(port);
+        }
+        serverHostPort = sb.toString();
+    }
+    final static int tickTime = 2000;
+    final static int initLimit = 3;
+    final static int syncLimit = 3;
+
+    /**
+     * Starts the server with the given index.
+     *
+     * @param index zero based server index
+     * @throws IOException if the server could not be started
+     */
+    public void startServer(int index) throws IOException {
+        if (fakeMachines) {
+            int port = fakeBasePort + 10 + index;
+            qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 0, index+1, tickTime, initLimit, syncLimit);
+            qps[index].start();
+        } else {
+            try {
+                QuorumPeerInstance.startInstance(im, quorumHostPort, index);
+            } catch (Exception e) {
+                throw toIOException(e.getClass().getName() + ": " + e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Stops the server with the given index.
+     *
+     * @param index zero based server index
+     * @throws IOException if the server could not be stopped
+     */
+    public void stopServer(int index) throws IOException {
+        if (fakeMachines) {
+            // a fake server that was never started has nothing to shut down
+            if (qps[index] != null) {
+                qps[index].shutdown();
+            }
+        } else {
+            try {
+                QuorumPeerInstance.stopInstance(im, index);
+            } catch (Exception e) {
+                throw toIOException(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Prepares count clients of the given class.
+     *
+     * @param count the number of clients; also becomes the client count
+     * @param clazz the Instance implementation to run
+     * @param params parameters passed to each client after its index
+     */
+    public void configureClients(int count, Class<? extends Instance> clazz, String params) throws Exception {
+        clientCount = count;
+        if (fakeMachines) {
+            fakeConfigureClients(count, clazz, params);
+        } else {
+            distributedConfigureClients(count, clazz, params);
+        }
+    }
+    private Class<? extends Instance> clazz;
+    String params;
+
+    /** Remembers the class and parameters for distributedStartClient(). */
+    private void distributedConfigureClients(int count, Class<? extends Instance> clazz, String params) throws IOException {
+        this.clazz = clazz;
+        this.params = params;
+    }
+    private Instance fakeBaseClients[];
+
+    /**
+     * Instantiates and configures count local clients.
+     *
+     * @throws IOException if a client cannot be instantiated; no partially
+     * filled client array is left behind in that case
+     */
+    private void fakeConfigureClients(int count, Class<? extends Instance> clazz, String params) throws IOException, ClassNotFoundException {
+        Instance[] clients = new Instance[count];
+        for (int i = 0; i < count; i++) {
+            try {
+                clients[i] = clazz.newInstance();
+            } catch (InstantiationException e) {
+                throw toIOException("Could not instantiate " + clazz.getName() + ": " + e.getMessage(), e);
+            } catch (IllegalAccessException e) {
+                throw toIOException("Could not instantiate " + clazz.getName() + ": " + e.getMessage(), e);
+            }
+            clients[i].configure(i + " " + params);
+        }
+        fakeBaseClients = clients;
+    }
+
+    /**
+     * Starts the client with the given index.
+     *
+     * @param index zero based client index
+     */
+    public void startClient(int index) throws IOException {
+        if (fakeMachines) {
+            fakeStartClient(index);
+        } else {
+            distributedStartClient(index);
+        }
+    }
+    /** Asks the InstanceManager to schedule "client" + index. */
+    private void distributedStartClient(int index) throws IOException {
+        try {
+            im.assignInstance("client" + index, clazz, index + " " + params, 1);
+        } catch (Exception e) {
+            throw toIOException(e.getMessage(), e);
+        }
+    }
+    private void fakeStartClient(int index) {
+        fakeBaseClients[index].start();
+    }
+
+    /**
+     * Stops the client with the given index.
+     *
+     * @param index zero based client index
+     */
+    public void stopClient(int index) throws IOException {
+        if (fakeMachines) {
+            fakeStopClient(index);
+        } else {
+            distributedStopClient(index);
+        }
+    }
+    /** Asks the InstanceManager to remove "client" + index. */
+    private void distributedStopClient(int index) throws IOException {
+        try {
+            im.removeInstance("client"+index);
+        } catch (Exception e) {
+            throw toIOException(e.getMessage(), e);
+        }
+    }
+    private void fakeStopClient(int index) {
+        fakeBaseClients[index].stop();
+    }
+
+    /** Hands the command line to JUnitCore. */
+    static public void main(String args[]) {
+        JUnitCore.main(args);
+    }
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/BaseSysTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/DuplicateNameException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/DuplicateNameException.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/DuplicateNameException.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/DuplicateNameException.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+/**
+ * Checked exception that carries only a detail message. Judging by its
+ * name it reports a name that is already taken; the throwing code is not
+ * part of this file.
+ */
+public class DuplicateNameException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param mess the detail message, later available from getMessage()
+     */
+    public DuplicateNameException(final String mess) {
+        super(mess);
+    }
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/DuplicateNameException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/Instance.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/Instance.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/Instance.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/Instance.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This interface is implemented by a class that can be run in an
+ * instance container. The container calls setReporter(), configure() and
+ * start() in that order on a new instance, and stop() once the instance's
+ * assignment has been removed.
+ */
+public interface Instance {
+ /**
+ * This object is used to report back changes in status.
+ */
+ interface Reporter {
+ /**
+ * Publishes a status report. The InstanceContainer implementation
+ * stores the text in a znode under its reports node.
+ *
+ * @param report the status text to publish
+ * @throws KeeperException if the report could not be written
+ * @throws InterruptedException if the calling thread is interrupted
+ */
+ void report(String report) throws KeeperException, InterruptedException;
+ }
+ /**
+ * This will be the first method invoked by the InstanceContainer after
+ * an instance of this interface has been constructed. It will only be
+ * invoked once.
+ *
+ * @param r a handle to use to report on status changes.
+ */
+ void setReporter(Reporter r);
+ /**
+ * This will be the second method invoked by the InstanceContainer. It
+ * may be invoked again if the configuration changes.
+ *
+ * @param params parameters that were passed to the InstanceManager when
+ * this instance was scheduled. The InstanceContainer passes null when
+ * the assignment consists of a class name only.
+ */
+ void configure(String params);
+ /**
+ * Starts this instance. The InstanceContainer invokes this right after
+ * the first call to configure().
+ */
+ void start();
+ /**
+ * Stops this instance. The InstanceContainer invokes this when the
+ * instance no longer appears among its assignments.
+ */
+ void stop();
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/Instance.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.system.Instance.Reporter;
+
+/**
+ * This class starts up, registers itself in ZooKeeper under the given znode
+ * prefix, and then creates, configures, starts and stops {@link Instance}s
+ * according to the children of its assignments znode.
+ * @author breed
+ *
+ */
+public class InstanceContainer implements Watcher, AsyncCallback.ChildrenCallback {
+    /**
+     * Watches one assignment znode and re-reads it whenever ZooKeeper
+     * reports an event for it.
+     */
+    private final class MyWatcher implements Watcher {
+        String myNode;
+        DataCallback dc;
+        MyWatcher(String myNode, DataCallback dc) {
+            this.myNode = myNode;
+            this.dc = dc;
+        }
+        @Override
+        public void process(WatchedEvent event) {
+            if (event.getPath() != null && event.getPath().equals(myNode)) {
+                zk.getData(myNode, this, dc, this);
+            }
+        }
+    }
+
+    /**
+     * Receives the data of one assignment znode and reconfigures the
+     * instance when the znode's version has changed.
+     */
+    private final class MyDataCallback implements DataCallback {
+        int lastVer;
+        String myNode;
+        Instance myInstance;
+
+        MyDataCallback(String myNode, Instance myInstance, int ver) {
+            this.myNode = myNode;
+            this.myInstance = myInstance;
+            lastVer = ver;
+        }
+        @Override
+        public void processResult(int rc, String path,
+                Object ctx, byte[] data, Stat stat) {
+            if (rc == KeeperException.Code.NONODE.intValue()) {
+                // we can just ignore because the child watcher takes care of this
+                return;
+            }
+            if (rc != KeeperException.Code.OK.intValue()) {
+                // the read failed, so data and stat cannot be used; ask
+                // again and wait for the next callback
+                zk.getData(myNode, (Watcher)ctx, this, ctx);
+                return;
+            }
+            int currVer = stat.getVersion();
+            if (currVer == lastVer) {
+                return;
+            }
+            // the data is "<class> <params>"; an assignment without
+            // parameters is configured with null, as on first start
+            String conf = null;
+            if (data != null) {
+                String[] parts = new String(data).split(" ", 2);
+                if (parts.length > 1) {
+                    conf = parts[1];
+                }
+            }
+            myInstance.configure(conf);
+            lastVer = currVer;
+        }
+    }
+
+    /**
+     * Reporter handed to each instance; it stores reports in a znode named
+     * after the assignment under the reports node.
+     */
+    private final class MyReporter implements Reporter {
+        String myReportNode;
+
+        public MyReporter(String child) {
+            myReportNode = reportsNode + '/' + child;
+        }
+
+        /**
+         * Writes the report, creating the ephemeral report znode if it does
+         * not exist yet. The write is tried up to maxTries times.
+         *
+         * @throws KeeperException if the write fails, including when every
+         * attempt ended in a connection loss
+         */
+        @Override
+        public void report(String report) throws KeeperException, InterruptedException {
+            ConnectionLossException lastLoss = null;
+            for (int j = 0; j < maxTries; j++) {
+                try {
+                    try {
+                        zk.setData(myReportNode, report.getBytes(), -1);
+                    } catch (NoNodeException e) {
+                        zk.create(myReportNode, report.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+                    }
+                    return;
+                } catch (ConnectionLossException e) {
+                    lastLoss = e;
+                }
+            }
+            // never pretend the report was delivered
+            if (lastLoss != null) {
+                throw lastLoss;
+            }
+        }
+    }
+    private static final Logger LOG = Logger.getLogger(InstanceContainer.class);
+    String name;
+    String zkHostPort;
+    // We only run if the readyNode exists
+    // NOTE(review): no readyNode check is visible in this class - confirm
+    String prefixNode;
+    String statusNode = "available";
+    String reportsNode = "reports";
+    String assignmentsNode = "assignments";
+    ZooKeeper zk;
+    static final int sessTimeout = 5000;
+    static final int maxTries = 3;
+
+    /**
+     * @param name the container name; the empty string or "hostname" selects
+     * the canonical name of the local host
+     * @param zkHostPort the ZooKeeper connect string
+     * @param prefix the znode under which the available, reports and
+     * assignments nodes live
+     * @throws UnknownHostException if the local host name is needed but
+     * cannot be resolved
+     */
+    public InstanceContainer(String name, String zkHostPort, String prefix) throws UnknownHostException {
+        if (name.length() == 0 || name.equals("hostname")) {
+            name = InetAddress.getLocalHost().getCanonicalHostName();
+        }
+        this.name = name;
+        this.zkHostPort = zkHostPort;
+        this.prefixNode = prefix;
+        this.statusNode = prefix + '/' + this.statusNode + '/' + name;
+        this.reportsNode = prefix + '/' + this.reportsNode;
+        this.assignmentsNode = prefix + '/' + this.assignmentsNode + '/' + name;
+    }
+
+    /**
+     * Deletes a znode, trying up to maxTries times. A znode that is already
+     * gone counts as success.
+     */
+    private void rmnod(String path) throws InterruptedException, KeeperException {
+        KeeperException lastException = null;
+        for (int i = 0; i < maxTries; i++) {
+            try {
+                zk.delete(path, -1);
+                return;
+            } catch (KeeperException.NoNodeException e) {
+                // cool this is what we want
+                return;
+            } catch (KeeperException e) {
+                lastException = e;
+            }
+        }
+        if (lastException != null) {
+            throw lastException;
+        }
+    }
+
+    /**
+     * Creates a single znode, trying up to maxTries times. An existing
+     * znode is accepted unless it is an ephemeral owned by another session.
+     *
+     * @throws KeeperException if the znode belongs to another session or
+     * the connection was lost on the final attempts
+     */
+    private void mknod_inner(String path, CreateMode mode) throws KeeperException, InterruptedException {
+        ConnectionLossException lastLoss = null;
+        for (int i = 0; i < maxTries; i++) {
+            try {
+                zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+                return;
+            } catch (NodeExistsException e) {
+                if (mode != CreateMode.EPHEMERAL) {
+                    return;
+                }
+                Stat stat = zk.exists(path, false);
+                if (stat == null) {
+                    // it disappeared in the meantime, so create it again
+                    continue;
+                }
+                if (stat.getEphemeralOwner() != zk.getSessionId()) {
+                    throw e;
+                }
+                return;
+            } catch (ConnectionLossException e) {
+                LOG.warn("Connection lost while creating " + path, e);
+                lastLoss = e;
+            }
+        }
+        // do not let the caller believe a znode exists that was never created
+        if (lastLoss != null) {
+            throw lastLoss;
+        }
+    }
+
+    /**
+     * Creates the znode and any missing parents. Parents are always
+     * persistent; only the last path element uses the requested mode.
+     */
+    private void mknod(String path, CreateMode mode) throws KeeperException, InterruptedException {
+        String[] subpath = path.split("/");
+        StringBuilder sb = new StringBuilder();
+        // We start at 1 because / will create an empty part first
+        for (int i = 1; i < subpath.length; i++) {
+            sb.append("/");
+            sb.append(subpath[i]);
+            boolean last = i == subpath.length - 1;
+            mknod_inner(sb.toString(), last ? mode : CreateMode.PERSISTENT);
+        }
+    }
+
+    /**
+     * Connects to ZooKeeper, creates this container's znodes and starts
+     * watching its assignments.
+     */
+    public void run() throws IOException, InterruptedException, KeeperException {
+        zk = new ZooKeeper(zkHostPort, sessTimeout, this);
+        mknod(assignmentsNode, CreateMode.PERSISTENT);
+        mknod(statusNode, CreateMode.EPHEMERAL);
+        mknod(reportsNode, CreateMode.PERSISTENT);
+        // Now we just start watching the assignments directory
+        zk.getChildren(assignmentsNode, true, this, null);
+    }
+
+    /**
+     * @param args the first parameter is the instance name, the second
+     * is the ZooKeeper spec, and the third is the znode prefix. if the
+     * instance name is the empty string or "hostname", the hostname will
+     * be used.
+     * @throws InterruptedException
+     * @throws IOException
+     * @throws UnknownHostException
+     * @throws KeeperException
+     */
+    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException, KeeperException {
+        if (args.length != 3) {
+            System.err.println("USAGE: " + InstanceContainer.class.getName() + " name zkHostPort znodePrefix");
+            System.exit(2);
+        }
+        new InstanceContainer(args[0], args[1], args[2]).run();
+        // everything else happens in ZooKeeper callbacks
+        while (true) {
+            Thread.sleep(1000);
+        }
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        if (KeeperState.Expired == event.getState()) {
+            // It's all over
+            LOG.fatal("Lost session");
+            System.exit(4);
+        }
+        if (event.getPath() != null && event.getPath().equals(assignmentsNode)) {
+            // children have changed, so read in the new list
+            zk.getChildren(assignmentsNode, true, this, null);
+        }
+    }
+
+    HashMap<String, Instance> instances = new HashMap<String, Instance>();
+
+    /**
+     * Reconciles the running instances with the current children of the
+     * assignments node: new children are started, children that are still
+     * present are kept, and instances whose child is gone are stopped and
+     * their report znode removed.
+     */
+    @Override
+    public void processResult(int rc, String path, Object ctx,
+            List<String> children) {
+        if (rc != KeeperException.Code.OK.intValue()) {
+            // try it again
+            zk.getChildren(assignmentsNode, true, this, null);
+            return;
+        }
+        HashMap<String, Instance> newList = new HashMap<String, Instance>();
+        // check for differences
+        for (String child : children) {
+            Instance existing = instances.remove(child);
+            if (existing != null) {
+                // just move it to the new list
+                newList.put(child, existing);
+                continue;
+            }
+            try {
+                Instance started = startInstance(child);
+                if (started != null) {
+                    newList.put(child, started);
+                }
+            } catch (InterruptedException e) {
+                // keep the interrupt visible and do not lose track of the
+                // instances that were already moved to the new list
+                Thread.currentThread().interrupt();
+                instances.putAll(newList);
+                return;
+            }
+        }
+        // kill anything that was removed for the children
+        for (Map.Entry<String,Instance> removed : instances.entrySet()) {
+            removed.getValue().stop();
+            try {
+                rmnod(reportsNode + '/' + removed.getKey());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } catch (KeeperException e) {
+                LOG.warn("Could not remove report of " + removed.getKey(), e);
+            }
+        }
+        instances = newList;
+    }
+
+    /**
+     * Reads the assignment for child and creates, configures and starts the
+     * instance it names.
+     *
+     * @return the started instance, or null if the assignment disappeared,
+     * has no data, or the instance could not be started
+     */
+    private Instance startInstance(String child) throws InterruptedException {
+        String myNode = assignmentsNode + '/' + child;
+        Stat stat = new Stat();
+        byte[] data = null;
+        // keep trying until the read succeeds or the node is gone
+        while (true) {
+            try {
+                data = zk.getData(myNode, true, stat);
+                break;
+            } catch (NoNodeException e) {
+                // The node doesn't exist anymore, so skip it
+                break;
+            } catch (KeeperException e) {
+                LOG.warn("Could not read " + myNode, e);
+            }
+        }
+        if (data == null) {
+            return null;
+        }
+        // the data is "<class>" or "<class> <params>"
+        String instanceSpec = new String(data);
+        int spaceIndex = instanceSpec.indexOf(' ');
+        String clazz = spaceIndex == -1 ? instanceSpec : instanceSpec.substring(0, spaceIndex);
+        String conf = spaceIndex == -1 ? null : instanceSpec.substring(spaceIndex + 1);
+        Instance instance;
+        try {
+            Class<?> c = Class.forName(clazz);
+            instance = (Instance)c.newInstance();
+            instance.setReporter(new MyReporter(child));
+            instance.configure(conf);
+            instance.start();
+        } catch (Exception e) {
+            LOG.warn("Skipping " + child, e);
+            return null;
+        }
+        try {
+            // follow later changes to the assignment's data
+            DataCallback dc = new MyDataCallback(myNode, instance, stat.getVersion());
+            Watcher watcher = new MyWatcher(myNode, dc);
+            zk.getData(myNode, watcher, dc, watcher);
+        } catch (Exception e) {
+            // the instance is running, so it stays tracked
+            LOG.warn("Skipping " + child, e);
+        }
+        return instance;
+    }
+
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceContainer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * This class doles out assignments to InstanceContainers that are registered to
+ * a ZooKeeper znode. The znode will have four child nodes:
+ * * ready: this znode indicates that the InstanceManager is running
+ * * available: the children of this znode are ephemeral nodes representing
+ * running InstanceContainers
+ * * assignments: there will be a child under this znode for each available
+ * InstanceContainer. those znodes will have a child for each
+ * assigned instance
+ * * reports: there will be a child under this znode for each instance that is
+ * running. it will have the report string from the instance.
+ */
+public class InstanceManager implements AsyncCallback.ChildrenCallback, Watcher {
+ final private static Logger LOG = Logger.getLogger(InstanceManager.class);
+ private ZooKeeper zk;
+ private String prefixNode;
+ private String reportsNode = "reports";
+ private String readyNode = "ready";
+ private String assignmentsNode = "assignments";
+ private String statusNode = "available";
+ private static final int maxTries = 3;
+ /**
+  * Bookkeeping record for one assignment: the name of the instance, the
+  * container it was placed on, and the weight it contributes to that
+  * container's load.
+  */
+ private static final class Assigned {
+     String container;
+     String instance;
+     int weight;
+
+     Assigned(String containerName, String instanceName, int load) {
+         container = containerName;
+         instance = instanceName;
+         weight = load;
+     }
+ }
+ private HashMap<String, HashSet<Assigned>> assignments = new HashMap<String, HashSet<Assigned>>();
+ private HashMap<String, Assigned> instanceToAssignment = new HashMap<String, Assigned>();
+ /**
+  * Creates a manager rooted at <code>prefix</code>. The bookkeeping znodes
+  * are created if they do not exist yet, and the current children of the
+  * status znode are loaded while leaving this object as the watcher.
+  *
+  * @param zk the ZooKeeper handle used for every operation
+  * @param prefix the znode under which the ready, assignments, reports and
+  *               available znodes live
+  * @throws KeeperException if one of the two setup steps keeps losing the
+  *         connection for maxTries attempts, or fails for any other reason
+  * @throws InterruptedException if a ZooKeeper call is interrupted
+  */
+ public InstanceManager(ZooKeeper zk, String prefix) throws KeeperException, InterruptedException {
+     this.zk = zk;
+     this.prefixNode = prefix;
+     this.readyNode = prefix + '/' + this.readyNode;
+     this.assignmentsNode = prefix + '/' + this.assignmentsNode;
+     this.reportsNode = prefix + '/' + this.reportsNode;
+     this.statusNode = prefix + '/' + this.statusNode;
+
+     ConnectionLossException lastException = null;
+     for (int i = 0; i < maxTries; i++) {
+         try {
+             setupNodes(zk);
+             lastException = null;
+             break;
+         } catch (ConnectionLossException e) {
+             lastException = e;
+         }
+     }
+     // A setup that never went through must not be ignored: without the
+     // znodes nothing below can work.
+     if (lastException != null) {
+         throw lastException;
+     }
+
+     for (int i = 0; i < maxTries; i++) {
+         try {
+             List<String> children = zk.getChildren(statusNode, this);
+             processResult(0, statusNode, null, children);
+             lastException = null;
+             break;
+         } catch (ConnectionLossException e) {
+             lastException = e;
+         }
+     }
+     if (lastException != null) {
+         throw lastException;
+     }
+ }
+ private void setupNodes(ZooKeeper zk) throws KeeperException,
+ InterruptedException {
+ try {
+ zk.create(prefixNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch(NodeExistsException e) { /* this is ok */ }
+ try {
+ zk.create(assignmentsNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch(NodeExistsException e) { /* this is ok */ }
+ try {
+ zk.create(statusNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch(NodeExistsException e) { /* this is ok */ }
+ try {
+ zk.create(reportsNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch(NodeExistsException e) { /* this is ok */ }
+ try {
+ zk.create(readyNode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch(NodeExistsException e) { /* this is ok */ }
+ }
+ /**
+  * Receives the current children of the status znode, i.e. the containers
+  * that are available right now. Containers that are still listed keep
+  * their assignments, new ones start with an empty set, and every instance
+  * that was assigned to a container no longer listed is removed.
+  *
+  * @param rc result code of the getChildren call; on anything but OK the
+  *           call is simply issued again
+  * @param path the znode whose children were listed
+  * @param ctx unused
+  * @param children names of the currently available containers
+  */
+ @Override
+ synchronized public void processResult(int rc, String path, Object ctx,
+         List<String> children) {
+     if (rc != KeeperException.Code.OK.intValue()) {
+         zk.getChildren(statusNode, this, this, null);
+         return;
+     }
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Got " + children + " children from " + path);
+     }
+     HashMap<String, HashSet<Assigned>> newAssignments = new HashMap<String, HashSet<Assigned>>();
+     for (String c : children) {
+         HashSet<Assigned> a = assignments.remove(c);
+         if (a != null) {
+             newAssignments.put(c, a);
+         } else {
+             newAssignments.put(c, new HashSet<Assigned>());
+         }
+     }
+     // Clean up the dead machines: whatever is left in assignments belongs
+     // to containers that disappeared. removeInstance() is keyed by
+     // instance name, so walk the instances rather than handing it the
+     // container name.
+     for (HashSet<Assigned> orphaned : assignments.values()) {
+         // Iterate over a snapshot because removeInstance() may edit the set.
+         for (Assigned a : orphaned.toArray(new Assigned[0])) {
+             try {
+                 removeInstance(a.instance);
+             } catch (KeeperException e) {
+                 e.printStackTrace();
+             } catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+             }
+         }
+     }
+     assignments = newAssignments;
+ }
+ private void removeAssignmentNode(String dead) throws KeeperException, InterruptedException {
+ String deadNode = assignmentsNode + '/' + dead;
+ List<String> children = zk.getChildren(deadNode, false);
+ for(String c: children) {
+ zk.delete(deadNode + '/' + c, -1);
+ }
+ try {
+ zk.delete(deadNode, -1);
+ } catch(NoNodeException e) { /* this is ok */ }
+ }
+ /**
+  * Watcher callback. When the event concerns the status znode, its children
+  * are fetched again and the watch is renewed. All other events are ignored.
+  */
+ @Override
+ public void process(WatchedEvent event) {
+     // Compare from the constant side: events that are not tied to a znode
+     // report a null path and must not blow up the event thread.
+     if (statusNode.equals(event.getPath())) {
+         zk.getChildren(statusNode, this, this, null);
+     }
+ }
+ /**
+  * Assigns a new instance to the container that currently carries the
+  * smallest total weight and publishes the assignment as an ephemeral
+  * znode under that container's assignment node.
+  *
+  * @param name unique name of the instance
+  * @param clazz the Instance implementation to run
+  * @param params configuration string handed to the instance
+  * @param weight load contributed by the instance; values below 1 count as 1
+  * @return the name of the container the instance was assigned to
+  * @throws NoAvailableContainers if no container is registered
+  * @throws DuplicateNameException if <code>name</code> is already assigned
+  * @throws InterruptedException if the ZooKeeper call is interrupted
+  * @throws KeeperException if the znode could not be created in maxTries
+  *         attempts
+  */
+ synchronized public String assignInstance(String name, Class<? extends Instance> clazz, String params, int weight) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
+     if (weight < 1) {
+         // if the weights are not above zero, things will get messed up
+         weight = 1;
+     }
+     String instanceSpec = clazz.getName() + ' ' + params;
+     if (instanceToAssignment.get(name) != null) {
+         throw new DuplicateNameException(name + " already exists");
+     }
+     // find most idle node
+     String mostIdle = null;
+     int mostIdleWeight = Integer.MAX_VALUE;
+     for (Entry<String, HashSet<Assigned>> e : assignments.entrySet()) {
+         int w = 0;
+         for (Assigned a : e.getValue()) {
+             w += a.weight;
+         }
+         if (w < mostIdleWeight) {
+             mostIdleWeight = w;
+             mostIdle = e.getKey();
+         }
+     }
+     if (mostIdle == null) {
+         throw new NoAvailableContainers("No available containers");
+     }
+     // Publish the znode first and record the assignment only once that
+     // worked. Otherwise a failed create would leave a phantom entry that
+     // blocks the name and skews the load computation.
+     KeeperException lastException = null;
+     boolean published = false;
+     for (int i = 0; i < maxTries && !published; i++) {
+         try {
+             zk.create(assignmentsNode + '/' + mostIdle + '/' + name, instanceSpec.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+             published = true;
+         } catch (NodeExistsException e) {
+             // an earlier attempt already went through
+             published = true;
+         } catch (KeeperException e) {
+             lastException = e;
+         }
+     }
+     if (!published) {
+         throw lastException;
+     }
+     Assigned a = new Assigned(mostIdle, name, weight);
+     instanceToAssignment.put(name, a);
+     HashSet<Assigned> as = assignments.get(mostIdle);
+     if (as == null) {
+         as = new HashSet<Assigned>();
+         assignments.put(mostIdle, as);
+     }
+     as.add(a);
+     return mostIdle;
+ }
+
+ /**
+  * Sends new parameters to an already assigned instance by overwriting the
+  * data of its assignment znode with "update " followed by the parameters.
+  *
+  * @param name name of the instance to reconfigure
+  * @param params the new configuration string
+  * @throws NoAssignmentException if no instance with that name is assigned
+  * @throws InterruptedException if the ZooKeeper call is interrupted
+  * @throws KeeperException if the update still loses the connection after
+  *         maxTries attempts, or fails for any other reason
+  */
+ public void reconfigureInstance(String name, String params) throws NoAssignmentException, InterruptedException, KeeperException {
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Reconfiguring " + name + " with " + params);
+     }
+     Assigned assigned;
+     // every other access to the map happens under the object's lock
+     synchronized (this) {
+         assigned = instanceToAssignment.get(name);
+     }
+     if (assigned == null) {
+         throw new NoAssignmentException();
+     }
+     KeeperException lastException = null;
+     for (int i = 0; i < maxTries; i++) {
+         try {
+             zk.setData(assignmentsNode + '/' + assigned.container + '/' + name, ("update " + params).getBytes(), -1);
+             // forget failures of earlier attempts, the update went through
+             lastException = null;
+             break;
+         } catch (ConnectionLossException e) {
+             lastException = e;
+         }
+     }
+     if (lastException != null) {
+         throw lastException;
+     }
+ }
+
+ /**
+  * Deletes <code>path</code> regardless of its version, trying up to
+  * maxTries times. A znode that does not exist counts as success. If every
+  * attempt fails, the exception of the last attempt is thrown.
+  */
+ private void doDelete(String path) throws InterruptedException, KeeperException {
+     KeeperException failure = null;
+     int attempt = 0;
+     while (attempt++ < maxTries) {
+         try {
+             zk.delete(path, -1);
+             return;
+         } catch (NoNodeException alreadyGone) {
+             return;
+         } catch (KeeperException e) {
+             failure = e;
+         }
+     }
+     throw failure;
+ }
+ /**
+  * Forgets the assignment of the named instance and deletes its assignment
+  * and report znodes. Unknown names are ignored.
+  *
+  * @param name name of the instance to remove
+  * @throws InterruptedException if a ZooKeeper call is interrupted
+  * @throws KeeperException if one of the znodes could not be deleted
+  */
+ synchronized public void removeInstance(String name) throws InterruptedException, KeeperException {
+     Assigned assigned = instanceToAssignment.remove(name);
+     if (assigned == null) {
+         return;
+     }
+     // The per-container sets hold Assigned objects, so the record itself
+     // has to be removed, not its name. The set may be missing when the
+     // container is no longer in the assignments map.
+     HashSet<Assigned> onContainer = assignments.get(assigned.container);
+     if (onContainer != null) {
+         onContainer.remove(assigned);
+     }
+     doDelete(assignmentsNode + '/' + assigned.container + '/' + name);
+     doDelete(reportsNode + '/' + name);
+ }
+
+ /** Tells whether an instance with the given name is currently assigned. */
+ synchronized boolean isAlive(String name) {
+     // values stored in the map are never null, so key presence is enough
+     return instanceToAssignment.containsKey(name);
+ }
+
+ /**
+  * Clears the status of an instance by deleting its report znode. A report
+  * znode that does not exist is treated as success.
+  *
+  * @param name name of the instance whose report is cleared
+  * @throws InterruptedException if the ZooKeeper call is interrupted
+  * @throws KeeperException if the delete still loses the connection after
+  *         maxTries attempts, or fails for any other reason
+  */
+ public void resetStatus(String name) throws InterruptedException, KeeperException {
+     KeeperException lastException = null;
+     for (int i = 0; i < maxTries; i++) {
+         try {
+             zk.delete(reportsNode + '/' + name, -1);
+             lastException = null;
+             break;
+         } catch (ConnectionLossException e) {
+             lastException = e;
+         } catch (NoNodeException e) {
+             // great this is what we want! Stop retrying and drop any
+             // connection loss remembered from an earlier attempt.
+             lastException = null;
+             break;
+         }
+     }
+     if (lastException != null) {
+         throw lastException;
+     }
+ }
+
+ /**
+  * Reads the report znode of an instance. When the znode does not exist
+  * yet, the call waits for it to show up, bounded by the timeout, and tries
+  * again; at most maxTries attempts are made.
+  *
+  * @param name name of the instance
+  * @param timeout overall time budget in milliseconds
+  * @return the report as a string, or null when the znode carries no data
+  *         or the time budget was used up before the first attempt
+  * @throws KeeperException the failure of the last attempt if none succeeded
+  * @throws InterruptedException if the calling thread is interrupted
+  */
+ public String getStatus(String name, long timeout) throws KeeperException, InterruptedException {
+     Stat stat = new Stat();
+     byte data[] = null;
+     long endTime = System.currentTimeMillis() + timeout;
+     KeeperException lastException = null;
+     for (int i = 0; i < maxTries && endTime > System.currentTimeMillis(); i++) {
+         try {
+             data = zk.getData(reportsNode + '/' + name, false, stat);
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug("Got Data: " + ((data == null) ? "null" : new String(data)));
+             }
+             lastException = null;
+             break;
+         } catch (ConnectionLossException e) {
+             lastException = e;
+         } catch (NoNodeException e) {
+             final Object eventObj = new Object();
+             synchronized (eventObj) {
+                 // wait for the node to appear
+                 Stat eStat = zk.exists(reportsNode + '/' + name, new Watcher() {
+                     @Override
+                     public void process(WatchedEvent event) {
+                         synchronized (eventObj) {
+                             eventObj.notifyAll();
+                         }
+                     }
+                 });
+                 if (eStat == null) {
+                     long remaining = endTime - System.currentTimeMillis();
+                     // wait(0) would block forever and a negative value is
+                     // rejected, so only wait while time is actually left
+                     if (remaining > 0) {
+                         eventObj.wait(remaining);
+                     }
+                 }
+             }
+             lastException = e;
+         }
+     }
+     if (lastException != null) {
+         throw lastException;
+     }
+     return (data == null) ? null : new String(data);
+ }
+ /**
+  * Removes every assigned instance and finally deletes the ready znode.
+  * ZooKeeper failures are printed and do not stop the shutdown.
+  */
+ synchronized public void close() throws InterruptedException {
+     // take a snapshot of the names: removeInstance() edits the map
+     final String[] names = instanceToAssignment.keySet().toArray(new String[0]);
+     for (int idx = 0; idx < names.length; idx++) {
+         try {
+             removeInstance(names[idx]);
+         } catch (KeeperException e) {
+             e.printStackTrace();
+         }
+     }
+     try {
+         doDelete(readyNode);
+     } catch (KeeperException e) {
+         e.printStackTrace();
+     }
+ }
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/InstanceManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAssignmentException.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAssignmentException.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAssignmentException.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAssignmentException.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+/**
+ * Thrown by InstanceManager.reconfigureInstance() when the named instance
+ * has no assignment.
+ */
+public class NoAssignmentException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAssignmentException.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAvailableContainers.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAvailableContainers.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAvailableContainers.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAvailableContainers.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+/**
+ * Thrown by InstanceManager.assignInstance() when there is no container an
+ * instance could be assigned to.
+ */
+public class NoAvailableContainers extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of the failure
+     */
+    public NoAvailableContainers(String message) {
+        super(message);
+    }
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/NoAvailableContainers.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+
+class QuorumPeerInstance implements Instance {
+ final private static Logger LOG = Logger.getLogger(QuorumPeerInstance.class);
+ private static final int syncLimit = 3;
+ private static final int initLimit = 3;
+ private static final int tickTime = 2000;
+ String serverHostPort;
+ int serverId;
+ Reporter r;
+ QuorumPeer peer;
+
+ /** Stores the reporter that configure() uses to publish this peer's reports. */
+ @Override
+ public void setReporter(Reporter r) {
+ this.r = r;
+ }
+
+ InetSocketAddress clientAddr;
+ InetSocketAddress quorumAddr;
+ HashMap<Long, QuorumServer> peers;
+ File dir;
+
+ /**
+  * Creates the instance together with a fresh temporary directory that
+  * later serves as the peer's data and log directory.
+  *
+  * @throws IllegalStateException if the temporary directory cannot be
+  *         created
+  */
+ public QuorumPeerInstance() {
+     try {
+         dir = File.createTempFile("test", ".dir");
+     } catch (IOException e) {
+         // carrying on would only fail with an anonymous
+         // NullPointerException on the next line
+         throw new IllegalStateException("Unable to create a temporary directory", e);
+     }
+     // createTempFile() hands back a regular file; replace it with a
+     // directory of the same name
+     dir.delete();
+     if (!dir.mkdir()) {
+         throw new IllegalStateException("Unable to create directory " + dir);
+     }
+ }
+
+ @Override
+ public void configure(String params) {
+ if (clientAddr == null) {
+ // The first time we are configured, it is just to tell
+ // us which machine we are
+ serverId = Integer.parseInt(params);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Setting up server " + serverId);
+ }
+ // Let's grab two ports
+ try {
+ ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost());
+ clientAddr = (InetSocketAddress) ss.getLocalSocketAddress();
+ ss.close();
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ ServerSocket ss = new ServerSocket(0, 1, InetAddress.getLocalHost());
+ quorumAddr = (InetSocketAddress) ss.getLocalSocketAddress();
+ ss.close();
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ String report = clientAddr.getHostName() + ':' + clientAddr.getPort() +
+ ',' + quorumAddr.getHostName() + ':' + quorumAddr.getPort();
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reporting " + report);
+ }
+ r.report(report);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return;
+ } else {
+ int spaceIndex = params.indexOf(' ');
+ if (spaceIndex == -1) {
+ LOG.warn("looking for host:port,... start|stop, but found " + params);
+ return;
+ }
+ String quorumSpecs = params.substring(0, spaceIndex);
+ String cmd = params.substring(spaceIndex+1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running command: " + cmd);
+ }
+ if (!cmd.equals("start")) {
+ if (peer != null) {
+ peer.shutdown();
+ }
+ peer = null;
+ try {
+ for(int i = 0; i < 5; i++) {
+ Thread.sleep(500);
+ try {
+ // Wait until we can't connect
+ new Socket("127.0.0.1", clientAddr.getPort()).close();
+ } catch(IOException e) { break; }
+ }
+ r.report("stopped");
+ } catch (Exception e) {
+ LOG.error("Unhandled error", e);
+ }
+ return;
+ }
+ String parts[] = quorumSpecs.split(",");
+ peers = new HashMap<Long,QuorumServer>();
+ for(int i = 0; i < parts.length; i++) {
+ String subparts[] = parts[i].split(":");
+ peers.put(Long.valueOf(i), new QuorumServer(i, new InetSocketAddress(subparts[0], Integer.parseInt(subparts[1]))));
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting quorumPeer " + serverId + " on port " + clientAddr.getPort());
+ }
+ if (peer != null) {
+ LOG.warn("Peer " + serverId + " already started");
+ return;
+ }
+ peer = new QuorumPeer(peers, dir, dir, clientAddr.getPort(), 0, serverId, tickTime, initLimit, syncLimit);
+ peer.start();
+ for(int i = 0; i < 5; i++) {
+ Thread.sleep(500);
+ try {
+ // Wait until we can connect
+ new Socket("127.0.0.1", clientAddr.getPort()).close();
+ break;
+ } catch(IOException e) {}
+ }
+ r.report("started");
+ } catch (Exception e) {
+ LOG.error("Unhandled exception", e);
+ }
+ }
+ }
+
+ /**
+  * Does nothing: the QuorumPeer is created and started by configure() when
+  * it receives the start command.
+  */
+ @Override
+ public void start() {
+ }
+
+ /**
+  * Deletes a file, or a directory together with everything below it.
+  * Failures to delete individual entries are ignored.
+  *
+  * @param dir the file or directory to remove; null is ignored
+  */
+ static private void recursiveDelete(File dir) {
+     if (dir == null) {
+         return;
+     }
+     if (dir.isDirectory()) {
+         // listFiles() answers null when the directory cannot be read
+         File[] entries = dir.listFiles();
+         if (entries != null) {
+             for (File f : entries) {
+                 recursiveDelete(f);
+             }
+         }
+     }
+     dir.delete();
+ }
+ /**
+  * Shuts down the peer if one is running and removes the instance's
+  * temporary directory.
+  */
+ @Override
+ public void stop() {
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Stopping peer " + serverId);
+     }
+     final QuorumPeer running = peer;
+     if (running != null) {
+         running.shutdown();
+     }
+     recursiveDelete(dir);
+ }
+
+ /**
+  * Assigns a new QuorumPeerInstance named <code>"server" + i</code> with
+  * weight 50 and waits up to three seconds for its first report.
+  *
+  * @param im the InstanceManager that will be managing the new instance
+  * @param i the server number to configure (should be zero based)
+  * @return the first status reported by the new instance
+  * @throws NoAvailableContainers
+  * @throws DuplicateNameException
+  * @throws InterruptedException
+  * @throws KeeperException
+  */
+ public static String createServer(InstanceManager im, int i) throws NoAvailableContainers, DuplicateNameException, InterruptedException, KeeperException {
+     final String serverName = "server" + i;
+     im.assignInstance(serverName, QuorumPeerInstance.class, Integer.toString(i), 50);
+     return im.getStatus(serverName, 3000);
+ }
+
+ /**
+  * Starts an instance of the quorumPeer: clears the previous report, sends
+  * the start command and waits up to five seconds for the new report.
+  *
+  * @param im the manager of the instance
+  * @param quorumHostPort the comma-separated list of host:port pairs of quorum peers
+  * @param index the zero based index of the server to start.
+  * @throws InterruptedException
+  * @throws KeeperException
+  * @throws NoAssignmentException
+  */
+ public static void startInstance(InstanceManager im, String quorumHostPort, int index) throws InterruptedException, KeeperException, NoAssignmentException {
+     final String serverName = "server" + index;
+     final String command = quorumHostPort + " start";
+     im.resetStatus(serverName);
+     im.reconfigureInstance(serverName, command);
+     im.getStatus(serverName, 5000);
+ }
+
+ /**
+  * Stops an instance of the quorumPeer: clears the previous report, sends
+  * the stop command and waits up to three seconds for the new report.
+  *
+  * @param im the manager of the instance
+  * @param index the zero based index of the server to stop
+  * @throws InterruptedException
+  * @throws KeeperException
+  * @throws NoAssignmentException
+  */
+ public static void stopInstance(InstanceManager im, int index) throws InterruptedException, KeeperException, NoAssignmentException {
+     final String serverName = "server" + index;
+     final String command = index + " stop";
+     im.resetStatus(serverName);
+     im.reconfigureInstance(serverName, command);
+     im.getStatus(serverName, 3000);
+ }
+
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/QuorumPeerInstance.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * The client that gets spawned for the SimpleSysTest
+ *
+ */
+public class SimpleClient implements Instance, Watcher, AsyncCallback.DataCallback, StringCallback, StatCallback {
+ private static final long serialVersionUID = 1L;
+ String hostPort;
+ ZooKeeper zk;
+ transient int index;
+ transient String myPath;
+ byte data[];
+ boolean createdEphemeral;
+ /**
+  * Reads the client index and the server address from a string of the
+  * form <code>index hostPort</code> and derives this client's znode path.
+  */
+ @Override
+ public void configure(String params) {
+     final String[] fields = params.split(" ");
+     hostPort = fields[1];
+     index = Integer.parseInt(fields[0]);
+     myPath = "/simpleCase/" + index;
+ }
+ /**
+  * Connects to the configured servers, requests the data of /simpleCase
+  * with a watch, and reports that the client is connecting. Failures are
+  * printed, not propagated.
+  */
+ @Override
+ public void start() {
+     try {
+         zk = new ZooKeeper(hostPort, 15000, this);
+         zk.getData("/simpleCase", true, this, null);
+         final String announcement = "Client " + index + " connecting to " + hostPort;
+         r.report(announcement);
+     } catch (Exception e) {
+         e.printStackTrace();
+     }
+ }
+
+ /**
+  * Closes the ZooKeeper handle if one was opened.
+  */
+ @Override
+ public void stop() {
+     if (zk == null) {
+         return;
+     }
+     try {
+         zk.close();
+     } catch (InterruptedException e) {
+         e.printStackTrace();
+         // keep the interrupt visible to whoever runs this thread
+         Thread.currentThread().interrupt();
+     }
+ }
+ /**
+  * Watcher callback: an event on /simpleCase triggers a fresh read of its
+  * data, which also sets the watch again. Everything else is ignored.
+  */
+ @Override
+ public void process(WatchedEvent event) {
+     final String path = event.getPath();
+     if (path == null || !path.equals("/simpleCase")) {
+         return;
+     }
+     zk.getData("/simpleCase", true, this, null);
+ }
+ /**
+  * Receives the data of /simpleCase. A failed read is issued again. The
+  * content "die" stops the client; any other content is copied to this
+  * client's own znode, which is created as an ephemeral node the first
+  * time and updated afterwards.
+  */
+ @Override
+ public void processResult(int rc, String path, Object ctx, byte[] data,
+         Stat stat) {
+     if (rc != 0) {
+         zk.getData("/simpleCase", true, this, null);
+         return;
+     }
+     this.data = data;
+     // a znode may carry no data at all; treat that as empty content
+     // instead of failing on the event thread
+     String content = (data == null) ? "" : new String(data);
+     if (content.equals("die")) {
+         this.stop();
+         return;
+     }
+     if (!createdEphemeral) {
+         zk.create(myPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null);
+         createdEphemeral = true;
+     } else {
+         zk.setData(myPath, data, -1, this, null);
+     }
+ }
+ /**
+  * Completion of the create of this client's znode. A failed create is
+  * issued again, unless the failure says the znode is already there.
+  */
+ @Override
+ public void processResult(int rc, String path, Object ctx, String name) {
+     // NODEEXISTS means an earlier attempt went through after all;
+     // retrying it would repeat forever with the same answer.
+     if (rc != 0 && rc != org.apache.zookeeper.KeeperException.Code.NODEEXISTS.intValue()) {
+         zk.create(myPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null);
+     }
+ }
+ /**
+  * Completion of the update of this client's znode; a failed update is
+  * issued again with the most recently received data.
+  */
+ @Override
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+     final boolean succeeded = (rc == 0);
+     if (succeeded) {
+         return;
+     }
+     zk.setData(myPath, data, -1, this, null);
+ }
+ @Override
+ public String toString() {
+ return SimpleClient.class.getName() + "[" + index + "] using " + hostPort;
+ }
+
+ Reporter r;
+ /** Stores the reporter that start() uses to announce this client. */
+ @Override
+ public void setReporter(Reporter r) {
+ this.r = r;
+ }
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java?rev=739475&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java Sat Jan 31 01:04:12 2009
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test.system;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Test;
+
+/**
+ * This does a basic system test. It starts up an ensemble of servers and a set of clients.
+ * It makes sure that all the clients come up. It kills off servers while making a change and
+ * then ensures that all clients see the change. And then signals the clients to die and
+ * watches them disappear.
+ *
+ * The test object is also the default watcher of the ZooKeeper handle it opens: connection
+ * state events update {@link #connected} and wake up {@link #waitForConnect}.
+ */
+public class SimpleSysTest extends BaseSysTest implements Watcher {
+    /** Number of polling attempts made before a check is declared failed. */
+    int maxTries = 10;
+    /** Last known connection state; read and written under this object's monitor. */
+    boolean connected;
+    final private static Logger LOG = Logger.getLogger(SimpleSysTest.class);
+
+    /**
+     * Blocks until the handle reports CONNECTED or the timeout elapses.
+     *
+     * @param zk      handle whose state is polled
+     * @param timeout maximum time to wait, in milliseconds
+     * @return true if the handle was connected when the wait ended
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    synchronized private boolean waitForConnect(ZooKeeper zk, long timeout) throws InterruptedException {
+        connected = (zk.getState() == States.CONNECTED);
+        long end = System.currentTimeMillis() + timeout;
+        while (!connected) {
+            // Wait only for what is left of the budget. The guard also keeps us
+            // from calling wait(0), which would block without any time limit.
+            long remaining = end - System.currentTimeMillis();
+            if (remaining <= 0) {
+                break;
+            }
+            wait(remaining);
+            connected = (zk.getState() == States.CONNECTED);
+        }
+        return connected;
+    }
+
+    /**
+     * This test checks the following:
+     * 1) All clients connect successfully
+     * 2) Half of the servers die (assuming odd number) and a write succeeds
+     * 3) All servers are restarted and cluster stays alive
+     * 4) Clients see a change by the server
+     * 5) Clients' ephemeral nodes are cleaned up
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testSimpleCase() throws Exception {
+        configureServers(serverCount);
+        configureClients(clientCount, SimpleClient.class, getHostPort());
+        Stat stat = new Stat();
+        startServers();
+        LOG.debug("Connecting to " + getHostPort());
+        ZooKeeper zk = new ZooKeeper(getHostPort(), 15000, this);
+        try {
+            waitForConnect(zk, 10000);
+            zk.create("/simpleCase", "orig".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            startClients();
+
+            // Check that all clients connect properly
+            for (int i = 0; i < getClientCount(); i++) {
+                for (int j = 0; j < maxTries; j++) {
+                    try {
+                        byte b[] = zk.getData("/simpleCase/" + i, false, stat);
+                        assertEquals("orig", new String(b));
+                        // Client i is verified; no need to re-read its node.
+                        break;
+                    } catch (NoNodeException e) {
+                        if (j + 1 == maxTries) {
+                            fail("Max tries exceeded on client " + i);
+                        }
+                        Thread.sleep(1000);
+                    }
+                }
+            }
+
+            // Kill half the servers, make a change, restart the dead
+            // servers, and then bounce the other servers one by one
+            for (int i = 0; i < getServerCount(); i++) {
+                stopServer(i);
+                if (i + 1 > getServerCount() / 2) {
+                    startServer(i);
+                } else if (i + 1 == getServerCount() / 2) {
+                    assertTrue("Connection didn't recover", waitForConnect(zk, 10000));
+                    try {
+                        zk.setData("/simpleCase", "new".getBytes(), -1);
+                    } catch (ConnectionLossException e) {
+                        assertTrue("Connection didn't recover", waitForConnect(zk, 10000));
+                        zk.setData("/simpleCase", "new".getBytes(), -1);
+                    }
+                    // Servers 0..i are all down at this point; bring every one
+                    // of them back (including server i itself) before the
+                    // remaining servers are bounced.
+                    for (int j = 0; j <= i; j++) {
+                        LOG.info("Starting server " + j);
+                        startServer(j);
+                    }
+                }
+            }
+            Thread.sleep(100); // wait for things to stabilize
+            assertTrue("Servers didn't bounce", waitForConnect(zk, 15000));
+            try {
+                zk.getData("/simpleCase", false, stat);
+            } catch (ConnectionLossException e) {
+                assertTrue("Servers didn't bounce", waitForConnect(zk, 15000));
+            }
+
+            // check that the change has propagated to everyone
+            for (int i = 0; i < getClientCount(); i++) {
+                for (int j = 0; j < maxTries; j++) {
+                    byte data[] = zk.getData("/simpleCase/" + i, false, stat);
+                    if (new String(data).equals("new")) {
+                        break;
+                    }
+                    if (j + 1 == maxTries) {
+                        fail("max tries exceeded for " + i);
+                    }
+                    Thread.sleep(1000);
+                }
+            }
+
+            // send out the kill signal
+            zk.setData("/simpleCase", "die".getBytes(), -1);
+
+            // watch for everyone to die
+            for (int i = 0; i < getClientCount(); i++) {
+                try {
+                    for (int j = 0; j < maxTries; j++) {
+                        zk.getData("/simpleCase/" + i, false, stat);
+                        if (j + 1 == maxTries) {
+                            fail("max tries exceeded waiting for child " + i + " to die");
+                        }
+                        Thread.sleep(200);
+                    }
+                } catch (NoNodeException e) {
+                    // Great this is what we were hoping for!
+                }
+            }
+
+            stopClients();
+        } finally {
+            // Release the test's own session while the ensemble is still up,
+            // whether the checks passed or failed.
+            zk.close();
+        }
+        stopServers();
+    }
+
+    /**
+     * Default watcher for the test's ZooKeeper handle. Records SyncConnected and
+     * Disconnected state changes in {@link #connected} and notifies any thread
+     * blocked in {@link #waitForConnect}. Other states are ignored.
+     *
+     * @param event the event delivered by the ZooKeeper client
+     */
+    @Override
+    public void process(WatchedEvent event) {
+        if (event.getState() == KeeperState.SyncConnected) {
+            synchronized (this) {
+                connected = true;
+                notifyAll();
+            }
+        } else if (event.getState() == KeeperState.Disconnected) {
+            synchronized (this) {
+                connected = false;
+                notifyAll();
+            }
+        }
+    }
+}
Propchange: hadoop/zookeeper/trunk/src/java/systest/org/apache/zookeeper/test/system/SimpleSysTest.java
------------------------------------------------------------------------------
svn:eol-style = native
|