hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From surajsme...@apache.org
Subject svn commit: r1353374 - in /hama/branches/HAMA-505-branch/core/src: main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/sync/ test/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/sync/
Date Mon, 25 Jun 2012 01:14:27 GMT
Author: surajsmenon
Date: Mon Jun 25 01:14:24 2012
New Revision: 1353374

URL: http://svn.apache.org/viewvc?rev=1353374&view=rev
Log:
[HAMA-587] Version 3 patch applied. Adds the APIs needed for the ZooKeeper interface to record checkpoint progress.

Added:
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
Modified:
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
    hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
    hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon Jun 25 01:14:24 2012
@@ -48,6 +48,9 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.BSPMasterSyncClient;
+import org.apache.hama.bsp.sync.MasterSyncClient;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
 import org.apache.hama.http.HttpServer;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -79,6 +82,7 @@ public class BSPMaster implements JobSub
   private HamaConfiguration conf;
   ZooKeeper zk = null;
   private String bspRoot = null;
+  MasterSyncClient syncClient = null;
 
   /**
    * Constants for BSPMaster's status.
@@ -465,108 +469,20 @@ public class BSPMaster implements JobSub
   }
 
   /**
-   * When start the cluster, cleans all zk nodes up.
-   * 
-   * @param conf
+   * Initialize the global synchronization client.
+   * @param conf Hama configuration.
    */
   private void initZK(HamaConfiguration conf) {
-    try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
-          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
-    } catch (IOException e) {
-      LOG.error("Exception during reinitialization!", e);
-    }
-
-    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
-        Constants.DEFAULT_ZOOKEEPER_ROOT);
-    Stat s = null;
-    if (zk != null) {
-      try {
-        s = zk.exists(bspRoot, false);
-      } catch (Exception e) {
-        LOG.error(s, e);
-      }
-
-      if (s == null) {
-        try {
-          zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
-              CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-          LOG.error(e);
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      } else {
-        this.clearZKNodes(zk);
-      }
-    }
-  }
-
-  /**
-   * Clears all sub-children of node bspRoot
-   */
-  public void clearZKNodes(ZooKeeper zk) {
-    clearZKNodes(zk, bspRoot);
-  }
-
-  public static void clearZKNodes(ZooKeeper zk, String pPath) {
-    String path = pPath;
-    if (!path.startsWith("/")) {
-      path = "/" + path;
-      LOG.warn("Path did not start with /, adding it: " + path);
-    }
-    try {
-      Stat s = zk.exists(path, false);
-      if (s != null) {
-        clearZKNodesInternal(zk, path);
-      }
-
-    } catch (Exception e) {
-      LOG.warn("Could not clear zookeeper nodes.", e);
-    }
+    this.syncClient = new ZKSyncBSPMasterClient();
+    this.syncClient.init(conf);
   }
 
   /**
-   * Clears all sub-children of node rooted at path.
+   * Get a handle of the global synchronization client used by BSPMaster.
+   * @return The synchronization client.
    */
-  private static void clearZKNodesInternal(ZooKeeper zk, String path)
-      throws KeeperException, InterruptedException {
-    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
-
-    if (list.size() == 0) {
-      return;
-
-    } else {
-      for (String node : list) {
-        clearZKNodes(zk, path + "/" + node);
-        zk.delete(path + "/" + node, -1); // delete any version of this node.
-      }
-    }
-  }
-
-  public void createJobRoot(String string) {
-    try {
-      zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,
-          CreateMode.PERSISTENT);
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
-    }
-  }
-
-  public void deleteJobRoot(String string) {
-    try {
-      for (String node : zk.getChildren("/" + string, this)) {
-        zk.delete("/" + string + "/" + node, 0);
-      }
-
-      zk.delete("/" + string, 0);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (KeeperException e) {
-      e.printStackTrace();
-    }
+  public MasterSyncClient getSyncClient(){
+    return this.syncClient;
   }
 
   /**

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Jun 25 01:14:24 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
@@ -38,7 +39,10 @@ import org.apache.hama.bsp.Counters.Coun
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
 import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
 import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncEventListener;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -71,7 +75,7 @@ public final class BSPPeerImpl<K1, V1, K
   private String[] allPeers;
 
   // SYNC
-  private SyncClient syncClient;
+  private PeerSyncClient syncClient;
   private MessageManager<M> messenger;
 
   // A checkpoint is initiated at the <checkPointInterval>th interval.
@@ -182,7 +186,7 @@ public final class BSPPeerImpl<K1, V1, K
 
   @SuppressWarnings("unchecked")
   public final void initialize() throws Exception {
-    syncClient = SyncServiceFactory.getSyncClient(conf);
+    syncClient = SyncServiceFactory.getPeerSyncClient(conf);
     syncClient.init(conf, taskId.getJobID(), taskId);
 
     initInput();

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Jun 25 01:14:24 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.Constants;
+import org.apache.hama.bsp.sync.MasterSyncClient;
 
 /**
  * JobInProgress maintains all the info for keeping a Job on the straight and
@@ -243,8 +244,8 @@ class JobInProgress {
         0L, 0L, JobStatus.RUNNING, counters);
 
     // delete all nodes belonging to that job before start
-    BSPMaster.clearZKNodes(master.zk, this.getJobID().toString());
-    master.createJobRoot(this.getJobID().toString());
+    MasterSyncClient syncClient = master.getSyncClient();
+    syncClient.registerJob(this.getJobID().toString());
 
     tasksInited = true;
     LOG.info("Job is initialized.");
@@ -332,7 +333,7 @@ class JobInProgress {
       LOG.info("Job successfully done.");
 
       // delete job root
-      master.deleteJobRoot(this.getJobID().toString());
+      master.getSyncClient().deregisterJob(this.getJobID().toString());
 
       garbageCollect();
     }

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Jun 25 01:14:24 2012
@@ -49,7 +49,10 @@ import org.apache.hama.bsp.message.Memor
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
 import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
 import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -124,7 +127,7 @@ public class LocalBSPRunner implements J
 
     conf.setClass(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
         LocalMessageManager.class, MessageManager.class);
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalSyncClient.class,
+    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS, LocalSyncClient.class,
         SyncClient.class);
 
     BSPJob job = new BSPJob(new HamaConfiguration(conf), jobID);
@@ -463,7 +466,7 @@ public class LocalBSPRunner implements J
 
   }
 
-  public static class LocalSyncClient implements SyncClient {
+  public static class LocalSyncClient extends BSPPeerSyncClient {
     // note that this is static, because we will have multiple peers
     private static CyclicBarrier barrier;
     private int tasks;
@@ -528,6 +531,54 @@ public class LocalBSPRunner implements J
     public void close() throws InterruptedException {
       barrier = null;
     }
+    @Override
+    public Writable getInformation(String key,
+        Class<? extends Writable> classType) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public String constructKey(BSPJobID jobId, String... args) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public boolean storeInformation(String key, Writable value,
+        boolean permanent, SyncEventListener listener) {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public boolean addKey(String key, boolean permanent,
+        SyncEventListener listener) {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public boolean hasKey(String key) {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public boolean registerListener(String key,
+        SyncEvent event,
+        SyncEventListener listener) {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+	
   }
 
   @Override

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPMasterSyncClient.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hama.HamaConfiguration;
+
+public abstract class BSPMasterSyncClient implements MasterSyncClient{
+  // Abstract base for master-side sync clients; SyncClient's methods are left to subclasses.
+  /**
+   * Initializes the synchronization client.
+   * 
+   * @param conf the Hama configuration used to initialize the client.
+   */
+  public abstract void init(HamaConfiguration conf);
+  
+  /**
+   * Clears all stored synchronization information.
+   */
+  public abstract void clear();
+  
+  /**
+   * Registers a newly added job with the synchronization service.
+   * @param string the job ID in string form.
+   */
+  public abstract void registerJob(String string);
+
+  /**
+   * Deregisters a job from the synchronization service.
+   * @param string the job ID in string form, as given to registerJob.
+   */
+  public abstract void deregisterJob(String string);
+    
+  /**
+   * Closes the client.
+   */
+  public abstract void close();
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/BSPPeerSyncClient.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+
+public abstract class BSPPeerSyncClient implements PeerSyncClient{
+  // Abstract base for peer-side sync clients (e.g. LocalBSPRunner.LocalSyncClient).
+  /**
+   * Initializes the client within a spawned task (e.g. ZooKeeper or RPC setup).
+   * @param conf the configuration
+   * @param jobId the job's ID
+   * @param taskId the task attempt's ID
+   * @throws Exception if the client cannot be initialized
+   */
+  public abstract void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception;
+
+  /**
+   * Enters the barrier before messages are sent in each superstep.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param superstep the superstep of the task
+   * @throws SyncException if the barrier cannot be entered
+   */
+  public abstract void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Leaves the barrier after all communication has been done; this is usually
+   * the end of a superstep.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param superstep the superstep of the task
+   * @throws SyncException if the barrier cannot be left
+   */
+  public abstract void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Registers a specific task with its host and port at the sync daemon.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param hostAddress the host where the sync server resides
+   * @param port the port where the sync server is up
+   */
+  public abstract void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Returns all registered tasks within the sync daemon. They have to be
+   * ordered ascending by their task id.
+   * 
+   * @param taskId the task's ID
+   * @return an <b>ordered</b> string array of host:port pairs of all tasks
+   *         connected to the daemon.
+   */
+  public abstract String[] getAllPeerNames(TaskAttemptID taskId);
+
+  /**
+   * TODO: currently unused. Could later be used to deregister tasks from the
+   * barrier at runtime once they are finished, similar to voteToHalt() in
+   * Pregel.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param hostAddress the host address (presumably as passed to register)
+   * @param port the port (presumably as passed to register)
+   */
+  public abstract void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Stops the sync daemon. Only used in YARN.
+   */
+  public abstract void stopServer();
+
+  /**
+   * Closes all resources used by the client, e.g. a ZooKeeper instance.
+   * 
+   * @throws InterruptedException if interrupted while closing
+   */
+  public abstract void close() throws InterruptedException;
+
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/MasterSyncClient.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * MasterSyncClient defines the operations BSPMaster uses to perform
+ * globally synchronized state changes, such as registering jobs.
+ *
+ */
+public interface MasterSyncClient extends SyncClient{
+
+  /**
+   * Initializes the synchronization client.
+   * 
+   * @param conf the Hama configuration used to initialize the client.
+   */
+  public abstract void init(HamaConfiguration conf);
+  
+  /**
+   * Clears all stored synchronization information.
+   */
+  public abstract void clear();
+  
+  /**
+   * Registers a newly added job with the synchronization service.
+   * @param string the job ID in string form.
+   */
+  public abstract void registerJob(String string);
+
+  /**
+   * Deregisters a job from the synchronization service.
+   * @param string the job ID in string form, as given to registerJob.
+   */
+  public abstract void deregisterJob(String string);
+    
+  /**
+   * Closes the client.
+   */
+  public abstract void close();
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/PeerSyncClient.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * PeerSyncClient defines the operations a BSPPeer uses to maintain
+ * synchronized global state (barriers, peer registration) as it progresses.
+ */
+
+public interface PeerSyncClient extends SyncClient{
+
+  /**
+   * Initializes the client within a spawned task (e.g. ZooKeeper or RPC setup).
+   * @param conf the configuration
+   * @param jobId the job's ID
+   * @param taskId the task attempt's ID
+   * @throws Exception if the client cannot be initialized
+   */
+  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+      throws Exception;
+
+  /**
+   * Enters the barrier before messages are sent in each superstep.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param superstep the superstep of the task
+   * @throws SyncException if the barrier cannot be entered
+   */
+  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Leaves the barrier after all communication has been done; this is usually
+   * the end of a superstep.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param superstep the superstep of the task
+   * @throws SyncException if the barrier cannot be left
+   */
+  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
+      throws SyncException;
+
+  /**
+   * Registers a specific task with its host and port at the sync daemon.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param hostAddress the host where the sync server resides
+   * @param port the port where the sync server is up
+   */
+  public void register(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Returns all registered tasks within the sync daemon. They have to be
+   * ordered ascending by their task id.
+   * 
+   * @param taskId the task's ID
+   * @return an <b>ordered</b> string array of host:port pairs of all tasks
+   *         connected to the daemon.
+   */
+  public String[] getAllPeerNames(TaskAttemptID taskId);
+
+  /**
+   * TODO: currently unused. Could later be used to deregister tasks from the
+   * barrier at runtime once they are finished, similar to voteToHalt() in
+   * Pregel.
+   * 
+   * @param jobId the job's ID
+   * @param taskId the task's ID
+   * @param hostAddress the host address (presumably as passed to register)
+   * @param port the port (presumably as passed to register)
+   */
+  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+      String hostAddress, long port);
+
+  /**
+   * Stops the sync daemon. Only used in YARN.
+   */
+  public void stopServer();
+
+  /**
+   * Closes all resources used by the client, e.g. a ZooKeeper instance.
+   * 
+   * @throws InterruptedException if interrupted while closing
+   */
+  public void close() throws InterruptedException;
+
+
+}

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Mon Jun 25 01:14:24 2012
@@ -17,9 +17,8 @@
  */
 package org.apache.hama.bsp.sync;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPJobID;
-import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Basic interface for a client that connects to a sync server.
@@ -28,82 +27,72 @@ import org.apache.hama.bsp.TaskAttemptID
 public interface SyncClient {
 
   /**
-   * Init will be called within a spawned task, it should be used to initialize
-   * the inner structure and fields, e.G. a zookeeper client or an rpc
-   * connection to the real sync daemon.
-   * 
-   * @throws Exception
+   * Constructs a key in the format required by this SyncClient for storing and
+   * retrieving information. Using this method is the recommended way to build
+   * the keys passed to the other methods of this interface.
+   * @param jobId the BSP job ID.
+   * @param args the string components used to construct the key.
+   * @return the key built from the given components in the required format.
    */
-  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
-      throws Exception;
+  public String constructKey(BSPJobID jobId, String ... args);
 
   /**
-   * Enters the barrier before the message sending in each superstep.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param superstep the superstep of the task
-   * @throws SyncException
+   * Stores a value for the specified key.
+   * @param key the key under which the value is stored. It is recommended to use
+   * <code>constructKey</code> to create the key.
+   * @param value the value to be stored.
+   * @param permanent true if the value should persist after the session ends.
+   * @param listener listener that provides asynchronous updates on the state
+   * of the information stored under the key.
+   * @return true if the operation was successful.
    */
-  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public boolean storeInformation(String key, Writable value, 
+      boolean permanent, SyncEventListener listener);
 
   /**
-   * Leaves the barrier after all communication has been done, this is usually
-   * the end of a superstep.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param superstep the superstep of the task
-   * @throws SyncException
+   * Retrieves the value previously stored for the key.
+   * @param key the key under which the value was stored.
+   * @param classType the expected class of the stored value.
+   * @return the value if found; null if an error occurred or if no value
+   * was stored for the key.
    */
-  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public Writable getInformation(String key, Class<? extends Writable> classType);
 
   /**
-   * Registers a specific task with a its host and port to the sync daemon.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param hostAddress the host where the sync server resides
-   * @param port the port where the sync server is up
+   * Stores a new key in the key set.
+   * @param key the key to be saved in the key set. It is recommended to use
+   * <code>constructKey</code> to create the key.
+   * @param permanent true if the key should persist after the session ends.
+   * @param listener listener that is asynchronously notified of events
+   * related to the key.
+   * @return true if the operation was successful.
    */
-  public void register(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
+  public boolean addKey(String key, boolean permanent, SyncEventListener listener);
 
   /**
-   * Returns all registered tasks within the sync daemon. They have to be
-   * ordered ascending by their task id.
-   * 
-   * @param taskId the tasks ID
-   * @return an <b>ordered</b> string array of host:port pairs of all tasks
-   *         connected to the daemon.
+   * Checks whether the key was previously stored.
+   * @param key the key to look up.
+   * @return true if the key exists.
    */
-  public String[] getAllPeerNames(TaskAttemptID taskId);
-
+  public boolean hasKey(String key);
+  
   /**
-   * TODO this has currently no use. Could later be used to deregister tasks
-   * from the barrier during runtime if they are finished. Something equal to
-   * voteToHalt() in Pregel.
-   * 
-   * @param jobId
-   * @param taskId
-   * @param hostAddress
-   * @param port
-   */
-  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
-
-  /**
-   * This stops the sync daemon. Only used in YARN.
-   */
-  public void stopServer();
+   * Gets the child keys stored under the given key.
+   * @param key the key whose child keys are to be listed.
+   * @param listener listener that is asynchronously notified of changes
+   * under the given key.
+   * @return the array of child keys.
+   */
+  public String[] getChildKeySet(String key, SyncEventListener listener);
 
   /**
-   * This method should close all used resources, e.G. a ZooKeeper instance.
-   * 
-   * @throws InterruptedException
+   * Registers a listener for events on the key.
+   * @param key the key on which the listener should be registered.
+   * @param event the event the listener is registered for.
+   * @param listener the event listener that defines how to process the event.
+   * @return true if the operation is successful.
    */
-  public void close() throws InterruptedException;
+  public boolean registerListener(String key, SyncEvent event,
+      SyncEventListener listener);
 
 }

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEvent.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * An event raised by the distributed global synchronization service.
+ * Implementations identify themselves through a numeric event id.
+ */
+public interface SyncEvent {
+
+  /**
+   * Identifies this event within the event scheme defined by the global
+   * synchronization service implementation.
+   *
+   * @return the numeric event identifier
+   */
+  int getEventId();
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncEventListener.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * Base type for listeners that react to events of the distributed global
+ * synchronization service.
+ */
+public abstract class SyncEventListener {
+
+  /**
+   * Handles a synchronization event.
+   *
+   * @param eventId numeric identifier of the event, as returned by
+   *          {@link SyncEvent#getEventId()}.
+   */
+  public abstract void handleEvent(int eventId);
+
+}

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Mon Jun 25 01:14:24 2012
@@ -22,18 +22,27 @@ import org.apache.hadoop.util.Reflection
 
 public class SyncServiceFactory {
   public static final String SYNC_SERVER_CLASS = "hama.sync.server.class";
-  public static final String SYNC_CLIENT_CLASS = "hama.sync.client.class";
+  public static final String SYNC_PEER_CLASS = "hama.sync.peer.class";
+  public static final String SYNC_MASTER_CLASS = "hama.sync.master.class";
 
   /**
    * Returns a sync client via reflection based on what was configured.
    */
-  public static SyncClient getSyncClient(Configuration conf)
+  public static PeerSyncClient getPeerSyncClient(Configuration conf)
       throws ClassNotFoundException {
-    return (SyncClient) ReflectionUtils
-        .newInstance(conf.getClassByName(conf.get(SYNC_CLIENT_CLASS,
+    return (PeerSyncClient) ReflectionUtils
+        .newInstance(conf.getClassByName(conf.get(SYNC_PEER_CLASS,
             ZooKeeperSyncClientImpl.class.getName())), conf);
   }
 
+  
+  public static SyncClient getMasterSyncClient(Configuration conf)
+		  throws ClassNotFoundException {
+	  return (SyncClient) ReflectionUtils
+			  .newInstance(conf.getClassByName(conf.get(SYNC_MASTER_CLASS,
+					  ZKSyncBSPMasterClient.class.getName())), conf);
+  }
+
   /**
    * Returns a sync server via reflection based on what was configured.
    */

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncBSPMasterClient.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Zookeeper sunchronization client that is used by BSPMaster to maintain global
+ * state of cluster.
+ */
+public class ZKSyncBSPMasterClient extends ZKSyncClient implements
+    MasterSyncClient {
+
+  private ZooKeeper zk = null;
+  private String bspRoot = null;
+
+  Log LOG = LogFactory.getLog(ZKSyncBSPMasterClient.class);
+
+  @Override
+  public void init(HamaConfiguration conf) {
+    try {
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
+          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+    } catch (IOException e) {
+      LOG.error("Exception during reinitialization!", e);
+    }
+
+    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
+        Constants.DEFAULT_ZOOKEEPER_ROOT);
+    LOG.info("Initialized ZK " + (null == zk));
+    Stat s = null;
+    if (zk != null) {
+      initialize(zk, bspRoot);
+      try {
+        s = zk.exists(bspRoot, false);
+      } catch (Exception e) {
+        LOG.error(s, e);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      } else {
+        this.clearZKNodes();
+      }
+    }
+
+  }
+
+  /**
+   * Clears all sub-children of node bspRoot
+   */
+  private void clearZKNodes() {
+    try {
+      Stat s = zk.exists(bspRoot, false);
+      if (s != null) {
+        clearZKNodes(bspRoot);
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Could not clear zookeeper nodes.", e);
+    }
+  }
+
+  /**
+   * Clears all sub-children of node rooted at path.
+   * 
+   * @param path
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private void clearZKNodes(String path) throws KeeperException,
+      InterruptedException {
+    ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
+
+    if (list.size() == 0) {
+      return;
+
+    } else {
+      for (String node : list) {
+        clearZKNodes(path + "/" + node);
+        LOG.info("Deleting " + path + "/" + node);
+        zk.delete(path + "/" + node, -1); // delete any version of this
+        // node.
+      }
+    }
+  }
+
+  private void createJobRoot(String string) {
+    writeNode(string, null, true, null);
+  }
+
+  @Override
+  public void clear() {
+    clearZKNodes();
+  }
+
+  @Override
+  public void registerJob(String string) {
+    // TODO: delete job root if key is present.
+    createJobRoot(string);
+  }
+
+  @Override
+  public void deregisterJob(String string) {
+    try {
+      clearZKNodes(bspRoot + "/" + string);
+      this.zk.delete(bspRoot + "/" + string, -1);
+    } catch (KeeperException e) {
+      LOG.error("Error deleting job " + string);
+    } catch (InterruptedException e) {
+      LOG.error("Error deleting job " + string);
+    }
+
+  }
+
+  @Override
+  public void close() {
+    try {
+      this.zk.close();
+    } catch (InterruptedException e) {
+      LOG.error("Error closing sync client", e);
+    }
+
+  }
+
+  @Override
+  public void process(WatchedEvent arg0) {
+    LOG.debug("Processing event " + arg0.getPath());
+    LOG.debug("Processing event type " + arg0.getType().toString());
+
+  }
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncClient.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,432 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.ReflectionUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * A Zookeeper based BSP distributed synchronization client that provides
+ * primitive synchronization API's
+ * 
+ */
+public abstract class ZKSyncClient implements SyncClient, Watcher {
+
+  Log LOG = LogFactory.getLog(ZKSyncClient.class);
+
+  private ZooKeeper zk;
+  private String bspRoot;
+
+  protected Map<String, List<ZKSyncEventListener>> eventListenerMap;
+
+  public ZKSyncClient() {
+    eventListenerMap = new HashMap<String, List<ZKSyncEventListener>>(10);
+  }
+
+  /**
+   * Initializes the zookeeper client-base.
+   */
+  protected void initialize(ZooKeeper zookeeper, String root) {
+    LOG.info("Initializing ZK Sync Client");
+    zk = zookeeper;
+    bspRoot = root;
+    eventListenerMap = new HashMap<String, List<ZKSyncEventListener>>(10);
+  }
+
+  /**
+   * Returns the Node name using conventions to address a superstep progress for
+   * task
+   * 
+   * @param taskId The Task ID
+   * @param superstep The superstep number
+   * @return String that represents the Zookeeper node path that could be
+   *         created.
+   */
+  protected String getNodeName(TaskAttemptID taskId, long superstep) {
+    return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
+        + taskId.toString();
+  }
+  
+  private String correctKey(String key){
+    if (!key.startsWith("/")) {
+      key = "/" + key;
+    }
+    return key;
+  }
+
+  /**
+   * Check if the zookeeper node exists.
+   * 
+   * @param path The Zookeeper node path to check.
+   * @param watcher A Watcher that would trigger interested events on the node.
+   *          This value could be null if no watcher has to be left.
+   * @return true if the node exists.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  protected boolean isExists(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    synchronized (zk) {
+      return !(null == zk.exists(path, false));
+    }
+  }
+
+  /**
+   * Returns the zookeeper stat object.
+   * 
+   * @param path The path of the expected Zookeeper node.
+   * @return Stat object for the Zookeeper path.
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  protected Stat getStat(final String path) throws KeeperException,
+  InterruptedException {
+    synchronized (zk) {
+      return zk.exists(path, false);
+    }
+  }
+
+  private void createZnode(final String path, final CreateMode mode,
+      byte[] data, Watcher watcher) throws KeeperException,
+      InterruptedException {
+
+    synchronized (zk) {
+      Stat s = zk.exists(path, false);
+      if (null == s) {
+        try {
+          zk.create(path, data, Ids.OPEN_ACL_UNSAFE, mode);
+        } catch (KeeperException.NodeExistsException nee) {
+          LOG.debug("Ignore because znode may be already created at " + path,
+              nee);
+        }
+      }
+    }
+  }
+
+  /**
+   * Utility function to get byte array out of Writable
+   * 
+   * @param value The Writable object to be converted to byte array.
+   * @return byte array from the Writable object. Returns null on given null
+   *         value or on error.
+   */
+  protected byte[] getBytesForData(Writable value) {
+    byte[] data = null;
+
+    if (value != null) {
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream outputStream = new DataOutputStream(byteStream);
+      try {
+        value.write(outputStream);
+        outputStream.flush();
+        data = byteStream.toByteArray();
+      } catch (IOException e) {
+        LOG.error("Error writing data to write buffer.", e);
+      } finally {
+        try {
+          byteStream.close();
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.error("Error closing byte stream.", e);
+        }
+      }
+    }
+    return data;
+  }
+
+  /**
+   * Utility function to read Writable object value from byte array.
+   * @param data The byte array
+   * @param classType The Class object of expected Writable object.
+   * @return The instance of Writable object. 
+   * @throws IOException
+   */
+  protected Writable getValueFromBytes(
+      byte[] data, Class<? extends Writable> classType) throws IOException{
+    Writable value = null;
+    if (data != null) {
+      ByteArrayInputStream istream = new ByteArrayInputStream(data);
+      value = ReflectionUtils
+          .newInstance(classType, new Object[0]);
+      DataInputStream diStream = new DataInputStream(istream);
+      try {
+        value.readFields(diStream);
+      }
+      finally {
+        diStream.close();
+      }
+    }
+    return value;
+  }
+
+
+  /**
+   * Read value stored in the Zookeeper node.
+   * 
+   * @param path The path of the Zookeeper node.
+   * @param classType The expected class type of the Writable object.
+   * @return The Writable object constructed from the value read from the
+   *         Zookeeper node.
+   */
+  protected Writable extractData(String path,
+      Class<? extends Writable> classType) {
+    try {
+      Stat stat = getStat(path);
+      if (stat != null) {
+        byte[] data = this.zk.getData(path, false, stat);
+        Writable value = null;
+        try{
+          value = getValueFromBytes(data, classType);
+        }
+        catch (IOException e) {
+          LOG.error(
+              new StringBuffer(200).append("Error getting data from path ")
+              .append(path).toString(), e);
+          value = null;
+        }
+        return value;
+      }
+
+    } catch (KeeperException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+
+    } catch (InterruptedException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+
+    }
+    return null;
+
+  }
+
+  /**
+   * Writes data into the Zookeeper node. If the path does not exist the
+   * zookeeper node is created recursively and the value is stored in the node.
+   * 
+   * @param path The path of the Zookeeper node.
+   * @param value The value to be stored in the Zookeeper node.
+   * @param persistent true if the node to be created is ephemeral of permanent
+   * @param watcher If any Watcher object should listen to event on the
+   *          Zookeeper node.
+   * @return true if operation is successful.
+   */
+  protected boolean writeNode(String path, Writable value, boolean persistent,
+      Watcher watcher) {
+    if (path == null || "".equals(path.trim())) {
+      return false;
+    }
+    path = correctKey(path);
+    boolean pathExists = false;
+    try {
+      pathExists = isExists(path, watcher);
+    } catch (KeeperException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+    } catch (InterruptedException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(path).toString(), e);
+    }
+
+    byte[] data = getBytesForData(value);
+
+    if (!pathExists) {
+      try {
+        String[] pathComponents = path.split("/");
+        StringBuffer pathBuffer = new StringBuffer(path.length()
+            + pathComponents.length);
+        for (int i = 0; i < pathComponents.length - 1; ++i) {
+          if (pathComponents[i].equals(""))
+            continue;
+          pathBuffer.append("/").append(pathComponents[i]);
+          createZnode(pathBuffer.toString(), CreateMode.PERSISTENT, null,
+              watcher);
+        }
+        pathBuffer.append("/")
+        .append(pathComponents[pathComponents.length - 1]);
+        CreateMode mode = CreateMode.EPHEMERAL;
+        if (persistent) {
+          mode = CreateMode.PERSISTENT;
+        }
+        createZnode(pathBuffer.toString(), mode, data, watcher);
+
+        return true;
+      } catch (InterruptedException e) {
+        LOG.error(new StringBuilder(200).append("Error creating zk path ")
+            .append(path).toString(), e);
+      } catch (KeeperException e) {
+        LOG.error(new StringBuilder(200).append("Error creating zk path ")
+            .append(path).toString(), e);
+      }
+    } else if (value != null) {
+      try {
+        this.zk.setData(path, data, -1);
+        return true;
+      } catch (InterruptedException e) {
+        LOG.error(new StringBuilder(200).append("Error modifying zk path ")
+            .append(path).toString(), e);
+        return false;
+      } catch (KeeperException e) {
+        LOG.error(new StringBuilder(200).append("Error modifying zk path ")
+            .append(path).toString(), e);
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String constructKey(BSPJobID jobId, String... args) {
+    StringBuffer keyBuffer = new StringBuffer(100);
+    keyBuffer.append(bspRoot);
+    if (jobId != null)
+      keyBuffer.append("/").append(jobId.toString());
+    for (String arg : args) {
+      keyBuffer.append("/").append(arg);
+    }
+    return keyBuffer.toString();
+  }
+
+  @Override
+  public boolean storeInformation(String key, Writable value,
+      boolean permanent, SyncEventListener listener) {
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    key = correctKey(key);
+    final String path = key;
+    LOG.info("Writing data " + path);
+    return writeNode(path, value, permanent, zkListener);
+  }
+
+  @Override
+  public Writable getInformation(String key, Class<? extends Writable> classType) {
+    key = correctKey(key);
+    final String path = key;
+    return extractData(path, classType);
+  }
+
+  @Override
+  public boolean addKey(String key, boolean permanent,
+      SyncEventListener listener) {
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    return writeNode(key, null, permanent, zkListener);
+  }
+
+  @Override
+  public boolean hasKey(String key) {
+    try {
+      return isExists(key, null);
+    } catch (KeeperException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(key).toString(), e);
+    } catch (InterruptedException e) {
+      LOG.error(new StringBuilder(200).append("Error checking zk path ")
+          .append(key).toString(), e);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean registerListener(String key, SyncEvent event,
+      SyncEventListener listener) {
+    key = correctKey(key);
+
+    LOG.debug("Registering listener for " + key);
+    ZKSyncEventListener zkListener = (ZKSyncEventListener) listener;
+    zkListener.setSyncEvent(event);
+    zkListener.setZKSyncClient(this);
+    synchronized (this.zk) {
+
+      try {
+        Stat stat = this.zk.exists(key, zkListener);
+        if (stat == null) {
+          writeNode(key, null, true, zkListener);
+        }
+        this.zk.getData(key, zkListener, stat);
+        this.zk.getChildren(key, zkListener);
+        // List<ZKSyncEventListener> list = this.eventListenerMap.get(key);
+        // if(!eventListenerMap.containsKey(key)){
+        // list = new ArrayList<ZKSyncEventListener>(5);
+        // }
+        // list.add(zkListener);
+        // this.eventListenerMap.put(key, list);
+        return true;
+      } catch (KeeperException e) {
+        LOG.error("Error getting stat and data.", e);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted getting stat and data.", e);
+      }
+
+    }
+    return false;
+  }
+
+  @Override
+  public String[] getChildKeySet(String key, SyncEventListener listener) {
+    key = correctKey(key);
+    ZKSyncEventListener zkListener = null;
+    if (listener != null) {
+      zkListener = (ZKSyncEventListener) listener;
+    }
+    Stat stat = null;
+    String[] children = new String[0];
+    try {
+      stat = this.zk.exists(key, null);
+
+    } catch (KeeperException e) {
+      LOG.error("Error getting stat and data.", e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted getting stat and data.", e);
+    }
+    if (stat == null)
+      return children;
+
+    try {
+      List<String> childList = this.zk.getChildren(key, zkListener);
+      children = new String[childList.size()];
+      childList.toArray(children);
+    } catch (KeeperException e) {
+      LOG.error("Error getting stat and data.", e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted getting stat and data.", e);
+    }
+
+    return children;
+  }
+
+  
+  
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventFactory.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+/**
+ * Zookeeper Synchronization Event Factory. It provides three event
+ * definitions:
+ * <ul>
+ * <li>Value stored in a Zookeeper node is changed.</li>
+ * <li>A new child node is added to a Zookeeper node.</li>
+ * <li>A Zookeeper node is deleted.</li>
+ * </ul>
+ */
+public class ZKSyncEventFactory {
+
+  public static enum ZKEvent {
+    VALUE_CHANGE_EVENT(0),
+    CHILD_ADD_EVENT(1),
+    DELETE_EVENT(2);
+
+    private final int id;
+
+    ZKEvent(int num) {
+      this.id = num;
+    }
+
+    /** @return the numeric event id of this constant. */
+    public int getValue() {
+      return this.id;
+    }
+
+    /** @return the number of defined event types. */
+    public static int getEventCount() {
+      return ZKEvent.values().length;
+    }
+
+    /**
+     * Looks up the constant name for a numeric event id. The lookup matches the
+     * declared id rather than the enum ordinal. The method is static because it
+     * does not depend on the receiver; existing calls through a constant still
+     * compile.
+     *
+     * @param num the event id.
+     * @return the name of the constant whose id equals num.
+     * @throws IllegalArgumentException if no constant has that id.
+     */
+    public static String getName(int num) {
+      for (ZKEvent candidate : ZKEvent.values()) {
+        if (candidate.id == num) {
+          return candidate.name();
+        }
+      }
+      throw new IllegalArgumentException((new StringBuilder(100)
+          .append("The value ")
+          .append(num).append(" is not a valid ZKEvent type. ")
+          .append("Expected range is 0-")
+          .append(getEventCount() - 1)).toString());
+    }
+  }
+
+  /** @return the number of event types supported by this factory. */
+  public static int getSupportedEventCount() {
+    return ZKEvent.getEventCount();
+  }
+
+  private static class ValueChangeEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.VALUE_CHANGE_EVENT.getValue();
+    }
+
+  }
+
+  private static class ChildAddEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.CHILD_ADD_EVENT.getValue();
+    }
+
+  }
+
+  private static class DeleteEvent implements SyncEvent {
+
+    @Override
+    public int getEventId() {
+      return ZKEvent.DELETE_EVENT.getValue();
+    }
+
+  }
+
+  /**
+   * Provides the Zookeeper node value change event definition.
+   * @return the Zookeeper value changed event.
+   */
+  public static SyncEvent getValueChangeEvent() {
+    return new ValueChangeEvent();
+  }
+
+  /**
+   * Provides the Zookeeper deletion event definition.
+   * @return the Zookeeper node is deleted event
+   */
+  public static SyncEvent getDeletionEvent() {
+    return new DeleteEvent();
+  }
+
+  /**
+   * Provides the Zookeeper child addition event definition.
+   * @return the Zookeeper child node is added event
+   */
+  public static SyncEvent getChildAddEvent() {
+    return new ChildAddEvent();
+  }
+
+}

Added: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java?rev=1353374&view=auto
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java (added)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZKSyncEventListener.java Mon Jun 25 01:14:24 2012
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.sync;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+public abstract class ZKSyncEventListener extends SyncEventListener
+ implements Watcher {
+Log LOG = LogFactory.getLog(SyncEventListener.class);
+  
+  private ZKSyncClient client;
+  private SyncEvent event;
+
+  /**
+   * 
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    
+    client.registerListener(event.getPath(),
+        ZKSyncEventFactory.getValueChangeEvent()
+        , this);    
+    //if(LOG.isDebugEnabled()){
+      LOG.debug(event.toString());
+    //}
+
+    if(event.getType().equals(EventType.NodeChildrenChanged)){
+      LOG.debug("Node children changed - " + event.getPath());
+      onChildKeySetChange();
+    }
+    else if (event.getType().equals(EventType.NodeDeleted)){
+      LOG.debug("Node children deleted - " + event.getPath());
+      onDelete();
+    }
+    else if (event.getType().equals(EventType.NodeDataChanged)){
+      LOG.debug("Node children changed - " + event.getPath());
+      
+      onChange();
+    }
+
+  }
+  
+  public void setZKSyncClient(ZKSyncClient zkClient){
+    client = zkClient;
+  }
+  
+  public void setSyncEvent(SyncEvent event){
+    this.event = event;
+  }
+  
+  public SyncEvent getEvent(){
+    return this.event;
+  }
+
+  /**
+   * 
+   */
+  public abstract void onDelete();
+
+  /**
+   * 
+   */
+  public abstract void onChange();
+
+  /**
+   * 
+   */
+  public abstract void onChildKeySetChange();
+
+  @Override
+  public void handleEvent(int eventId) {
+    
+  }
+
+}

Modified: hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/branches/HAMA-505-branch/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Mon Jun 25 01:14:24 2012
@@ -17,11 +17,6 @@
  */
 package org.apache.hama.bsp.sync;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
@@ -47,7 +42,8 @@ import org.apache.zookeeper.data.Stat;
  * This client class abstracts the use of our zookeeper sync code.
  * 
  */
-public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
+public class ZooKeeperSyncClientImpl extends ZKSyncClient implements
+    PeerSyncClient {
 
   /*
    * TODO maybe extract an abstract class and let the subclasses implement
@@ -81,6 +77,8 @@ public class ZooKeeperSyncClientImpl imp
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
 
+    initialize(this.zk, bspRoot);
+
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     LOG.info("Start connecting to Zookeeper! At " + peerAddress);
     numBSPTasks = conf.getInt("bsp.peers.num", 1);
@@ -93,14 +91,13 @@ public class ZooKeeperSyncClientImpl imp
 
     try {
       synchronized (zk) {
-        createZnode(bspRoot);
         final String pathToJobIdZnode = bspRoot + "/"
             + taskId.getJobID().toString();
-        createZnode(pathToJobIdZnode);
         final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
-        createZnode(pathToSuperstepZnode);
+        writeNode(pathToSuperstepZnode, null, true, null);
         BarrierWatcher barrierWatcher = new BarrierWatcher();
-        // this is really needed to register the barrier watcher, don't remove this line!
+        // this is really needed to register the barrier watcher, don't remove
+        // this line!
         zk.exists(pathToSuperstepZnode + "/ready", barrierWatcher);
         zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
             CreateMode.EPHEMERAL);
@@ -131,7 +128,7 @@ public class ZooKeeperSyncClientImpl imp
         } else {
           LOG.debug("---> at superstep: " + superstep
               + " task that is creating /ready znode:" + taskId.toString());
-          createEphemeralZnode(pathToSuperstepZnode + "/ready");
+          writeNode(pathToSuperstepZnode + "/ready", null, false, null);
         }
       }
     } catch (Exception e) {
@@ -236,8 +233,9 @@ public class ZooKeeperSyncClientImpl imp
   public void register(BSPJobID jobId, TaskAttemptID taskId,
       String hostAddress, long port) {
     try {
-      if (zk.exists("/" + jobId.toString(), false) == null) {
-        zk.create("/" + jobId.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+      String jobRegisterKey = constructKey(jobId, "peers");
+      if (zk.exists(jobRegisterKey, false) == null) {
+        zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT);
       }
     } catch (KeeperException e) {
@@ -245,7 +243,7 @@ public class ZooKeeperSyncClientImpl imp
     } catch (InterruptedException e) {
       LOG.error(e);
     }
-    registerTask(zk, jobId, hostAddress, port, taskId);
+    registerTask(jobId, hostAddress, port, taskId);
   }
 
   /**
@@ -259,54 +257,14 @@ public class ZooKeeperSyncClientImpl imp
    * @param port
    * @param taskId
    */
-  public static void registerTask(ZooKeeper zk, BSPJobID jobId,
-      String hostAddress, long port, TaskAttemptID taskId) {
-
-    byte[] taskIdBytes = serializeTaskId(taskId);
-
-    try {
-      zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port,
-          taskIdBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
-    }
-  }
+  public void registerTask(BSPJobID jobId, String hostAddress, long port,
+      TaskAttemptID taskId) {
 
-  private static byte[] serializeTaskId(TaskAttemptID taskId) {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bos);
-    try {
-      taskId.write(out);
-    } catch (IOException e) {
-      LOG.error(e);
-    } finally {
-      try {
-        out.close();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-    return bos.toByteArray();
-  }
+    // byte[] taskIdBytes = serializeTaskId(taskId);
+    String taskRegisterKey = constructKey(jobId, "peers", hostAddress + ":"
+        + port);
+    writeNode(taskRegisterKey, taskId, false, null);
 
-  public static TaskAttemptID deserializeTaskId(byte[] arr) {
-    ByteArrayInputStream bis = new ByteArrayInputStream(arr);
-    DataInputStream in = new DataInputStream(bis);
-    TaskAttemptID id = new TaskAttemptID();
-    try {
-      id.readFields(in);
-    } catch (IOException e) {
-      LOG.error(e);
-    } finally {
-      try {
-        in.close();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-    return id;
   }
 
   @Override
@@ -314,13 +272,14 @@ public class ZooKeeperSyncClientImpl imp
     if (allPeers == null) {
       TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
       try {
-        allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
-            .toArray(new String[0]);
+        allPeers = zk.getChildren(constructKey(taskId.getJobID(), "peers"),
+            this).toArray(new String[0]);
 
         for (String s : allPeers) {
-          byte[] data = zk.getData(
-              "/" + taskId.getJobID().toString() + "/" + s, this, null);
-          TaskAttemptID thatTask = deserializeTaskId(data);
+          byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
+              this, null);
+          TaskAttemptID thatTask = (TaskAttemptID) getValueFromBytes(data,
+              TaskAttemptID.class);
           LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
               + thatTask.getTaskID().getId() + " : " + s);
           sortedMap.put(thatTask.getTaskID().getId(), s);
@@ -379,36 +338,6 @@ public class ZooKeeperSyncClientImpl imp
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
 
-  private String getNodeName(TaskAttemptID taskId, long superstep) {
-    return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
-        + taskId.toString();
-  }
-
-  private void createZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.PERSISTENT);
-  }
-
-  private void createEphemeralZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.EPHEMERAL);
-  }
-
-  private void createZnode(final String path, final CreateMode mode)
-      throws KeeperException, InterruptedException {
-    synchronized (zk) {
-      Stat s = zk.exists(path, false);
-      if (null == s) {
-        try {
-          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
-        } catch (KeeperException.NodeExistsException nee) {
-          LOG.debug("Ignore because znode may be already created at " + path,
-              nee);
-        }
-      }
-    }
-  }
-
   /*
    * INNER CLASSES
    */

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Mon Jun 25 01:14:24 2012
@@ -316,7 +316,7 @@ public class TestBSPTaskFaults extends T
       HamaConfiguration hamaConf = new HamaConfiguration();
       hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
       hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
-      hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+      hamaConf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
           LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
       hamaConf.setInt("bsp.master.port", 610002);
@@ -421,7 +421,7 @@ public class TestBSPTaskFaults extends T
 
     conf.setInt(Constants.GROOM_PING_PERIOD, 200);
     conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
     int testNumber = incrementTestNumber();

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Jun 25 01:14:24 2012
@@ -48,7 +48,7 @@ public class TestCheckpoint extends Test
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public void testCheckpoint() throws Exception {
     Configuration config = new Configuration();
-    config.set(SyncServiceFactory.SYNC_CLIENT_CLASS,
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
         LocalBSPRunner.LocalSyncClient.class.getName());
     config.set("bsp.output.dir", "/tmp/hama-test_out");
     FileSystem dfs = FileSystem.get(config);
@@ -89,7 +89,7 @@ public class TestCheckpoint extends Test
 
     Configuration conf = new Configuration();
     conf.set("bsp.output.dir", "/tmp/hama-test_out");
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
     conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Mon Jun 25 01:14:24 2012
@@ -20,20 +20,20 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
+import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
 import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl;
 import org.apache.hama.util.BSPNetUtils;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.mortbay.log.Log;
 
 public class TestZooKeeper extends TestCase {
 
@@ -57,6 +57,7 @@ public class TestZooKeeper extends TestC
   public void testClearZKNodes() throws IOException, KeeperException,
       InterruptedException {
     final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl();
+    boolean done = false;
     try {
       server.init(configuration);
       new Thread(new Runnable() {
@@ -71,55 +72,93 @@ public class TestZooKeeper extends TestC
         }
       }).start();
 
-      int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
-          6000);
-      String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
-      String bspRoot = "/";
-      // Establishing a zk session.
-      ZooKeeper zk = new ZooKeeper(connectStr, timeout, null);
-
-      // Creating dummy bspRoot if it doesn't already exist.
-      Stat s = zk.exists(bspRoot, false);
-      if (s == null) {
-        zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-      }
-
-      // Creating dummy child nodes at depth 1.
-      String node1 = bspRoot + "task1";
-      String node2 = bspRoot + "task2";
-      zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-      // Creating dummy child node at depth 2.
-      String node11 = node1 + "superstep1";
-      zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-      ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot,
-          false);
-      assertEquals(2, list.size());
-      System.out.println(list.size());
-
-      // clear it
-      BSPMaster.clearZKNodes(zk, "/");
-
-      list = (ArrayList<String>) zk.getChildren(bspRoot, false);
-      System.out.println(list.size());
-      assertEquals(0, list.size());
-
-      try {
-        zk.getData(node11, false, null);
-        fail();
-      } catch (KeeperException.NoNodeException e) {
-        System.out.println("Node has been removed correctly!");
-      } finally {
-        zk.close();
-      }
+      Thread.sleep(1000);
+
+      String bspRoot = "/bsp";
+
+      ZooKeeperSyncClientImpl peerClient = (ZooKeeperSyncClientImpl) SyncServiceFactory
+          .getPeerSyncClient(configuration);
+
+      ZKSyncBSPMasterClient masterClient = (ZKSyncBSPMasterClient) SyncServiceFactory
+          .getMasterSyncClient(configuration);
+
+      masterClient.init(configuration);
+
+      Thread.sleep(100);
+
+      Log.info("Created master and client sync clients");
+
+      assertTrue(masterClient.hasKey(bspRoot));
+
+      Log.info("BSP root exists");
+
+      BSPJobID jobID = new BSPJobID("test1", 1);
+      masterClient.registerJob(jobID.toString());
+      TaskID taskId1 = new TaskID(jobID, 1);
+      TaskID taskId2 = new TaskID(jobID, 2);
+
+      TaskAttemptID task1 = new TaskAttemptID(taskId1, 1);
+      TaskAttemptID task2 = new TaskAttemptID(taskId2, 1);
+
+      int zkPort = BSPNetUtils.getFreePort(21815);
+      configuration.setInt(Constants.PEER_PORT, zkPort);
+      peerClient.init(configuration, jobID, task1);
+
+      peerClient.registerTask(jobID, "hamanode1", 5000L, task1);
+      peerClient.registerTask(jobID, "hamanode2", 5000L, task2);
+
+      peerClient.storeInformation(
+          peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
+          true, null);
+
+      String[] names = peerClient.getAllPeerNames(task1);
+
+      Log.info("Found child count = " + names.length);
+
+      assertEquals(2, names.length);
+
+      Log.info("Passed the child count test");
+
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"),
+          true, null);
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"),
+          true, null);
+
+      String[] peerChild = masterClient.getChildKeySet(
+          masterClient.constructKey(jobID, "peer"), null);
+      Log.info("Found child count = " + peerChild.length);
+
+      assertEquals(2, peerChild.length);
+
+      Log.info(" Peer name " + peerChild[0]);
+      Log.info(" Peer name " + peerChild[1]);
+
+      Log.info("Passed the child key set test");
+
+      masterClient.deregisterJob(jobID.toString());
+      Log.info(masterClient.constructKey(jobID));
+
+      Thread.sleep(200);
+
+      assertEquals(false, masterClient.hasKey(masterClient.constructKey(jobID)));
+
+      Log.info("Passed the key presence test");
+
+      Writable value = masterClient
+          .getInformation(masterClient.constructKey(jobID, "info", "level2"),
+              IntWritable.class);
+
+      assertEquals(null, value);
+      Log.info("Passed the null value check.");
+      done = true;
+
     } catch (Exception e) {
       e.printStackTrace();
+
     } finally {
       server.stopServer();
     }
+    assertEquals(true, done);
   }
 
 }

Modified: hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1353374&r1=1353373&r2=1353374&view=diff
==============================================================================
--- hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original)
+++ hama/branches/HAMA-505-branch/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Mon Jun 25 01:14:24 2012
@@ -17,20 +17,67 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.io.File;
+import java.util.concurrent.Executors;
+
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.util.BSPNetUtils;
 
 public class TestSyncServiceFactory extends TestCase {
 
+  public static final Log LOG = LogFactory.getLog(TestCase.class);
+  
+  /**
+   * Test listener that records whether a value-change notification arrived.
+   * <p>
+   * The callback may run on ZooKeeper's event thread while the test thread
+   * polls {@link #getValue()}, so the state is held in a volatile field to
+   * guarantee the update is visible to the reader.
+   */
+  public static class ListenerTest extends ZKSyncEventListener {
+
+    private volatile String value = "init";
+
+    /**
+     * @return {@code "init"} until {@link #onChange()} has fired, then
+     *         {@code "Changed"}
+     */
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public void onDelete() {
+      // Deletions are not exercised by this test.
+    }
+
+    @Override
+    public void onChange() {
+      LOG.info("ZK value changed event triggered.");
+      value = "Changed";
+    }
+
+    @Override
+    public void onChildKeySetChange() {
+      // Child-set changes are not exercised by this test.
+    }
+  }
+
   public void testClientInstantiation() throws Exception {
 
     Configuration conf = new Configuration();
     // given null, should return zookeeper
-    SyncClient syncClient = SyncServiceFactory.getSyncClient(conf);
+    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(conf);
     assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
   }
-  
+
   public void testServerInstantiation() throws Exception {
 
     Configuration conf = new Configuration();
@@ -39,4 +86,126 @@ public class TestSyncServiceFactory exte
     assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
   }
 
+  /**
+   * Runnable that calls {@link SyncServer#start()} on whichever thread
+   * executes it, logging any failure instead of propagating it.
+   */
+  private static class ZKServerThread implements Runnable {
+
+    private final SyncServer syncServer;
+
+    ZKServerThread(SyncServer s) {
+      this.syncServer = s;
+    }
+
+    @Override
+    public void run() {
+      try {
+        syncServer.start();
+      } catch (Exception e) {
+        LOG.error("Error running server.", e);
+      }
+    }
+  }
+
+  /**
+   * Stores values through a ZooKeeper-backed peer sync client, reads them
+   * back, and checks that a registered value-change listener is notified.
+   * <p>
+   * The server is stopped, its executor shut down and the on-disk ZooKeeper
+   * data removed even when an assertion fails.
+   */
+  public void testZKSyncStore() throws Exception {
+    Configuration conf = new Configuration();
+    int zkPort = BSPNetUtils.getFreePort(21811);
+    conf.setInt(Constants.PEER_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000");
+    // given null, should return zookeeper
+    final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf);
+    syncServer.init(conf);
+    assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
+
+    java.util.concurrent.ExecutorService serverExecutor = Executors
+        .newSingleThreadExecutor();
+    try {
+      serverExecutor.submit(new ZKServerThread(syncServer));
+      // Give the embedded ZooKeeper server time to start listening.
+      Thread.sleep(1000);
+
+      final PeerSyncClient syncClient = SyncServiceFactory
+          .getPeerSyncClient(conf);
+      assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
+      BSPJobID jobId = new BSPJobID("abc", 1);
+      TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1);
+      syncClient.init(conf, jobId, taskId);
+
+      String key = syncClient.constructKey(jobId, String.valueOf(1L), "test");
+
+      IntWritable data = new IntWritable(5);
+      syncClient.storeInformation(key, data, true, null);
+
+      ListenerTest listenerTest = new ListenerTest();
+      syncClient.registerListener(key,
+          ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
+
+      IntWritable value = (IntWritable) syncClient.getInformation(key,
+          IntWritable.class);
+      assertNotNull(value);
+      assertEquals(data.get(), value.get());
+
+      data.set(6);
+      syncClient.storeInformation(key, data, true, null);
+      value = (IntWritable) syncClient.getInformation(key, IntWritable.class);
+      assertNotNull(value);
+      assertEquals(data.get(), value.get());
+
+      // The change notification is asynchronous; wait up to 5s for it.
+      long deadline = System.currentTimeMillis() + 5000L;
+      while (!"Changed".equals(listenerTest.getValue())
+          && System.currentTimeMillis() < deadline) {
+        Thread.sleep(100);
+      }
+      assertEquals("Changed", listenerTest.getValue());
+    } finally {
+      try {
+        syncServer.stopServer();
+      } catch (Exception e) {
+        // Log rather than throw so an earlier assertion failure is not masked.
+        LOG.error("Error stopping server.", e);
+      }
+      serverExecutor.shutdownNow();
+      deleteZooKeeperDir(System.getProperty("user.dir"));
+    }
+  }
+
+  /**
+   * Removes the data directory the embedded ZooKeeper server leaves behind.
+   *
+   * @param baseDir directory containing the {@code nullzookeeper} folder
+   */
+  private static void deleteZooKeeperDir(String baseDir) {
+    LOG.info("Deleting zookeeper files in " + baseDir);
+    File zookeeperDir = new File(baseDir, "nullzookeeper");
+    if (zookeeperDir.exists()) {
+      deleteRecursively(zookeeperDir);
+    }
+  }
+
+  /**
+   * Deletes a file, or a directory and everything beneath it.
+   *
+   * @param file file or directory to delete
+   */
+  private static void deleteRecursively(File file) {
+    // listFiles() returns null for regular files and on I/O errors.
+    File[] children = file.listFiles();
+    if (children != null) {
+      for (File child : children) {
+        deleteRecursively(child);
+      }
+    }
+    LOG.info("Deleting zookeeper file - " + file.getAbsolutePath());
+    if (!file.delete()) {
+      LOG.warn("Could not delete " + file.getAbsolutePath());
+    }
+  }
+
 }



Mime
View raw message