hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ph...@apache.org
Subject svn commit: r705346 - in /hadoop/zookeeper/trunk: ./ docs/ src/docs/src/documentation/content/xdocs/
Date Thu, 16 Oct 2008 20:09:16 GMT
Author: phunt
Date: Thu Oct 16 13:09:15 2008
New Revision: 705346

URL: http://svn.apache.org/viewvc?rev=705346&view=rev
Log:
ZOOKEEPER-193. update java example doc to compile with latest zookeeper

Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/docs/javaExample.html
    hadoop/zookeeper/trunk/docs/javaExample.pdf
    hadoop/zookeeper/trunk/docs/zookeeperTutorial.html
    hadoop/zookeeper/trunk/docs/zookeeperTutorial.pdf
    hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/javaExample.xml
    hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Thu Oct 16 13:09:15 2008
@@ -35,6 +35,9 @@
 
   BUGFIXES: 
 
+  ZOOKEEPER-193. update java example doc to compile with latest zookeeper
+  (phunt)
+
   ZOOKEEPER-187. CreateMode api docs missing (phunt)
 
   ZOOKEEPER-186. add new "releasenotes.xml" to forrest documentation

Modified: hadoop/zookeeper/trunk/docs/javaExample.html
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/docs/javaExample.html?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/docs/javaExample.html (original)
+++ hadoop/zookeeper/trunk/docs/javaExample.html Thu Oct 16 13:09:15 2008
@@ -271,47 +271,44 @@
     both the <strong>ZooKeeper</strong> object, <strong>DataMonitor</strong>, as described above in 
     <a href="#sc_design">Program Design</a>.  </p>
 <pre class="code">
-// from the Executor class...
+    // from the Executor class...
     
-public static void main(String[] args) {
-    if (args.length &lt; 4) {
-        System.err
-                .println("USAGE: Executor hostPort znode filename program [args ...]");
-        System.exit(2);
-    }
-    String hostPort = args[0];
-    String znode = args[1];
-    String filename = args[2];
-    String exec[] = new String[args.length - 3];
-    System.arraycopy(args, 3, exec, 0, exec.length);
-    try {
-        Executor theExectutor = new Executor(hostPort, znode, filename, exec);
-	theExectutor.run();
-    } catch (Exception e) {
-        e.printStackTrace();
+    public static void main(String[] args) {
+        if (args.length &lt; 4) {
+            System.err
+                    .println("USAGE: Executor hostPort znode filename program [args ...]");
+            System.exit(2);
+        }
+        String hostPort = args[0];
+        String znode = args[1];
+        String filename = args[2];
+        String exec[] = new String[args.length - 3];
+        System.arraycopy(args, 3, exec, 0, exec.length);
+        try {
+            new Executor(hostPort, znode, filename, exec).run();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
-}
-    
-public Executor(String hostPort, String znode, String filename,
-    String exec[]) throws KeeperException, IOException {
-    this.filename = filename;
-    this.exec = exec;
-    
-    //create a new zookeeper object, passing a self-reference in a the Watcher
-    zk = new ZooKeeper(hostPort, 3000, this);
-    dm = new DataMonitor(zk, znode, null, this);
-}
-   
-public void run() {
-    try {
-        synchronized (this) {
-            while (!dm.dead) {
-                wait();
+
+    public Executor(String hostPort, String znode, String filename,
+            String exec[]) throws KeeperException, IOException {
+        this.filename = filename;
+        this.exec = exec;
+        zk = new ZooKeeper(hostPort, 3000, this);
+        dm = new DataMonitor(zk, znode, null, this);
+    }
+
+    public void run() {
+        try {
+            synchronized (this) {
+                while (!dm.dead) {
+                    wait();
+                }
             }
+        } catch (InterruptedException e) {
         }
-    } catch (InterruptedException e) {    
     }
-}
 </pre>
 <p>
     Recall that the Executor's job is to starts and stop the executable whose name you pass in on the command line. 
@@ -321,7 +318,7 @@
     interfaces:
     </p>
 <pre class="code">
-public class Executor implements Watcher, Runnable, DataMonitorListener {
+public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
 ...
     </pre>
 <p>The <strong>Watcher</strong> interface is defined by the ZooKeeper Java API.
@@ -331,9 +328,9 @@
     the point that, by convention, the Executor or some Executor-like object "owns" the ZooKeeper connection, but it is free to delegate the events to other
     events to other objects. It also uses this as the default channel on which to fire watch events. (More on this later.)</p>
 <pre class="code">
-public void process(WatcherEvent event) {
-    dm.process(event);
-}
+    public void process(WatchedEvent event) {
+        dm.process(event);
+    }
 </pre>
 <p>The <strong>DataMonitorListener</strong> 
     interface, on the other hand, is not part of the the ZooKeeper API. It is a completely custom interface, 
@@ -512,33 +509,33 @@
 </p>
 <p>Finally, notice how DataMonitor processes watch events: </p>
 <pre class="code">
-public void process(WatcherEvent event) {
-    String path = event.getPath();
-    if (event.getType() == Watcher.Event.EventNone) {
-        // We are are being told that the state of the
-        // connection has changed
-        switch (event.getState()) {
-        case Event.KeeperStateSyncConnected:
-            // Everything is happy. Lets kick things off
-            // again by checking the existence of the znode
-            zk.exists(znode, true, this, null);
-            break;
-        case Event.KeeperStateExpired:
-            // It's all over
-            dead = true;
-            listener.closing(KeeperException.Code.SessionExpired);
-            break;
+    public void process(WatchedEvent event) {
+        String path = event.getPath();
+        if (event.getType() == Event.EventType.None) {
+            // We are are being told that the state of the
+            // connection has changed
+            switch (event.getState()) {
+            case SyncConnected:
+                // Everything is happy. Lets kick things off
+                // again by checking the existence of the znode
+                zk.exists(znode, true, this, null);
+                break;
+            case Expired:
+                // It's all over
+                dead = true;
+                listener.closing(KeeperException.Code.SessionExpired);
+                break;
+            }
+        } else {
+            if (path != null &amp;&amp; path.equals(znode)) {
+                // Something has changed on the node, let's find out
+                zk.exists(znode, true, this, null);
+            }
         }
-    } else {
-        if (path != null &amp;&amp; path.equals(znode)) {
-            // Something has changed on the node, let's find out
-            zk.exists(znode, true, this, null);
+        if (chainedWatcher != null) {
+            chainedWatcher.process(event);
         }
-   }
-   if (chainedWatcher != null) {
-        chainedWatcher.process(event);
-   }
-}
+    }
 </pre>
 <p>
 If the client-side ZooKeeper libraries can reestablish the communication channel to ZooKeeper, DataMonitor simply kicks
@@ -564,20 +561,19 @@
  * with the specified arguments when the znode exists and kills
  * the program if the znode goes away.
  */
-package com.yahoo.zk.executor;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import com.yahoo.zk.executor.DataMonitor.DataMonitorListener;
-import com.yahoo.zookeeper.KeeperException;
-import com.yahoo.zookeeper.Watcher;
-import com.yahoo.zookeeper.ZooKeeper;
-import com.yahoo.zookeeper.proto.WatcherEvent;
-
-public class Executor implements Watcher, Runnable, DataMonitorListener {
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class Executor
+    implements Watcher, Runnable, DataMonitor.DataMonitorListener
+{
     String znode;
 
     DataMonitor dm;
@@ -621,15 +617,13 @@
 
     /***************************************************************************
      * We do process any events ourselves, we just need to forward them on.
-     * 
-     * @see com.yahoo.zookeeper.Watcher#process(com.yahoo.zookeeper.proto.WatcherEvent)
+     *
+     * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
      */
-    @Override
-    public void process(WatcherEvent event) {
+    public void process(WatchedEvent event) {
         dm.process(event);
     }
 
-    @Override
     public void run() {
         try {
             synchronized (this) {
@@ -641,7 +635,6 @@
         }
     }
 
-    @Override
     public void closing(int rc) {
         synchronized (this) {
             notifyAll();
@@ -672,7 +665,6 @@
         }
     }
 
-    @Override
     public void exists(byte[] data) {
         if (data == null) {
             if (child != null) {
@@ -728,17 +720,15 @@
  * A simple class that monitors the data and existence of a ZooKeeper
  * node. It uses asynchronous ZooKeeper APIs.
  */
-package com.yahoo.zk.executor;
-
 import java.util.Arrays;
 
-import com.yahoo.zookeeper.KeeperException;
-import com.yahoo.zookeeper.Watcher;
-import com.yahoo.zookeeper.ZooKeeper;
-import com.yahoo.zookeeper.AsyncCallback.StatCallback;
-import com.yahoo.zookeeper.KeeperException.Code;
-import com.yahoo.zookeeper.data.Stat;
-import com.yahoo.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
 
 public class DataMonitor implements Watcher, StatCallback {
 
@@ -776,30 +766,25 @@
 
         /**
          * The ZooKeeper session is no longer valid.
-         * 
+         *
          * @param rc
          *                the ZooKeeper reason code
          */
         void closing(int rc);
     }
 
-    @Override
-    /**
-     * This is a watch event callback. The node we were watching has changed or
-     * something happened to our connection to ZooKeeper.
-     */
-    public void process(WatcherEvent event) {
+    public void process(WatchedEvent event) {
         String path = event.getPath();
-        if (event.getType() == Watcher.Event.EventNone) {
+        if (event.getType() == Event.EventType.None) {
             // We are are being told that the state of the
             // connection has changed
             switch (event.getState()) {
-            case Event.KeeperStateSyncConnected:
+            case SyncConnected:
                 // Everything is happy. Lets kick things off
                 // again by checking the existence of the znode
                 zk.exists(znode, true, this, null);
                 break;
-            case Event.KeeperStateExpired:
+            case Expired:
                 // It's all over
                 dead = true;
                 listener.closing(KeeperException.Code.SessionExpired);
@@ -816,7 +801,6 @@
         }
     }
 
-    @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
         boolean exists;
         switch (rc) {

Modified: hadoop/zookeeper/trunk/docs/javaExample.pdf
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/docs/javaExample.pdf?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/zookeeper/trunk/docs/zookeeperTutorial.html
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/docs/zookeeperTutorial.html?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/docs/zookeeperTutorial.html (original)
+++ hadoop/zookeeper/trunk/docs/zookeeperTutorial.html Thu Oct 16 13:09:15 2008
@@ -211,35 +211,32 @@
     These examples assume that you have at least one ZooKeeper server running.</p>
 <p>Both primitives use the following common excerpt of code:</p>
 <pre class="code">
-static ZooKeeper zk = null;
-static Integer mutex;
-
-String root;
+    static ZooKeeper zk = null;
+    static Integer mutex;
 
-SyncPrimitive(String address) {
-	if(zk == null){
-	    try {
-		System.out.println("Starting ZK:");
-		zk = new ZooKeeper(address, 3000, this);
-		mutex = new Integer(-1);
-		System.out.println("Finished starting ZK: " + zk);
-	    } catch (KeeperException e) {
-		System.out.println("Keeper exception when starting new session: "
-			+ e.toString());
-		zk = null;
-	    } catch (IOException e) {
-		System.out.println(e.toString());
-		zk = null;
-	    }
-	}
-}
+    String root;
 
-synchronized public void process(WatcherEvent event) {
-	synchronized (mutex) {
-	    mutex.notify();
-	}
-}
+    SyncPrimitive(String address) {
+        if(zk == null){
+            try {
+                System.out.println("Starting ZK:");
+                zk = new ZooKeeper(address, 3000, this);
+                mutex = new Integer(-1);
+                System.out.println("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                System.out.println(e.toString());
+                zk = null;
+            }
+        }
+        //else mutex = new Integer(-1);
+    }
 
+    synchronized public void process(WatchedEvent event) {
+        synchronized (mutex) {
+            //System.out.println("Process: " + event.getType());
+            mutex.notify();
+        }
+    }
 </pre>
 <p>Both classes extend SyncPrimitive. In this way, we execute steps that are 
 common to all primitives in the constructor of SyncPrimitive. To keep the examples 
@@ -291,40 +288,43 @@
 barrier node on ZooKeeper, which is the parent node of all process nodes, and 
 we call root (<strong>Note:</strong> This is not the ZooKeeper root "/").</p>
 <pre class="code">
- /**
- * Barrier constructor
- *
- * @param address
- * @param name
- * @param size
- */
-Barrier(String address, String name, int size) {
-    super(address);
-    this.root = name;
-    this.size = size;
-
-    // Create barrier node
-    if (zk != null) {
-	try {
-	    Stat s = zk.exists(root, false);
-	    if (s == null) {
-		zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
-	    }
-	} catch (KeeperException e) {
-	    System.out.println("Keeper exception when instantiating queue: " + e.toString());
-	} catch (InterruptedException e) {
-	    System.out.println("Interrupted exception");
-	}
-    }
-
-    // My node name
-    try {
-	name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
-    } catch (UnknownHostException e) {
-	System.out.println(e.toString());
-    }
+        /**
+         * Barrier constructor
+         *
+         * @param address
+         * @param name
+         * @param size
+         */
+        Barrier(String address, String name, int size) {
+            super(address);
+            this.root = name;
+            this.size = size;
 
-}
+            // Create barrier node
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating queue: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+
+            // My node name
+            try {
+                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
+            } catch (UnknownHostException e) {
+                System.out.println(e.toString());
+            }
+
+        }
 </pre>
 <p>
 To enter the barrier, a process calls enter(). The process creates a node under 
@@ -338,29 +338,29 @@
 a boolean flag that enables the process to set a watch. In the code the flag is true.
 </p>
 <pre class="code">
- /**
- * Join barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
-
-boolean enter() throws KeeperException, InterruptedException{
-    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
-	    CreateFlags.EPHEMERAL);
-    while (true) {
-	synchronized (mutex) {
-	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-
-	    if (list.size() &lt; size) {
-		mutex.wait();
-	    } else {
-		return true;
-	    }
-	}
-    }
-}
+        /**
+         * Join barrier
+         *
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+
+        boolean enter() throws KeeperException, InterruptedException{
+            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+            while (true) {
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+
+                    if (list.size() &lt; size) {
+                        mutex.wait();
+                    } else {
+                        return true;
+                    }
+                }
+            }
+        }
 </pre>
 <p>
 Note that enter() throws both KeeperException and InterruptedException, so it is 
@@ -373,27 +373,28 @@
 ZooKeeper has to set a watch on the the root node). Upon reception of a notification, 
 it checks once more whether the root node has any child.</p>
 <pre class="code">
- /**
- * Wait until all reach barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
-
-boolean leave() throws KeeperException, InterruptedException{
-    zk.delete(root + "/" + name, 0);
-    while (true) {
-	synchronized (mutex) {
-	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-		if (list.size() &gt; 0) {
-		    mutex.wait();
-		} else {
-		    return true;
-		}
-	    }
-	}
-}
+        /**
+         * Wait until all reach barrier
+         *
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+
+        boolean leave() throws KeeperException, InterruptedException{
+            zk.delete(root + "/" + name, 0);
+            while (true) {
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+                        if (list.size() &gt; 0) {
+                            mutex.wait();
+                        } else {
+                            return true;
+                        }
+                    }
+                }
+        }
+    }
 </pre>
 </div>
 
@@ -415,32 +416,33 @@
 node of the queue exists, and creates if it doesn't.
 </p>
 <pre class="code">
-/**
- * Constructor of producer-consumer queue
- *
- * @param address
- * @param name
- */
-Queue(String address, String name) {
-    super(address);
-    this.root = name;
-    // Create ZK node name
-    if (zk != null) {
-	try {
-	    Stat s = zk.exists(root, false);
-	    if (s == null) {
-		zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
-	    }
-	} catch (KeeperException e) {
-	    System.out
-		    .println("Keeper exception when instantiating queue: "
-			    + e.toString());
-	} catch (InterruptedException e) {
-	    System.out.println("Interrupted exception");
-	}
-    }
-}
- </pre>
+        /**
+         * Constructor of producer-consumer queue
+         *
+         * @param address
+         * @param name
+         */
+        Queue(String address, String name) {
+            super(address);
+            this.root = name;
+            // Create ZK node name
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating queue: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+        }
+</pre>
 <p>
 A producer process calls "produce()" to add an element to the queue, and passes 
 an integer as an argument. To add an element to the queue, the method creates a 
@@ -450,25 +452,25 @@
 oldest element of the queue is the next one consumed.
 </p>
 <pre class="code">
-/**
- * Add element to the queue.
- *
- * @param i
- * @return
- */
-
-boolean produce(int i) throws KeeperException, InterruptedException{
-    ByteBuffer b = ByteBuffer.allocate(4);
-    byte[] value;
-
-    // Add child with value i
-    b.putInt(i);
-    value = b.array();
-    zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
-		CreateFlags.SEQUENCE);
+        /**
+         * Add element to the queue.
+         *
+         * @param i
+         * @return
+         */
 
-    return true;
-}
+        boolean produce(int i) throws KeeperException, InterruptedException{
+            ByteBuffer b = ByteBuffer.allocate(4);
+            byte[] value;
+
+            // Add child with value i
+            b.putInt(i);
+            value = b.array();
+            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL);
+
+            return true;
+        }
 </pre>
 <p>
 To consume an element, a consumer process obtains the children of the root node, 
@@ -482,41 +484,44 @@
 the smallest counter value, we traverse the list, and remove the prefix "element" 
 from each one.</p>
 <pre class="code">
-/**
- * Remove first element from the queue.
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
-int consume() throws KeeperException, InterruptedException{
-    int retvalue = -1;
-    Stat stat = null;
-
-    // Get the first element available
-    while (true) {
-	synchronized (mutex) {
-	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-	    if (list.size() == 0) {
-		System.out.println("Going to wait");
-		mutex.wait();
-	    } else {
-		Integer min = new Integer(list.get(0).substring(7));
-		for(String s : list){
-		    Integer tempValue = new Integer(s.substring(7));
-		    if(tempValue &gt; min) min = tempValue;
-		}
-		System.out.println("Temporary value: " + root + "/element" + min);
-		byte[] b = zk.getData(root + "/element" + min, false, stat);
-		zk.delete(root + "/element" + min, 0);
-		ByteBuffer buffer = ByteBuffer.wrap(b);
-		retvalue = buffer.getInt();
-
-		return retvalue;
-	    }
-	}
+        /**
+         * Remove first element from the queue.
+         *
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        int consume() throws KeeperException, InterruptedException{
+            int retvalue = -1;
+            Stat stat = null;
+
+            // Get the first element available
+            while (true) {
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+                    if (list.size() == 0) {
+                        System.out.println("Going to wait");
+                        mutex.wait();
+                    } else {
+                        Integer min = new Integer(list.get(0).substring(7));
+                        for(String s : list){
+                            Integer tempValue = new Integer(s.substring(7));
+                            //System.out.println("Temporary value: " + tempValue);
+                            if(tempValue &lt; min) min = tempValue;
+                        }
+                        System.out.println("Temporary value: " + root + "/element" + min);
+                        byte[] b = zk.getData(root + "/element" + min,
+                                    false, stat);
+                        zk.delete(root + "/element" + min, 0);
+                        ByteBuffer buffer = ByteBuffer.wrap(b);
+                        retvalue = buffer.getInt();
+
+                        return retvalue;
+                    }
+                }
+            }
+        }
     }
-}
 </pre>
 </div>
 
@@ -530,29 +535,26 @@
 <title>SyncPrimitive.Java</title>
 
 <pre class="code">
-package com.yahoo.SyncPrimitive;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.lang.InterruptedException;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
-import com.yahoo.zookeeper.Watcher;
-import com.yahoo.zookeeper.data.Stat;
-import com.yahoo.zookeeper.proto.WatcherEvent;
-import com.yahoo.zookeeper.KeeperException;
-import com.yahoo.zookeeper.ZooDefs.Ids;
-import com.yahoo.zookeeper.ZooDefs.CreateFlags;
-import com.yahoo.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 public class SyncPrimitive implements Watcher {
 
     static ZooKeeper zk = null;
     static Integer mutex;
-    
+
     String root;
 
     SyncPrimitive(String address) {
@@ -562,10 +564,6 @@
                 zk = new ZooKeeper(address, 3000, this);
                 mutex = new Integer(-1);
                 System.out.println("Finished starting ZK: " + zk);
-            } catch (KeeperException e) {
-                System.out.println("Keeper exception when starting new session: "
-                        + e.toString());
-                zk = null;
             } catch (IOException e) {
                 System.out.println(e.toString());
                 zk = null;
@@ -574,7 +572,7 @@
         //else mutex = new Integer(-1);
     }
 
-    synchronized public void process(WatcherEvent event) {
+    synchronized public void process(WatchedEvent event) {
         synchronized (mutex) {
             //System.out.println("Process: " + event.getType());
             mutex.notify();
@@ -590,7 +588,7 @@
 
         /**
          * Barrier constructor
-         * 
+         *
          * @param address
          * @param name
          * @param size
@@ -605,7 +603,8 @@
                 try {
                     Stat s = zk.exists(root, false);
                     if (s == null) {
-                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
                     }
                 } catch (KeeperException e) {
                     System.out
@@ -615,31 +614,31 @@
                     System.out.println("Interrupted exception");
                 }
             }
-            
+
             // My node name
             try {
                 name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
             } catch (UnknownHostException e) {
                 System.out.println(e.toString());
             }
-            
+
         }
 
         /**
          * Join barrier
-         * 
+         *
          * @return
          * @throws KeeperException
          * @throws InterruptedException
          */
-        
+
         boolean enter() throws KeeperException, InterruptedException{
             zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateFlags.EPHEMERAL);
+                    CreateMode.EPHEMERAL);
             while (true) {
-                synchronized (mutex) {                    
-                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-       
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+
                     if (list.size() &lt; size) {
                         mutex.wait();
                     } else {
@@ -651,17 +650,17 @@
 
         /**
          * Wait until all reach barrier
-         * 
+         *
          * @return
          * @throws KeeperException
          * @throws InterruptedException
          */
-        
+
         boolean leave() throws KeeperException, InterruptedException{
             zk.delete(root + "/" + name, 0);
             while (true) {
                 synchronized (mutex) {
-                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+                    List&lt;String&gt; list = zk.getChildren(root, true);
                         if (list.size() &gt; 0) {
                             mutex.wait();
                         } else {
@@ -679,7 +678,7 @@
 
         /**
          * Constructor of producer-consumer queue
-         * 
+         *
          * @param address
          * @param name
          */
@@ -691,7 +690,8 @@
                 try {
                     Stat s = zk.exists(root, false);
                     if (s == null) {
-                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
                     }
                 } catch (KeeperException e) {
                     System.out
@@ -705,11 +705,11 @@
 
         /**
          * Add element to the queue.
-         * 
+         *
          * @param i
          * @return
          */
-        
+
         boolean produce(int i) throws KeeperException, InterruptedException{
             ByteBuffer b = ByteBuffer.allocate(4);
             byte[] value;
@@ -718,15 +718,15 @@
             b.putInt(i);
             value = b.array();
             zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
-                        CreateFlags.SEQUENCE);
-            
+                        CreateMode.PERSISTENT_SEQUENTIAL);
+
             return true;
         }
 
 
         /**
          * Remove first element from the queue.
-         * 
+         *
          * @return
          * @throws KeeperException
          * @throws InterruptedException
@@ -734,11 +734,11 @@
         int consume() throws KeeperException, InterruptedException{
             int retvalue = -1;
             Stat stat = null;
-            
+
             // Get the first element available
             while (true) {
                 synchronized (mutex) {
-                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+                    List&lt;String&gt; list = zk.getChildren(root, true);
                     if (list.size() == 0) {
                         System.out.println("Going to wait");
                         mutex.wait();
@@ -755,11 +755,11 @@
                         zk.delete(root + "/element" + min, 0);
                         ByteBuffer buffer = ByteBuffer.wrap(b);
                         retvalue = buffer.getInt();
-                        
+
                         return retvalue;
                     }
                 }
-            }           
+            }
         }
     }
 
@@ -781,16 +781,16 @@
         if (args[3].equals("p")) {
             System.out.println("Producer");
             for (i = 0; i &lt; max; i++)
-                try{    
+                try{
                     q.produce(10 + i);
                 } catch (KeeperException e){
-                    
+
                 } catch (InterruptedException e){
-                    
+
                 }
         } else {
             System.out.println("Consumer");
-   
+
             for (i = 0; i &lt; max; i++) {
                 try{
                     int r = q.consume();
@@ -798,7 +798,7 @@
                 } catch (KeeperException e){
                     i--;
                 } catch (InterruptedException e){
-                    
+
                 }
             }
         }
@@ -811,9 +811,9 @@
             System.out.println("Entered barrier: " + args[2]);
             if(!flag) System.out.println("Error when entering the barrier");
         } catch (KeeperException e){
-            
+
         } catch (InterruptedException e){
-            
+
         }
 
         // Generate random integer
@@ -830,13 +830,14 @@
         try{
             b.leave();
         } catch (KeeperException e){
-            
+
         } catch (InterruptedException e){
-            
+
         }
         System.out.println("Left barrier");
     }
-}</pre>
+}
+</pre>
 </div>
 </div>
 </div>

Modified: hadoop/zookeeper/trunk/docs/zookeeperTutorial.pdf
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/docs/zookeeperTutorial.pdf?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/javaExample.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/javaExample.xml?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/javaExample.xml (original)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/javaExample.xml Thu Oct 16 13:09:15 2008
@@ -83,47 +83,44 @@
     <xref linkend="sc_design"/>.  </para>
     
     <programlisting>
-// from the Executor class...
+    // from the Executor class...
     
-public static void main(String[] args) {
-    if (args.length &lt; 4) {
-        System.err
-                .println("USAGE: Executor hostPort znode filename program [args ...]");
-        System.exit(2);
-    }
-    String hostPort = args[0];
-    String znode = args[1];
-    String filename = args[2];
-    String exec[] = new String[args.length - 3];
-    System.arraycopy(args, 3, exec, 0, exec.length);
-    try {
-        Executor theExectutor = new Executor(hostPort, znode, filename, exec);
-	theExectutor.run();
-    } catch (Exception e) {
-        e.printStackTrace();
+    public static void main(String[] args) {
+        if (args.length &lt; 4) {
+            System.err
+                    .println("USAGE: Executor hostPort znode filename program [args ...]");
+            System.exit(2);
+        }
+        String hostPort = args[0];
+        String znode = args[1];
+        String filename = args[2];
+        String exec[] = new String[args.length - 3];
+        System.arraycopy(args, 3, exec, 0, exec.length);
+        try {
+            new Executor(hostPort, znode, filename, exec).run();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
     }
-}
-    
-public Executor(String hostPort, String znode, String filename,
-    String exec[]) throws KeeperException, IOException {
-    this.filename = filename;
-    this.exec = exec;
-    
-    //create a new zookeeper object, passing a self-reference in a the Watcher
-    zk = new ZooKeeper(hostPort, 3000, this);
-    dm = new DataMonitor(zk, znode, null, this);
-}
-   
-public void run() {
-    try {
-        synchronized (this) {
-            while (!dm.dead) {
-                wait();
+
+    public Executor(String hostPort, String znode, String filename,
+            String exec[]) throws KeeperException, IOException {
+        this.filename = filename;
+        this.exec = exec;
+        zk = new ZooKeeper(hostPort, 3000, this);
+        dm = new DataMonitor(zk, znode, null, this);
+    }
+
+    public void run() {
+        try {
+            synchronized (this) {
+                while (!dm.dead) {
+                    wait();
+                }
             }
+        } catch (InterruptedException e) {
         }
-    } catch (InterruptedException e) {    
     }
-}
 </programlisting>
 
 
@@ -136,7 +133,7 @@
     </para>
     
     <programlisting>
-public class Executor implements Watcher, Runnable, DataMonitorListener {
+public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
 ...
     </programlisting>
     
@@ -148,9 +145,9 @@
     events to other objects. It also uses this as the default channel on which to fire watch events. (More on this later.)</para>
     
 <programlisting>
-public void process(WatcherEvent event) {
-    dm.process(event);
-}
+    public void process(WatchedEvent event) {
+        dm.process(event);
+    }
 </programlisting>
     
     <para>The <emphasis role="bold">DataMonitorListener</emphasis> 
@@ -330,33 +327,33 @@
 
 <para>Finally, notice how DataMonitor processes watch events: </para>
 <programlisting>
-public void process(WatcherEvent event) {
-    String path = event.getPath();
-    if (event.getType() == Watcher.Event.EventNone) {
-        // We are are being told that the state of the
-        // connection has changed
-        switch (event.getState()) {
-        case Event.KeeperStateSyncConnected:
-            // Everything is happy. Lets kick things off
-            // again by checking the existence of the znode
-            zk.exists(znode, true, this, null);
-            break;
-        case Event.KeeperStateExpired:
-            // It's all over
-            dead = true;
-            listener.closing(KeeperException.Code.SessionExpired);
-            break;
+    public void process(WatchedEvent event) {
+        String path = event.getPath();
+        if (event.getType() == Event.EventType.None) {
+            // We are are being told that the state of the
+            // connection has changed
+            switch (event.getState()) {
+            case SyncConnected:
+                // Everything is happy. Lets kick things off
+                // again by checking the existence of the znode
+                zk.exists(znode, true, this, null);
+                break;
+            case Expired:
+                // It's all over
+                dead = true;
+                listener.closing(KeeperException.Code.SessionExpired);
+                break;
+            }
+        } else {
+            if (path != null &amp;&amp; path.equals(znode)) {
+                // Something has changed on the node, let's find out
+                zk.exists(znode, true, this, null);
+            }
         }
-    } else {
-        if (path != null &amp;&amp; path.equals(znode)) {
-            // Something has changed on the node, let's find out
-            zk.exists(znode, true, this, null);
+        if (chainedWatcher != null) {
+            chainedWatcher.process(event);
         }
-   }
-   if (chainedWatcher != null) {
-        chainedWatcher.process(event);
-   }
-}
+    }
 </programlisting>
 <para>
 If the client-side ZooKeeper libraries can reestablish the communication channel to ZooKeeper, DataMonitor simply kicks
@@ -376,20 +373,19 @@
  * with the specified arguments when the znode exists and kills
  * the program if the znode goes away.
  */
-package com.yahoo.zk.executor;
-
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import com.yahoo.zk.executor.DataMonitor.DataMonitorListener;
-import com.yahoo.zookeeper.KeeperException;
-import com.yahoo.zookeeper.Watcher;
-import com.yahoo.zookeeper.ZooKeeper;
-import com.yahoo.zookeeper.proto.WatcherEvent;
-
-public class Executor implements Watcher, Runnable, DataMonitorListener {
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class Executor
+    implements Watcher, Runnable, DataMonitor.DataMonitorListener
+{
     String znode;
 
     DataMonitor dm;
@@ -433,15 +429,13 @@
 
     /***************************************************************************
      * We do process any events ourselves, we just need to forward them on.
-     * 
-     * @see com.yahoo.zookeeper.Watcher#process(com.yahoo.zookeeper.proto.WatcherEvent)
+     *
+     * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
      */
-    @Override
-    public void process(WatcherEvent event) {
+    public void process(WatchedEvent event) {
         dm.process(event);
     }
 
-    @Override
     public void run() {
         try {
             synchronized (this) {
@@ -453,7 +447,6 @@
         }
     }
 
-    @Override
     public void closing(int rc) {
         synchronized (this) {
             notifyAll();
@@ -484,7 +477,6 @@
         }
     }
 
-    @Override
     public void exists(byte[] data) {
         if (data == null) {
             if (child != null) {
@@ -535,17 +527,15 @@
  * A simple class that monitors the data and existence of a ZooKeeper
  * node. It uses asynchronous ZooKeeper APIs.
  */
-package com.yahoo.zk.executor;
-
 import java.util.Arrays;
 
-import com.yahoo.zookeeper.KeeperException;
-import com.yahoo.zookeeper.Watcher;
-import com.yahoo.zookeeper.ZooKeeper;
-import com.yahoo.zookeeper.AsyncCallback.StatCallback;
-import com.yahoo.zookeeper.KeeperException.Code;
-import com.yahoo.zookeeper.data.Stat;
-import com.yahoo.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.data.Stat;
 
 public class DataMonitor implements Watcher, StatCallback {
 
@@ -583,30 +573,25 @@
 
         /**
          * The ZooKeeper session is no longer valid.
-         * 
+         *
          * @param rc
          *                the ZooKeeper reason code
          */
         void closing(int rc);
     }
 
-    @Override
-    /**
-     * This is a watch event callback. The node we were watching has changed or
-     * something happened to our connection to ZooKeeper.
-     */
-    public void process(WatcherEvent event) {
+    public void process(WatchedEvent event) {
         String path = event.getPath();
-        if (event.getType() == Watcher.Event.EventNone) {
+        if (event.getType() == Event.EventType.None) {
             // We are are being told that the state of the
             // connection has changed
             switch (event.getState()) {
-            case Event.KeeperStateSyncConnected:
+            case SyncConnected:
                 // Everything is happy. Lets kick things off
                 // again by checking the existence of the znode
                 zk.exists(znode, true, this, null);
                 break;
-            case Event.KeeperStateExpired:
+            case Expired:
                 // It's all over
                 dead = true;
                 listener.closing(KeeperException.Code.SessionExpired);
@@ -623,7 +608,6 @@
         }
     }
 
-    @Override
     public void processResult(int rc, String path, Object ctx, Stat stat) {
         boolean exists;
         switch (rc) {

Modified: hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml?rev=705346&r1=705345&r2=705346&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml (original)
+++ hadoop/zookeeper/trunk/src/docs/src/documentation/content/xdocs/zookeeperTutorial.xml Thu Oct 16 13:09:15 2008
@@ -51,35 +51,32 @@
     <para>Both primitives use the following common excerpt of code:</para>
     
     <programlisting>
-static ZooKeeper zk = null;
-static Integer mutex;
-
-String root;
+    static ZooKeeper zk = null;
+    static Integer mutex;
 
-SyncPrimitive(String address) {
-	if(zk == null){
-	    try {
-		System.out.println("Starting ZK:");
-		zk = new ZooKeeper(address, 3000, this);
-		mutex = new Integer(-1);
-		System.out.println("Finished starting ZK: " + zk);
-	    } catch (KeeperException e) {
-		System.out.println("Keeper exception when starting new session: "
-			+ e.toString());
-		zk = null;
-	    } catch (IOException e) {
-		System.out.println(e.toString());
-		zk = null;
-	    }
-	}
-}
+    String root;
 
-synchronized public void process(WatcherEvent event) {
-	synchronized (mutex) {
-	    mutex.notify();
-	}
-}
+    SyncPrimitive(String address) {
+        if(zk == null){
+            try {
+                System.out.println("Starting ZK:");
+                zk = new ZooKeeper(address, 3000, this);
+                mutex = new Integer(-1);
+                System.out.println("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                System.out.println(e.toString());
+                zk = null;
+            }
+        }
+        //else mutex = new Integer(-1);
+    }
 
+    synchronized public void process(WatchedEvent event) {
+        synchronized (mutex) {
+            //System.out.println("Process: " + event.getType());
+            mutex.notify();
+        }
+    }
 </programlisting>
 
 <para>Both classes extend SyncPrimitive. In this way, we execute steps that are 
@@ -124,40 +121,43 @@
 we call root (<emphasis role="bold">Note:</emphasis> This is not the ZooKeeper root "/").</para>
 
 <programlisting>
- /**
- * Barrier constructor
- *
- * @param address
- * @param name
- * @param size
- */
-Barrier(String address, String name, int size) {
-    super(address);
-    this.root = name;
-    this.size = size;
-
-    // Create barrier node
-    if (zk != null) {
-	try {
-	    Stat s = zk.exists(root, false);
-	    if (s == null) {
-		zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
-	    }
-	} catch (KeeperException e) {
-	    System.out.println("Keeper exception when instantiating queue: " + e.toString());
-	} catch (InterruptedException e) {
-	    System.out.println("Interrupted exception");
-	}
-    }
-
-    // My node name
-    try {
-	name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
-    } catch (UnknownHostException e) {
-	System.out.println(e.toString());
-    }
+        /**
+         * Barrier constructor
+         *
+         * @param address
+         * @param name
+         * @param size
+         */
+        Barrier(String address, String name, int size) {
+            super(address);
+            this.root = name;
+            this.size = size;
 
-}
+            // Create barrier node
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating queue: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+
+            // My node name
+            try {
+                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
+            } catch (UnknownHostException e) {
+                System.out.println(e.toString());
+            }
+
+        }
 </programlisting>
 <para>
 To enter the barrier, a process calls enter(). The process creates a node under 
@@ -172,29 +172,29 @@
 </para>
 
 <programlisting>
- /**
- * Join barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
-
-boolean enter() throws KeeperException, InterruptedException{
-    zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
-	    CreateFlags.EPHEMERAL);
-    while (true) {
-	synchronized (mutex) {
-	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-
-	    if (list.size() &lt; size) {
-		mutex.wait();
-	    } else {
-		return true;
-	    }
-	}
-    }
-}
+        /**
+         * Join barrier
+         *
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+
+        boolean enter() throws KeeperException, InterruptedException{
+            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+            while (true) {
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+
+                    if (list.size() &lt; size) {
+                        mutex.wait();
+                    } else {
+                        return true;
+                    }
+                }
+            }
+        }
 </programlisting>
 <para>
 Note that enter() throws both KeeperException and InterruptedException, so it is 
@@ -209,27 +209,28 @@
 it checks once more whether the root node has any child.</para>
 
 <programlisting>
- /**
- * Wait until all reach barrier
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
-
-boolean leave() throws KeeperException, InterruptedException{
-    zk.delete(root + "/" + name, 0);
-    while (true) {
-	synchronized (mutex) {
-	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-		if (list.size() &gt; 0) {
-		    mutex.wait();
-		} else {
-		    return true;
-		}
-	    }
-	}
-}
+        /**
+         * Wait until all reach barrier
+         *
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+
+        boolean leave() throws KeeperException, InterruptedException{
+            zk.delete(root + "/" + name, 0);
+            while (true) {
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+                        if (list.size() &gt; 0) {
+                            mutex.wait();
+                        } else {
+                            return true;
+                        }
+                    }
+                }
+        }
+    }
 </programlisting>
 </section>
 <section id="sc_producerConsumerQueues"><title>Producer-Consumer Queues</title>
@@ -249,32 +250,33 @@
 node of the queue exists, and creates if it doesn't.
 </para>
 <programlisting>
-/**
- * Constructor of producer-consumer queue
- *
- * @param address
- * @param name
- */
-Queue(String address, String name) {
-    super(address);
-    this.root = name;
-    // Create ZK node name
-    if (zk != null) {
-	try {
-	    Stat s = zk.exists(root, false);
-	    if (s == null) {
-		zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
-	    }
-	} catch (KeeperException e) {
-	    System.out
-		    .println("Keeper exception when instantiating queue: "
-			    + e.toString());
-	} catch (InterruptedException e) {
-	    System.out.println("Interrupted exception");
-	}
-    }
-}
- </programlisting>
+        /**
+         * Constructor of producer-consumer queue
+         *
+         * @param address
+         * @param name
+         */
+        Queue(String address, String name) {
+            super(address);
+            this.root = name;
+            // Create ZK node name
+            if (zk != null) {
+                try {
+                    Stat s = zk.exists(root, false);
+                    if (s == null) {
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
+                    }
+                } catch (KeeperException e) {
+                    System.out
+                            .println("Keeper exception when instantiating queue: "
+                                    + e.toString());
+                } catch (InterruptedException e) {
+                    System.out.println("Interrupted exception");
+                }
+            }
+        }
+</programlisting>
  
 <para>
 A producer process calls "produce()" to add an element to the queue, and passes 
@@ -286,25 +288,25 @@
 </para>
 
 <programlisting>
-/**
- * Add element to the queue.
- *
- * @param i
- * @return
- */
-
-boolean produce(int i) throws KeeperException, InterruptedException{
-    ByteBuffer b = ByteBuffer.allocate(4);
-    byte[] value;
-
-    // Add child with value i
-    b.putInt(i);
-    value = b.array();
-    zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
-		CreateFlags.SEQUENCE);
+        /**
+         * Add element to the queue.
+         *
+         * @param i
+         * @return
+         */
 
-    return true;
-}
+        boolean produce(int i) throws KeeperException, InterruptedException{
+            ByteBuffer b = ByteBuffer.allocate(4);
+            byte[] value;
+
+            // Add child with value i
+            b.putInt(i);
+            value = b.array();
+            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT_SEQUENTIAL);
+
+            return true;
+        }
 </programlisting>
 <para>
 To consume an element, a consumer process obtains the children of the root node, 
@@ -320,41 +322,44 @@
 from each one.</para>
 
 <programlisting>
-/**
- * Remove first element from the queue.
- *
- * @return
- * @throws KeeperException
- * @throws InterruptedException
- */
-int consume() throws KeeperException, InterruptedException{
-    int retvalue = -1;
-    Stat stat = null;
-
-    // Get the first element available
-    while (true) {
-	synchronized (mutex) {
-	    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-	    if (list.size() == 0) {
-		System.out.println("Going to wait");
-		mutex.wait();
-	    } else {
-		Integer min = new Integer(list.get(0).substring(7));
-		for(String s : list){
-		    Integer tempValue = new Integer(s.substring(7));
-		    if(tempValue &gt; min) min = tempValue;
-		}
-		System.out.println("Temporary value: " + root + "/element" + min);
-		byte[] b = zk.getData(root + "/element" + min, false, stat);
-		zk.delete(root + "/element" + min, 0);
-		ByteBuffer buffer = ByteBuffer.wrap(b);
-		retvalue = buffer.getInt();
-
-		return retvalue;
-	    }
-	}
+        /**
+         * Remove first element from the queue.
+         *
+         * @return
+         * @throws KeeperException
+         * @throws InterruptedException
+         */
+        int consume() throws KeeperException, InterruptedException{
+            int retvalue = -1;
+            Stat stat = null;
+
+            // Get the first element available
+            while (true) {
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+                    if (list.size() == 0) {
+                        System.out.println("Going to wait");
+                        mutex.wait();
+                    } else {
+                        Integer min = new Integer(list.get(0).substring(7));
+                        for(String s : list){
+                            Integer tempValue = new Integer(s.substring(7));
+                            //System.out.println("Temporary value: " + tempValue);
+                            if(tempValue &lt; min) min = tempValue;
+                        }
+                        System.out.println("Temporary value: " + root + "/element" + min);
+                        byte[] b = zk.getData(root + "/element" + min,
+                                    false, stat);
+                        zk.delete(root + "/element" + min, 0);
+                        ByteBuffer buffer = ByteBuffer.wrap(b);
+                        retvalue = buffer.getInt();
+
+                        return retvalue;
+                    }
+                }
+            }
+        }
     }
-}
 </programlisting>
  
 </section>
@@ -362,29 +367,26 @@
 <example id="eg_SyncPrimitive_java">
 <title>SyncPrimitive.Java</title>
 <programlisting>
-package com.yahoo.SyncPrimitive;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.lang.InterruptedException;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
-import com.yahoo.zookeeper.Watcher;
-import com.yahoo.zookeeper.data.Stat;
-import com.yahoo.zookeeper.proto.WatcherEvent;
-import com.yahoo.zookeeper.KeeperException;
-import com.yahoo.zookeeper.ZooDefs.Ids;
-import com.yahoo.zookeeper.ZooDefs.CreateFlags;
-import com.yahoo.zookeeper.ZooKeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 public class SyncPrimitive implements Watcher {
 
     static ZooKeeper zk = null;
     static Integer mutex;
-    
+
     String root;
 
     SyncPrimitive(String address) {
@@ -394,10 +396,6 @@
                 zk = new ZooKeeper(address, 3000, this);
                 mutex = new Integer(-1);
                 System.out.println("Finished starting ZK: " + zk);
-            } catch (KeeperException e) {
-                System.out.println("Keeper exception when starting new session: "
-                        + e.toString());
-                zk = null;
             } catch (IOException e) {
                 System.out.println(e.toString());
                 zk = null;
@@ -406,7 +404,7 @@
         //else mutex = new Integer(-1);
     }
 
-    synchronized public void process(WatcherEvent event) {
+    synchronized public void process(WatchedEvent event) {
         synchronized (mutex) {
             //System.out.println("Process: " + event.getType());
             mutex.notify();
@@ -422,7 +420,7 @@
 
         /**
          * Barrier constructor
-         * 
+         *
          * @param address
          * @param name
          * @param size
@@ -437,7 +435,8 @@
                 try {
                     Stat s = zk.exists(root, false);
                     if (s == null) {
-                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
                     }
                 } catch (KeeperException e) {
                     System.out
@@ -447,31 +446,31 @@
                     System.out.println("Interrupted exception");
                 }
             }
-            
+
             // My node name
             try {
                 name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
             } catch (UnknownHostException e) {
                 System.out.println(e.toString());
             }
-            
+
         }
 
         /**
          * Join barrier
-         * 
+         *
          * @return
          * @throws KeeperException
          * @throws InterruptedException
          */
-        
+
         boolean enter() throws KeeperException, InterruptedException{
             zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
-                    CreateFlags.EPHEMERAL);
+                    CreateMode.EPHEMERAL);
             while (true) {
-                synchronized (mutex) {                    
-                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-       
+                synchronized (mutex) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+
                     if (list.size() &lt; size) {
                         mutex.wait();
                     } else {
@@ -483,18 +482,18 @@
 
         /**
          * Wait until all reach barrier
-         * 
+         *
          * @return
          * @throws KeeperException
          * @throws InterruptedException
          */
-        
+
         boolean leave() throws KeeperException, InterruptedException{
             zk.delete(root + "/" + name, 0);
             while (true) {
                 synchronized (mutex) {
-                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
-                        if (list.size() > 0) {
+                    List&lt;String&gt; list = zk.getChildren(root, true);
+                        if (list.size() &gt; 0) {
                             mutex.wait();
                         } else {
                             return true;
@@ -511,7 +510,7 @@
 
         /**
          * Constructor of producer-consumer queue
-         * 
+         *
          * @param address
          * @param name
          */
@@ -523,7 +522,8 @@
                 try {
                     Stat s = zk.exists(root, false);
                     if (s == null) {
-                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
                     }
                 } catch (KeeperException e) {
                     System.out
@@ -537,11 +537,11 @@
 
         /**
          * Add element to the queue.
-         * 
+         *
          * @param i
          * @return
          */
-        
+
         boolean produce(int i) throws KeeperException, InterruptedException{
             ByteBuffer b = ByteBuffer.allocate(4);
             byte[] value;
@@ -550,15 +550,15 @@
             b.putInt(i);
             value = b.array();
             zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
-                        CreateFlags.SEQUENCE);
-            
+                        CreateMode.PERSISTENT_SEQUENTIAL);
+
             return true;
         }
 
 
         /**
          * Remove first element from the queue.
-         * 
+         *
          * @return
          * @throws KeeperException
          * @throws InterruptedException
@@ -566,11 +566,11 @@
         int consume() throws KeeperException, InterruptedException{
             int retvalue = -1;
             Stat stat = null;
-            
+
             // Get the first element available
             while (true) {
                 synchronized (mutex) {
-                    ArrayList&lt;String&gt; list = zk.getChildren(root, true);
+                    List&lt;String&gt; list = zk.getChildren(root, true);
                     if (list.size() == 0) {
                         System.out.println("Going to wait");
                         mutex.wait();
@@ -587,11 +587,11 @@
                         zk.delete(root + "/element" + min, 0);
                         ByteBuffer buffer = ByteBuffer.wrap(b);
                         retvalue = buffer.getInt();
-                        
+
                         return retvalue;
                     }
                 }
-            }           
+            }
         }
     }
 
@@ -613,16 +613,16 @@
         if (args[3].equals("p")) {
             System.out.println("Producer");
             for (i = 0; i &lt; max; i++)
-                try{    
+                try{
                     q.produce(10 + i);
                 } catch (KeeperException e){
-                    
+
                 } catch (InterruptedException e){
-                    
+
                 }
         } else {
             System.out.println("Consumer");
-   
+
             for (i = 0; i &lt; max; i++) {
                 try{
                     int r = q.consume();
@@ -630,7 +630,7 @@
                 } catch (KeeperException e){
                     i--;
                 } catch (InterruptedException e){
-                    
+
                 }
             }
         }
@@ -643,9 +643,9 @@
             System.out.println("Entered barrier: " + args[2]);
             if(!flag) System.out.println("Error when entering the barrier");
         } catch (KeeperException e){
-            
+
         } catch (InterruptedException e){
-            
+
         }
 
         // Generate random integer
@@ -662,13 +662,14 @@
         try{
             b.leave();
         } catch (KeeperException e){
-            
+
         } catch (InterruptedException e){
-            
+
         }
         System.out.println("Left barrier");
     }
-}</programlisting></example>
+}
+</programlisting></example>
 </section>
 
 </article>



Mime
View raw message