Author: breed
Date: Fri Sep 5 13:30:49 2008
New Revision: 692534
URL: http://svn.apache.org/viewvc?rev=692534&view=rev
Log:
ZOOKEEPER-112 src/java/main ZooKeeper.java has test code embedded into it.
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java
Modified:
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=692534&r1=692533&r2=692534&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Fri Sep 5 13:30:49 2008
@@ -70,7 +70,7 @@
* connected to as needed.
*
*/
-class ClientCnxn {
+public class ClientCnxn {
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
private ArrayList<InetSocketAddress> serverAddrs =
@@ -112,6 +112,8 @@
private final ZooKeeper zooKeeper;
+ private final Watcher watcher;
+
private long sessionId;
private byte sessionPasswd[] = new byte[16];
@@ -203,9 +205,11 @@
}
}
- public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper)
- throws IOException {
- this(hosts, sessionTimeout, zooKeeper, 0, new byte[16]);
+ public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
+ Watcher watcher)
+ throws IOException
+ {
+ this(hosts, sessionTimeout, zooKeeper, watcher, 0, new byte[16]);
}
/**
@@ -222,8 +226,11 @@
* @throws IOException
*/
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
- long sessionId, byte[] sessionPasswd) throws IOException {
+ Watcher watcher, long sessionId, byte[] sessionPasswd)
+ throws IOException
+ {
this.zooKeeper = zooKeeper;
+ this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
String hostsList[] = hosts.split(",");
@@ -273,7 +280,7 @@
break;
}
if (event instanceof WatcherEvent) {
- zooKeeper.processWatchEvent((WatcherEvent) event);
+ watcher.process((WatcherEvent) event);
} else {
Packet p = (Packet) event;
int rc = 0;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=692534&r1=692533&r2=692534&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Fri Sep 5 13:30:49 2008
@@ -114,77 +114,86 @@
new HashMap<String, Set<Watcher>>();
/**
- * Process a WatchEvent.
- *
- * Looks up the watch in the set of watches, processes the event
- * if found, otw uses the default watcher (registered during instance
- * creation) to process the watch.
- *
- * @param event the event to process.
- */
- public void processWatchEvent(WatcherEvent event) {
- // clear the watches if we are not connected
- if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
- synchronized (dataWatches) {
- for (Set<Watcher> watchers : dataWatches.values()) {
- for (Watcher watcher : watchers) {
- watcher.process(event);
+ * Process watch events generated by the ClientCnxn object.
+ *
+ * We are implementing this as a nested class of ZooKeeper so that
+ * the public Watcher.process(event) method will not be exposed as part
+ * of the ZooKeeper client API.
+ */
+ private class ZKWatcher implements Watcher {
+ /**
+ * Process a WatchEvent.
+ *
+ * Looks up the watch in the set of watches, processes the event
+ * if found, otw uses the default watcher (registered during instance
+ * creation) to process the watch.
+ *
+ * @param event the event to process.
+ */
+ public void process(WatcherEvent event) {
+ // clear the watches if we are not connected
+ if (event.getState() != Watcher.Event.KeeperStateSyncConnected) {
+ synchronized (dataWatches) {
+ for (Set<Watcher> watchers : dataWatches.values()) {
+ for (Watcher watcher : watchers) {
+ watcher.process(event);
+ }
}
+ dataWatches.clear();
}
- dataWatches.clear();
- }
- synchronized (childWatches) {
- for (Set<Watcher> watchers : childWatches.values()) {
- for (Watcher watcher : watchers) {
- watcher.process(event);
+ synchronized (childWatches) {
+ for (Set<Watcher> watchers : childWatches.values()) {
+ for (Watcher watcher : watchers) {
+ watcher.process(event);
+ }
}
+ childWatches.clear();
}
- childWatches.clear();
- }
- }
-
- Set<Watcher> watchers = null;
-
- switch (event.getType()) {
- case Watcher.Event.EventNone:
- defaultWatcher.process(event);
- return;
- case Watcher.Event.EventNodeDataChanged:
- case Watcher.Event.EventNodeCreated:
- synchronized (dataWatches) {
- watchers = dataWatches.remove(event.getPath());
- }
- break;
- case Watcher.Event.EventNodeChildrenChanged:
- synchronized (childWatches) {
- watchers = childWatches.remove(event.getPath());
}
- break;
- case Watcher.Event.EventNodeDeleted:
- synchronized (dataWatches) {
- watchers = dataWatches.remove(event.getPath());
- }
- Set<Watcher> cwatches;
- synchronized (childWatches) {
- cwatches = childWatches.remove(event.getPath());
- }
- if (cwatches != null) {
- if (watchers == null) {
- watchers = cwatches;
- } else {
- watchers.addAll(cwatches);
+
+ Set<Watcher> watchers = null;
+
+ switch (event.getType()) {
+ case Watcher.Event.EventNone:
+ defaultWatcher.process(event);
+ return;
+ case Watcher.Event.EventNodeDataChanged:
+ case Watcher.Event.EventNodeCreated:
+ synchronized (dataWatches) {
+ watchers = dataWatches.remove(event.getPath());
+ }
+ break;
+ case Watcher.Event.EventNodeChildrenChanged:
+ synchronized (childWatches) {
+ watchers = childWatches.remove(event.getPath());
}
+ break;
+ case Watcher.Event.EventNodeDeleted:
+ synchronized (dataWatches) {
+ watchers = dataWatches.remove(event.getPath());
+ }
+ Set<Watcher> cwatches;
+ synchronized (childWatches) {
+ cwatches = childWatches.remove(event.getPath());
+ }
+ if (cwatches != null) {
+ if (watchers == null) {
+ watchers = cwatches;
+ } else {
+ watchers.addAll(cwatches);
+ }
+ }
+ break;
+ default:
+ String msg = "Unhandled watch event type " + event.getType();
+ LOG.error(msg);
+ throw new RuntimeException(msg);
}
- break;
- default:
- String msg = "Unhandled watch event type " + event.getType();
- LOG.error(msg);
- throw new RuntimeException(msg);
- }
-
- if (watchers != null) {
- for (Watcher watcher : watchers) {
- watcher.process(event);
+
+ if (watchers != null) {
+ for (Watcher watcher : watchers) {
+ watcher.process(event);
+ }
}
}
}
@@ -257,19 +266,19 @@
volatile States state;
- ClientCnxn cnxn;
+ protected ClientCnxn cnxn;
public ZooKeeper(String host, int sessionTimeout, Watcher watcher)
throws IOException {
this.defaultWatcher = watcher;
- cnxn = new ClientCnxn(host, sessionTimeout, this);
+ cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher());
}
public ZooKeeper(String host, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd) throws IOException {
this.defaultWatcher = watcher;
- cnxn = new ClientCnxn(host, sessionTimeout, this, sessionId,
- sessionPasswd);
+ cnxn = new ClientCnxn(host, sessionTimeout, this, new ZKWatcher(),
+ sessionId, sessionPasswd);
}
/**
@@ -291,10 +300,6 @@
cnxn.addAuthInfo(scheme, auth);
}
- public String describeCNXN() {
- return cnxn.toString();
- }
-
public synchronized void register(Watcher watcher) {
this.defaultWatcher = watcher;
}
@@ -935,13 +940,4 @@
public States getState() {
return state;
}
-
- // Everything below this line is for testing!
-
- /** Testing only!!! Really this needs to be moved into a stub in the
- * tests - pending JIRA for that.
- */
- public void disconnect() throws IOException {
- cnxn.disconnect();
- }
}
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java?rev=692534&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java Fri Sep 5 13:30:49 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Specialized form of ZooKeeper specific for testing. Typically provides
+ * the ability to do unsafe or incorrect operations that allow negative
+ * testing.
+ */
+public class DisconnectableZooKeeper extends ZooKeeper {
+ public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher)
+ throws IOException
+ {
+ super(host, sessionTimeout, watcher);
+ }
+
+ public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher,
+ long sessionId, byte[] sessionPasswd)
+ throws IOException
+ {
+ super(host, sessionTimeout, watcher, sessionId, sessionPasswd);
+ }
+
+ /** Testing only!!! Really!!!! This is only here to test when the client
+ * disconnects from the server w/o sending a session disconnect (ie
+ * ending the session cleanly). The server will eventually notice the
+ * client is no longer pinging and will timeout the session.
+ */
+ public void disconnect() throws IOException {
+ cnxn.disconnect();
+ }
+
+}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=692534&r1=692533&r2=692534&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Fri Sep 5 13:30:49 2008
@@ -30,7 +30,6 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.CreateFlags;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
@@ -88,11 +87,12 @@
}
}
- private ZooKeeper createClient()
+ private DisconnectableZooKeeper createClient()
throws IOException, InterruptedException
{
CountdownWatcher watcher = new CountdownWatcher();
- ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher);
+ DisconnectableZooKeeper zk =
+ new DisconnectableZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher);
if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS))
{
@@ -143,7 +143,7 @@
public void testSession()
throws IOException, InterruptedException, KeeperException
{
- ZooKeeper zk = createClient();
+ DisconnectableZooKeeper zk = createClient();
zk.create("/e", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateFlags.EPHEMERAL);
LOG.info("zk with session id 0x" + Long.toHexString(zk.getSessionId())
@@ -156,9 +156,9 @@
Stat stat = new Stat();
startSignal = new CountDownLatch(1);
- zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this,
- zk.getSessionId(),
- zk.getSessionPasswd());
+ zk = new DisconnectableZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this,
+ zk.getSessionId(),
+ zk.getSessionPasswd());
startSignal.await();
LOG.info("zk with session id 0x" + Long.toHexString(zk.getSessionId())
|