hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From surajsme...@apache.org
Subject svn commit: r1369575 [2/2] - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/h...
Date Sun, 05 Aug 2012 13:13:28 GMT
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sun Aug  5 13:13:26 2012
@@ -22,7 +22,9 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map.Entry;
+import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
@@ -61,6 +64,9 @@ public abstract class AbstractMessageMan
   // the task attempt id
   protected TaskAttemptID attemptId;
 
+  // List of listeners for all the sent messages
+  protected Queue<MessageEventListener<M>> messageListenerQueue;
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -70,12 +76,14 @@ public abstract class AbstractMessageMan
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
       Configuration conf, InetSocketAddress peerAddress) {
+    this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
     this.attemptId = attemptId;
     this.peer = peer;
     this.conf = conf;
     this.peerAddress = peerAddress;
     localQueue = getQueue();
     localQueueForNextIteration = getSynchronizedQueue();
+    
   }
 
   /*
@@ -84,18 +92,22 @@ public abstract class AbstractMessageMan
    */
   @Override
   public void close() {
-    Collection<MessageQueue<M>> values = outgoingQueues.values();
-    for (MessageQueue<M> msgQueue : values) {
-      msgQueue.close();
-    }
-    localQueue.close();
-    // remove possible disk queues from the path
     try {
-      FileSystem.get(conf).delete(
-          DiskQueue.getQueueDir(conf, attemptId,
-              conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
-    } catch (IOException e) {
-      LOG.warn("Queue dir couldn't be deleted");
+      Collection<MessageQueue<M>> values = outgoingQueues.values();
+      for (MessageQueue<M> msgQueue : values) {
+        msgQueue.close();
+      }
+      localQueue.close();
+      // remove possible disk queues from the path
+      try {
+        FileSystem.get(conf).delete(
+            DiskQueue.getQueueDir(conf, attemptId,
+                conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
+      } catch (IOException e) {
+        LOG.warn("Queue dir couldn't be deleted");
+      }
+    } finally {
+      notifyClose();
     }
 
   }
@@ -139,6 +151,7 @@ public abstract class AbstractMessageMan
     localQueue = localQueueForNextIteration.getMessageQueue();
     localQueue.prepareRead();
     localQueueForNextIteration = getSynchronizedQueue();
+    notifyInit();
   }
 
   /*
@@ -163,6 +176,7 @@ public abstract class AbstractMessageMan
     queue.add(msg);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
     outgoingQueues.put(targetPeerAddress, queue);
+    notifySentMessage(peerName, msg);
   }
 
   /*
@@ -206,4 +220,68 @@ public abstract class AbstractMessageMan
     this.conf = conf;
   }
 
+  private void notifySentMessage(String peerName, M message) {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onMessageSent(peerName, message);
+    }
+  }
+
+  private void notifyReceivedMessage(M message) throws IOException {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onMessageReceived(message);
+    }
+  }
+
+  private void notifyInit() {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onInitialized();
+    }
+  }
+
+  private void notifyClose() {
+    Iterator<MessageEventListener<M>> iterator = this.messageListenerQueue
+        .iterator();
+    while (iterator.hasNext()) {
+      iterator.next().onClose();
+    }
+  }
+
+  
+
+  @Override
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException {
+    if(listener != null)
+      this.messageListenerQueue.add(listener);
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException{
+    for (Writable message : bundle.getMessages()) {
+      loopBackMessage((M)message);
+    }
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void loopBackMessage(Writable message) throws IOException{
+    this.localQueueForNextIteration.add((M)message);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+    notifyReceivedMessage((M)message);
+    
+  }
+  
+  
+  
+  
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Sun Aug  5 13:13:26 2012
@@ -25,7 +25,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
-import java.util.Iterator;
 
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
@@ -61,14 +60,8 @@ public final class AvroMessageManagerImp
     server.close();
   }
 
-  public void put(BSPMessageBundle<M> messages) {
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-        messages.getMessages().size());
-    Iterator<M> iterator = messages.getMessages().iterator();
-    while (iterator.hasNext()) {
-      this.localQueueForNextIteration.add(iterator.next());
-      iterator.remove();
-    }
+  public void put(BSPMessageBundle<M> messages) throws IOException {
+    this.loopBackMessages(messages);
   }
 
   @SuppressWarnings("unchecked")
@@ -139,5 +132,4 @@ public final class AvroMessageManagerImp
       return ByteBuffer.wrap(data);
     }
   }
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java Sun Aug  5 13:13:26 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp.message;
 
+import java.io.IOException;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
@@ -35,7 +37,7 @@ public interface HadoopMessageManager<M 
    * 
    * @param msg
    */
-  public void put(M msg);
+  public void put(M msg) throws IOException;
 
   /**
    * This method puts a messagebundle for the next iteration. Accessed
@@ -43,7 +45,7 @@ public interface HadoopMessageManager<M 
    * 
    * @param messages
    */
-  public void put(BSPMessageBundle<M> messages);
+  public void put(BSPMessageBundle<M> messages) throws IOException;
 
   /**
    * This method puts a compressed message bundle for the next iteration.
@@ -51,6 +53,6 @@ public interface HadoopMessageManager<M 
    * 
    * @param compMsgBundle
    */
-  public void put(BSPCompressedBundle compMsgBundle);
+  public void put(BSPCompressedBundle compMsgBundle) throws IOException;
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Sun Aug  5 13:13:26 2012
@@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -115,24 +114,19 @@ public final class HadoopMessageManagerI
   }
 
   @Override
-  public final void put(M msg) {
-    this.localQueueForNextIteration.add(msg);
-    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, 1L);
+  public final void put(M msg) throws IOException {
+    loopBackMessage(msg);
   }
 
   @Override
-  public final void put(BSPMessageBundle<M> messages) {
-    for (M message : messages.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
+  public final void put(BSPMessageBundle<M> messages) throws IOException {
+    loopBackMessages(messages);
   }
 
   @Override
-  public final void put(BSPCompressedBundle compMsgBundle) {
+  public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
     BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
-    for (M message : bundle.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
+    loopBackMessages(bundle);
   }
 
   @Override
@@ -141,4 +135,5 @@ public final class HadoopMessageManagerI
     return versionID;
   }
 
+  
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sun Aug  5 13:13:26 2012
@@ -34,7 +34,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * 
  */
 public interface MessageManager<M extends Writable> {
-  
+
   public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
 
   /**
@@ -96,4 +96,25 @@ public interface MessageManager<M extend
    */
   public int getNumCurrentMessages();
 
+  /**
+   * Send the messages to self to receive in the next superstep.
+   */
+  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) throws IOException;
+  
+  /**
+   * Send the message to self to receive in the next superstep.
+   */
+  public void loopBackMessage(Writable message) throws IOException;
+
+  /**
+   * Register a listener for the events in message manager.
+   * 
+   * @param listener <code>MessageEventListener</code> object that processes the
+   *          messages sent to remote peer.
+   * @throws IOException
+   */
+  public void registerListener(MessageEventListener<M> listener)
+      throws IOException;
+  
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Sun Aug  5 13:13:26 2012
@@ -17,9 +17,10 @@
  */
 package org.apache.hama.bsp.sync;
 
-import org.apache.hadoop.conf.Configuration;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPJobID;
-import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Basic interface for a client that connects to a sync server.
@@ -28,82 +29,85 @@ import org.apache.hama.bsp.TaskAttemptID
 public interface SyncClient {
 
   /**
-   * Init will be called within a spawned task, it should be used to initialize
-   * the inner structure and fields, e.G. a zookeeper client or an rpc
-   * connection to the real sync daemon.
-   * 
-   * @throws Exception
+   * Construct key in the format required by the SyncClient for storing and 
+   * retrieving information. It is recommended to use this function to
+   * construct the keys under which information is stored.
+   * @param jobId The BSP Job Id.
+   * @param args The list of String objects that would be used to construct key
+   * @return The key consisting of entities provided in the required format.
    */
-  public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
-      throws Exception;
+  public String constructKey(BSPJobID jobId, String ... args);
 
   /**
-   * Enters the barrier before the message sending in each superstep.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param superstep the superstep of the task
-   * @throws SyncException
+   * Stores value for the specified key.
+   * @param key The key for which value should be stored. It is recommended to use 
+   * <code>constructKey</code> to create key object.
+   * @param value The value to be stored.
+   * @param permanent true if the value should be persisted after end of session.
+   * @param listener Listener object that provides asynchronous updates on the state 
+   * of information stored under the key.
+   * @return true if the operation was successful.
    */
-  public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public boolean storeInformation(String key, Writable value, 
+      boolean permanent, SyncEventListener listener);
 
   /**
-   * Leaves the barrier after all communication has been done, this is usually
-   * the end of a superstep.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param superstep the superstep of the task
-   * @throws SyncException
+   * Retrieve the value previously stored for the key.
+   * @param key The key for which value was stored.
+   * @param valueHolder The Writable instance into which the stored value is read.
+   * @return true if the value was found; false if there was any error or if
+   * there was no value stored for the key.
    */
-  public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
-      throws SyncException;
+  public boolean getInformation(String key, Writable valueHolder);
 
   /**
-   * Registers a specific task with a its host and port to the sync daemon.
-   * 
-   * @param jobId the jobs ID
-   * @param taskId the tasks ID
-   * @param hostAddress the host where the sync server resides
-   * @param port the port where the sync server is up
+   * Store new key in key set.
+   * @param key The key to be saved in key set. It is recommended to use 
+   * <code>constructKey</code> to create key object. 
+   * @param permanent true if the value should be persisted after end of session.
+   * @param listener Listener object that asynchronously notifies the events 
+   * related to the key.
+   * @return true if operation was successful.
    */
-  public void register(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
+  public boolean addKey(String key, boolean permanent, SyncEventListener listener);
 
   /**
-   * Returns all registered tasks within the sync daemon. They have to be
-   * ordered ascending by their task id.
-   * 
-   * @param taskId the tasks ID
-   * @return an <b>ordered</b> string array of host:port pairs of all tasks
-   *         connected to the daemon.
+   * Check if key was previously stored.
+   * @param key The value of the key. 
+   * @return true if the key exists.
    */
-  public String[] getAllPeerNames(TaskAttemptID taskId);
+  public boolean hasKey(String key);
+  
+  /**
+  * Get list of child keys stored under the key provided.
+  * @param key The key whose child key set are to be found.
+  * @param listener Listener object that asynchronously notifies the changes 
+  * under the provided key
+  * @return Array of child keys.
+  */
+  public String[] getChildKeySet(String key, SyncEventListener listener);
 
   /**
-   * TODO this has currently no use. Could later be used to deregister tasks
-   * from the barrier during runtime if they are finished. Something equal to
-   * voteToHalt() in Pregel.
-   * 
-   * @param jobId
-   * @param taskId
-   * @param hostAddress
-   * @param port
+   * Register a listener for events on the key.
+   * @param key The key on which an event listener should be registered.
+   * @param event The event for which the listener is registered.
+   * @param listener The event listener that defines how to process the event.
+   * @return true if the operation is successful.
    */
-  public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
-      String hostAddress, long port);
+  public boolean registerListener(String key, SyncEvent event,
+      SyncEventListener listener);
 
   /**
-   * This stops the sync daemon. Only used in YARN.
+   * Delete the key and the information stored under it.
+   * @param key
+   * @param listener
+   * @return
    */
-  public void stopServer();
-
+  public boolean remove(String key, SyncEventListener listener);
+  
   /**
-   * This method should close all used resources, e.G. a ZooKeeper instance.
    * 
-   * @throws InterruptedException
    */
-  public void close() throws InterruptedException;
-
+  public void close() throws IOException;
+  
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Sun Aug  5 13:13:26 2012
@@ -22,18 +22,27 @@ import org.apache.hadoop.util.Reflection
 
 public class SyncServiceFactory {
   public static final String SYNC_SERVER_CLASS = "hama.sync.server.class";
-  public static final String SYNC_CLIENT_CLASS = "hama.sync.client.class";
+  public static final String SYNC_PEER_CLASS = "hama.sync.peer.class";
+  public static final String SYNC_MASTER_CLASS = "hama.sync.master.class";
 
   /**
    * Returns a sync client via reflection based on what was configured.
    */
-  public static SyncClient getSyncClient(Configuration conf)
+  public static PeerSyncClient getPeerSyncClient(Configuration conf)
       throws ClassNotFoundException {
-    return (SyncClient) ReflectionUtils
-        .newInstance(conf.getClassByName(conf.get(SYNC_CLIENT_CLASS,
+    return (PeerSyncClient) ReflectionUtils
+        .newInstance(conf.getClassByName(conf.get(SYNC_PEER_CLASS,
             ZooKeeperSyncClientImpl.class.getName())), conf);
   }
 
+  
+  public static SyncClient getMasterSyncClient(Configuration conf)
+		  throws ClassNotFoundException {
+	  return (SyncClient) ReflectionUtils
+			  .newInstance(conf.getClassByName(conf.get(SYNC_MASTER_CLASS,
+					  ZKSyncBSPMasterClient.class.getName())), conf);
+  }
+
   /**
    * Returns a sync server via reflection based on what was configured.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Sun Aug  5 13:13:26 2012
@@ -17,10 +17,6 @@
  */
 package org.apache.hama.bsp.sync;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
@@ -47,7 +43,8 @@ import org.apache.zookeeper.data.Stat;
  * This client class abstracts the use of our zookeeper sync code.
  * 
  */
-public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
+public class ZooKeeperSyncClientImpl extends ZKSyncClient implements
+    PeerSyncClient {
 
   /*
    * TODO maybe extract an abstract class and let the subclasses implement
@@ -81,6 +78,8 @@ public class ZooKeeperSyncClientImpl imp
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
 
+    initialize(this.zk, bspRoot);
+
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     LOG.info("Start connecting to Zookeeper! At " + peerAddress);
     numBSPTasks = conf.getInt("bsp.peers.num", 1);
@@ -93,14 +92,14 @@ public class ZooKeeperSyncClientImpl imp
 
     try {
       synchronized (zk) {
-        createZnode(bspRoot);
-        final String pathToJobIdZnode = bspRoot + "/"
-            + taskId.getJobID().toString();
-        createZnode(pathToJobIdZnode);
-        final String pathToSuperstepZnode = pathToJobIdZnode + "/" + superstep;
-        createZnode(pathToSuperstepZnode);
+
+        final String pathToSuperstepZnode = 
+            constructKey(taskId.getJobID(), "sync", ""+superstep);
+        
+        writeNode(pathToSuperstepZnode, null, true, null);
         BarrierWatcher barrierWatcher = new BarrierWatcher();
-        // this is really needed to register the barrier watcher, don't remove this line!
+        // this is really needed to register the barrier watcher, don't remove
+        // this line!
         zk.exists(pathToSuperstepZnode + "/ready", barrierWatcher);
         zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
             CreateMode.EPHEMERAL);
@@ -131,7 +130,7 @@ public class ZooKeeperSyncClientImpl imp
         } else {
           LOG.debug("---> at superstep: " + superstep
               + " task that is creating /ready znode:" + taskId.toString());
-          createEphemeralZnode(pathToSuperstepZnode + "/ready");
+          writeNode(pathToSuperstepZnode + "/ready", null, false, null);
         }
       }
     } catch (Exception e) {
@@ -143,8 +142,10 @@ public class ZooKeeperSyncClientImpl imp
   public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
       final long superstep) throws SyncException {
     try {
-      final String pathToSuperstepZnode = bspRoot + "/"
-          + taskId.getJobID().toString() + "/" + superstep;
+//      final String pathToSuperstepZnode = bspRoot + "/"
+//          + taskId.getJobID().toString() + "/" + superstep;
+      final String pathToSuperstepZnode = 
+          constructKey(taskId.getJobID(), "sync", ""+superstep);
       while (true) {
         List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
         LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
@@ -236,8 +237,9 @@ public class ZooKeeperSyncClientImpl imp
   public void register(BSPJobID jobId, TaskAttemptID taskId,
       String hostAddress, long port) {
     try {
-      if (zk.exists("/" + jobId.toString(), false) == null) {
-        zk.create("/" + jobId.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE,
+      String jobRegisterKey = constructKey(jobId, "peers");
+      if (zk.exists(jobRegisterKey, false) == null) {
+        zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
             CreateMode.PERSISTENT);
       }
     } catch (KeeperException e) {
@@ -245,7 +247,7 @@ public class ZooKeeperSyncClientImpl imp
     } catch (InterruptedException e) {
       LOG.error(e);
     }
-    registerTask(zk, jobId, hostAddress, port, taskId);
+    registerTask(jobId, hostAddress, port, taskId);
   }
 
   /**
@@ -259,54 +261,14 @@ public class ZooKeeperSyncClientImpl imp
    * @param port
    * @param taskId
    */
-  public static void registerTask(ZooKeeper zk, BSPJobID jobId,
-      String hostAddress, long port, TaskAttemptID taskId) {
-
-    byte[] taskIdBytes = serializeTaskId(taskId);
+  public void registerTask(BSPJobID jobId, String hostAddress, long port,
+      TaskAttemptID taskId) {
 
-    try {
-      zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port,
-          taskIdBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-    } catch (KeeperException e) {
-      LOG.error(e);
-    } catch (InterruptedException e) {
-      LOG.error(e);
-    }
-  }
-
-  private static byte[] serializeTaskId(TaskAttemptID taskId) {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bos);
-    try {
-      taskId.write(out);
-    } catch (IOException e) {
-      LOG.error(e);
-    } finally {
-      try {
-        out.close();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-    return bos.toByteArray();
-  }
+    // byte[] taskIdBytes = serializeTaskId(taskId);
+    String taskRegisterKey = constructKey(jobId, "peers", hostAddress + ":"
+        + port);
+    writeNode(taskRegisterKey, taskId, false, null);
 
-  public static TaskAttemptID deserializeTaskId(byte[] arr) {
-    ByteArrayInputStream bis = new ByteArrayInputStream(arr);
-    DataInputStream in = new DataInputStream(bis);
-    TaskAttemptID id = new TaskAttemptID();
-    try {
-      id.readFields(in);
-    } catch (IOException e) {
-      LOG.error(e);
-    } finally {
-      try {
-        in.close();
-      } catch (IOException e) {
-        LOG.error(e);
-      }
-    }
-    return id;
   }
 
   @Override
@@ -314,16 +276,22 @@ public class ZooKeeperSyncClientImpl imp
     if (allPeers == null) {
       TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
       try {
-        allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
-            .toArray(new String[0]);
+        allPeers = zk.getChildren(constructKey(taskId.getJobID(), "peers"),
+            this).toArray(new String[0]);
 
         for (String s : allPeers) {
-          byte[] data = zk.getData(
-              "/" + taskId.getJobID().toString() + "/" + s, this, null);
-          TaskAttemptID thatTask = deserializeTaskId(data);
-          LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
-              + thatTask.getTaskID().getId() + " : " + s);
-          sortedMap.put(thatTask.getTaskID().getId(), s);
+          byte[] data = zk.getData(constructKey(taskId.getJobID(), "peers", s),
+              this, null);
+          TaskAttemptID thatTask = new TaskAttemptID(); 
+          boolean result = getValueFromBytes(data, thatTask);
+
+          if(result){
+            LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+                + thatTask.getTaskID().getId() + " : " + s);
+            sortedMap.put(thatTask.getTaskID().getId(), s);
+          }
+
+
         }
 
       } catch (Exception e) {
@@ -344,8 +312,13 @@ public class ZooKeeperSyncClientImpl imp
   }
 
   @Override
-  public void close() throws InterruptedException {
-    zk.close();
+  public void close() throws IOException {
+    try{
+      zk.close();
+    }
+    catch(InterruptedException e){
+      throw new IOException(e);
+    }
   }
 
   @Override
@@ -379,36 +352,6 @@ public class ZooKeeperSyncClientImpl imp
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
 
-  private String getNodeName(TaskAttemptID taskId, long superstep) {
-    return bspRoot + "/" + taskId.getJobID().toString() + "/" + superstep + "/"
-        + taskId.toString();
-  }
-
-  private void createZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.PERSISTENT);
-  }
-
-  private void createEphemeralZnode(final String path) throws KeeperException,
-      InterruptedException {
-    createZnode(path, CreateMode.EPHEMERAL);
-  }
-
-  private void createZnode(final String path, final CreateMode mode)
-      throws KeeperException, InterruptedException {
-    synchronized (zk) {
-      Stat s = zk.exists(path, false);
-      if (null == s) {
-        try {
-          zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
-        } catch (KeeperException.NodeExistsException nee) {
-          LOG.debug("Ignore because znode may be already created at " + path,
-              nee);
-        }
-      }
-    }
-  }
-
   /*
    * INNER CLASSES
    */

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Sun Aug  5 13:13:26 2012
@@ -316,7 +316,7 @@ public class TestBSPTaskFaults extends T
       HamaConfiguration hamaConf = new HamaConfiguration();
       hamaConf.setInt(Constants.GROOM_PING_PERIOD, 200);
       hamaConf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
-      hamaConf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+      hamaConf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
           LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
       hamaConf.setInt("bsp.master.port", 610002);
@@ -421,7 +421,7 @@ public class TestBSPTaskFaults extends T
 
     conf.setInt(Constants.GROOM_PING_PERIOD, 200);
     conf.setClass("bsp.work.class", FaulTestBSP.class, BSP.class);
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
+    conf.setClass(SyncServiceFactory.SYNC_PEER_CLASS,
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 
     int testNumber = incrementTestNumber();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Sun Aug  5 13:13:26 2012
@@ -17,7 +17,18 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
@@ -25,19 +36,30 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.TestBSPTaskFaults.MinimalGroomServer;
-import org.apache.hama.bsp.message.type.ByteMessage;
-import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
+import org.apache.hama.bsp.ft.FaultTolerantPeerService;
+import org.apache.hama.bsp.message.MessageEventListener;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.sync.BSPPeerSyncClient;
+import org.apache.hama.bsp.sync.PeerSyncClient;
+import org.apache.hama.bsp.sync.SyncEvent;
+import org.apache.hama.bsp.sync.SyncEventListener;
+import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
-import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.BSPNetUtils;
+import org.apache.hama.util.KeyValuePair;
 
 public class TestCheckpoint extends TestCase {
 
@@ -45,130 +67,578 @@ public class TestCheckpoint extends Test
 
   static final String checkpointedDir = "checkpoint/job_201110302255_0001/0/";
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static class TestMessageManager implements MessageManager<Text> {
+
+    List<Text> messageQueue = new ArrayList<Text>();
+    BSPMessageBundle<Text> loopbackBundle = new BSPMessageBundle<Text>();
+    Iterator<Text> iter = null;
+    MessageEventListener<Text> listener;
+
+    @Override
+    public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
+        Configuration conf, InetSocketAddress peerAddress) {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void close() {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      if (iter == null)
+        iter = this.messageQueue.iterator();
+      if (iter.hasNext())
+        return iter.next();
+      return null;
+    }
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    @Override
+    public void finishSendPhase() throws IOException {
+    }
+
+    @Override
+    public Iterator<Entry<InetSocketAddress, MessageQueue<Text>>> getMessageIterator() {
+      return null;
+    }
+
+    @Override
+    public void transfer(InetSocketAddress addr, BSPMessageBundle<Text> bundle)
+        throws IOException {
+      // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void clearOutgoingQueues() {
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return this.messageQueue.size();
+    }
+
+    public BSPMessageBundle<Text> getLoopbackBundle() {
+      return this.loopbackBundle;
+    }
+
+    public void addMessage(Text message) throws IOException {
+      this.messageQueue.add(message);
+      listener.onMessageReceived(message);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+      this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
+    }
+
+    @Override
+    public void loopBackMessage(Writable message) {
+    }
+
+    @Override
+    public void registerListener(MessageEventListener<Text> listener)
+        throws IOException {
+      this.listener = listener;
+    }
+
+  }
+
+  public static class TestBSPPeer implements
+      BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, Text> {
+
+    Configuration conf;
+    long superstepCount;
+    FaultTolerantPeerService<Text> fService;
+
+    public TestBSPPeer(BSPJob job, Configuration conf, TaskAttemptID taskId,
+        Counters counters, long superstep, BSPPeerSyncClient syncClient,
+        MessageManager<Text> messenger, TaskStatus.State state) {
+      this.conf = conf;
+      if (superstep > 0)
+        superstepCount = superstep;
+      else
+        superstepCount = 0L;
+
+      try {
+        fService = (new AsyncRcvdMsgCheckpointImpl<Text>()).constructPeerFaultTolerance(
+            job, (BSPPeer<?, ?, ?, ?, Text>) this,
+            (BSPPeerSyncClient) syncClient, null, taskId, superstep, conf,
+            messenger);
+        this.fService.onPeerInitialized(state);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void send(String peerName, Text msg) throws IOException {
+    }
+
+    @Override
+    public Text getCurrentMessage() throws IOException {
+      return new Text("data");
+    }
+
+    @Override
+    public int getNumCurrentMessages() {
+      return 1;
+    }
+
+    @Override
+    public void sync() throws IOException, SyncException, InterruptedException {
+      ++superstepCount;
+      try {
+        this.fService.afterBarrier();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+      LOG.info("After barrier " + superstepCount);
+    }
+
+    @Override
+    public long getSuperstepCount() {
+      return superstepCount;
+    }
+
+    @Override
+    public String getPeerName() {
+      return null;
+    }
+
+    @Override
+    public String getPeerName(int index) {
+      return null;
+    }
+
+    @Override
+    public int getPeerIndex() {
+      return 1;
+    }
+
+    @Override
+    public String[] getAllPeerNames() {
+      return null;
+    }
+
+    @Override
+    public int getNumPeers() {
+      return 0;
+    }
+
+    @Override
+    public void clear() {
+
+    }
+
+    @Override
+    public void write(NullWritable key, NullWritable value) throws IOException {
+
+    }
+
+    @Override
+    public boolean readNext(NullWritable key, NullWritable value)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public KeyValuePair<NullWritable, NullWritable> readNext()
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public void reopenInput() throws IOException {
+
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return null;
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return null;
+    }
+
+    @Override
+    public void incrementCounter(Enum<?> key, long amount) {
+
+    }
+
+    @Override
+    public void incrementCounter(String group, String counter, long amount) {
+
+    }
+
+  }
+
+  public static class TempSyncClient extends BSPPeerSyncClient {
+
+    Map<String, Writable> valueMap = new HashMap<String, Writable>();
+
+    @Override
+    public String constructKey(BSPJobID jobId, String... args) {
+      StringBuffer buffer = new StringBuffer(100);
+      buffer.append(jobId.toString()).append("/");
+      for (String arg : args) {
+        buffer.append(arg).append("/");
+      }
+      return buffer.toString();
+    }
+
+    @Override
+    public boolean storeInformation(String key, Writable value,
+        boolean permanent, SyncEventListener listener) {
+      ArrayWritable writables = (ArrayWritable) value;
+      long step = ((LongWritable) writables.get()[0]).get();
+      long count = ((LongWritable) writables.get()[1]).get();
+
+      LOG.info("SyncClient Storing value step = " + step + " count = " + count
+          + " for key " + key);
+      valueMap.put(key, value);
+      return true;
+    }
+
+    @Override
+    public boolean getInformation(String key,
+        Writable valueHolder) {
+      LOG.info("Getting value for key " + key);
+      if(!valueMap.containsKey(key)){
+        return false;
+      }
+      Writable value =  valueMap.get(key);
+      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+      DataOutputStream outputStream = new DataOutputStream(byteStream);
+      byte[] data = null;
+      try {
+        value.write(outputStream);
+        outputStream.flush();
+        data = byteStream.toByteArray();
+        ByteArrayInputStream istream = new ByteArrayInputStream(data);
+        DataInputStream diStream = new DataInputStream(istream);
+        valueHolder.readFields(diStream);
+        return true;
+      } catch (IOException e) {
+        LOG.error("Error writing data to write buffer.", e);
+      } finally {
+        try {
+          byteStream.close();
+          outputStream.close();
+        } catch (IOException e) {
+          LOG.error("Error closing byte stream.", e);
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public boolean addKey(String key, boolean permanent,
+        SyncEventListener listener) {
+      valueMap.put(key, NullWritable.get());
+      return true;
+    }
+
+    @Override
+    public boolean hasKey(String key) {
+      return valueMap.containsKey(key);
+    }
+
+    @Override
+    public String[] getChildKeySet(String key, SyncEventListener listener) {
+      List<String> list = new ArrayList<String>();
+      Iterator<String> keyIter = valueMap.keySet().iterator();
+      while (keyIter.hasNext()) {
+        String keyVal = keyIter.next();
+        if (keyVal.startsWith(key + "/")) {
+          list.add(keyVal);
+        }
+      }
+      String[] arr = new String[list.size()];
+      list.toArray(arr);
+      return arr;
+    }
+
+    @Override
+    public boolean registerListener(String key, SyncEvent event,
+        SyncEventListener listener) {
+      return false;
+    }
+
+    @Override
+    public boolean remove(String key, SyncEventListener listener) {
+      valueMap.remove(key);
+      return false;
+    }
+
+    @Override
+    public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+        throws Exception {
+    }
+
+    @Override
+    public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Enter barrier called - " + superstep);
+    }
+
+    @Override
+    public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws SyncException {
+      LOG.info("Exit barrier called - " + superstep);
+    }
+
+    @Override
+    public void register(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public String[] getAllPeerNames(TaskAttemptID taskId) {
+      return null;
+    }
+
+    @Override
+    public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+    }
+
+    @Override
+    public void stopServer() {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+  }
+
+  private void checkSuperstepMsgCount(PeerSyncClient syncClient,
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask, BSPJob job, long step, long count) {
+    
+    ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
+    
+    boolean result = syncClient.getInformation(
+        syncClient.constructKey(job.getJobID(), "checkpoint",
+            "" + bspTask.getPeerIndex()), writableVal);
+    
+    assertTrue(result);
+
+    LongWritable superstepNo = (LongWritable) writableVal.get()[0]; 
+    LongWritable msgCount = (LongWritable) writableVal.get()[1];
+
+    assertEquals(step, superstepNo.get());
+    assertEquals(count, msgCount.get());
+  }
+
+  public void testCheckpointInterval() throws Exception {
+    Configuration config = new Configuration();
+    System.setProperty("user.dir", "/tmp");
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    config.setInt(Constants.CHECKPOINT_INTERVAL, 2);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    @SuppressWarnings("rawtypes")
+    BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+        (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
+
+    assertNotNull("BSPPeerImpl should not be null.", bspTask);
+
+    LOG.info("Created bsp peer and other parameters");
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+
+    boolean result = syncClient.getInformation(
+            syncClient.constructKey(job.getJobID(), "checkpoint",
+                "" + bspTask.getPeerIndex()), new ArrayWritable(LongWritable.class));
+
+    assertFalse(result);
+
+    bspTask.sync();
+    // Superstep 1
+  
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+    Text txtMessage = new Text("data");
+    messenger.addMessage(txtMessage);
+
+    bspTask.sync();
+    // Superstep 2
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+    messenger.addMessage(txtMessage);
+
+    bspTask.sync();
+    // Superstep 3
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
+
+    bspTask.sync();
+    // Superstep 4
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 3L, 1L);
+
+    messenger.addMessage(txtMessage);
+    messenger.addMessage(txtMessage);
+
+    bspTask.sync();
+    // Superstep 5
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+    bspTask.sync();
+    // Superstep 6
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 5L, 2L);
+
+    dfs.delete(new Path("checkpoint"), true);
+  }
+
+  @SuppressWarnings("rawtypes")
   public void testCheckpoint() throws Exception {
     Configuration config = new Configuration();
-    config.set(SyncServiceFactory.SYNC_CLIENT_CLASS,
-        LocalBSPRunner.LocalSyncClient.class.getName());
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.setBoolean(Constants.FAULT_TOLERANCE_FLAG, true);
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
+
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
+
     config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
+
     FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
+
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+    BSPPeer bspTask = new TestBSPPeer(job, config, taskId, new Counters(), -1L,
+        (BSPPeerSyncClient) syncClient, messenger, TaskStatus.State.RUNNING);
 
-    BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
-    bspTask.setCurrentTaskStatus(new TaskStatus(new BSPJobID(),
-        new TaskAttemptID(), 1.0f, TaskStatus.State.RUNNING, "running",
-        "127.0.0.1", TaskStatus.Phase.STARTING, new Counters()));
     assertNotNull("BSPPeerImpl should not be null.", bspTask);
-    if (dfs.mkdirs(new Path("checkpoint"))) {
-      if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001"))) {
-        if (dfs.mkdirs(new Path("checkpoint/job_201110302255_0001/0")))
-          ;
-      }
-    }
-    assertTrue("Make sure directory is created.",
-        dfs.exists(new Path(checkpointedDir)));
-    byte[] tmpData = "data".getBytes();
-    BSPMessageBundle bundle = new BSPMessageBundle();
-    bundle.addMessage(new ByteMessage("abc".getBytes(), tmpData));
-    assertNotNull("Message bundle can not be null.", bundle);
-    assertNotNull("Configuration should not be null.", config);
-    bspTask.checkpoint(checkpointedDir + "/attempt_201110302255_0001_000000_0",
-        bundle);
-    FSDataInputStream in = dfs.open(new Path(checkpointedDir
-        + "/attempt_201110302255_0001_000000_0"));
-    BSPMessageBundle bundleRead = new BSPMessageBundle();
-    bundleRead.readFields(in);
-    in.close();
-    ByteMessage byteMsg = (ByteMessage) (bundleRead.getMessages()).get(0);
-    String content = new String(byteMsg.getData());
-    LOG.info("Saved checkpointed content is " + content);
-    assertTrue("Message content should be the same.", "data".equals(content));
+
+    LOG.info("Created bsp peer and other parameters");
+
+    @SuppressWarnings("unused")
+    FaultTolerantPeerService<Text> service = null;
+
+    bspTask.sync();
+    LOG.info("Completed first sync.");
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 1L, 0L);
+
+    Text txtMessage = new Text("data");
+    messenger.addMessage(txtMessage);
+    
+    bspTask.sync();
+
+    LOG.info("Completed second sync.");
+
+    checkSuperstepMsgCount(syncClient, bspTask, job, 2L, 1L);
+
+    // Checking the messages for superstep 2 and peer id 1
+    String expectedPath = "checkpoint/job_checkpttest_0001/2/1";
+    FSDataInputStream in = dfs.open(new Path(expectedPath));
+
+    String className = in.readUTF();
+    Text message = (Text) ReflectionUtils.newInstance(Class.forName(className),
+        config);
+    message.readFields(in);
+
+    assertEquals("data", message.toString());
+
     dfs.delete(new Path("checkpoint"), true);
   }
 
-  public void testCheckpointInterval() throws Exception {
+  public void testPeerRecovery() throws Exception {
+    Configuration config = new Configuration();
+    config.set(SyncServiceFactory.SYNC_PEER_CLASS,
+        TempSyncClient.class.getName());
+    config.set(Constants.FAULT_TOLERANCE_CLASS,
+        AsyncRcvdMsgCheckpointImpl.class.getName());
+    config.setBoolean(Constants.CHECKPOINT_ENABLED, true);
+    int port = BSPNetUtils.getFreePort(12502);
+    LOG.info("Got port = " + port);
 
-    Configuration conf = new Configuration();
-    conf.set("bsp.output.dir", "/tmp/hama-test_out");
-    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
-        LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
-
-    conf.setBoolean(Constants.CHECKPOINT_ENABLED, false);
-
-    int port = BSPNetUtils.getFreePort(5000);
-    InetSocketAddress inetAddress = new InetSocketAddress(port);
-    MinimalGroomServer groom = new MinimalGroomServer(conf);
-    Server workerServer = RPC.getServer(groom, inetAddress.getHostName(),
-        inetAddress.getPort(), conf);
-    workerServer.start();
-
-    LOG.info("Started RPC server");
-    conf.setInt("bsp.groom.rpc.port", inetAddress.getPort());
-    conf.setInt("bsp.peers.num", 1);
-
-    BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
-        BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID, inetAddress,
-        conf);
-    LOG.info("Started the proxy connections");
-
-    TaskAttemptID tid = new TaskAttemptID(new TaskID(new BSPJobID(
-        "job_201110102255", 1), 1), 1);
-
-    try {
-      BSPJob job = new BSPJob(new HamaConfiguration(conf));
-      job.setOutputPath(TestBSPMasterGroomServer.OUTPUT_PATH);
-      job.setOutputFormat(TextOutputFormat.class);
-      final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
-          BSPPeerProtocol.class, HamaRPCProtocolVersion.versionID,
-          new InetSocketAddress("127.0.0.1", port), conf);
+    config.set(Constants.PEER_HOST, Constants.DEFAULT_PEER_HOST);
+    config.setInt(Constants.PEER_PORT, port);
 
-      BSPTask task = new BSPTask();
-      task.setConf(job);
+    config.set("bsp.output.dir", "/tmp/hama-test_out");
+    config.set("bsp.local.dir", "/tmp/hama-test");
 
-      @SuppressWarnings("rawtypes")
-      BSPPeerImpl<?, ?, ?, ?, ?> bspPeer = new BSPPeerImpl(job, conf, tid,
-          proto, 0, null, null, new Counters());
+    FileSystem dfs = FileSystem.get(config);
+    BSPJob job = new BSPJob(new BSPJobID("checkpttest", 1), "/tmp");
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(job.getJobID(), 1), 1);
 
-      bspPeer.setCurrentTaskStatus(new TaskStatus(new BSPJobID(), tid, 1.0f,
-          TaskStatus.State.RUNNING, "running", "127.0.0.1",
-          TaskStatus.Phase.STARTING, new Counters()));
-
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-
-      conf.setBoolean(Constants.CHECKPOINT_ENABLED, true);
-      conf.setInt(Constants.CHECKPOINT_INTERVAL, 3);
-
-      bspPeer.sync();
-
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), true);
-
-      job.setCheckPointInterval(5);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-      bspPeer.sync();
-      LOG.info("Is Ready = " + bspPeer.isReadyToCheckpoint() + " at step "
-          + bspPeer.getSuperstepCount());
-      assertEquals(bspPeer.isReadyToCheckpoint(), false);
-
-    } catch (Exception e) {
-      LOG.error("Error testing BSPPeer.", e);
-    } finally {
-      umbilical.close();
-      Thread.sleep(2000);
-      workerServer.stop();
-      Thread.sleep(2000);
+    TestMessageManager messenger = new TestMessageManager();
+    PeerSyncClient syncClient = (TempSyncClient) SyncServiceFactory
+        .getPeerSyncClient(config);
+
+    Text txtMessage = new Text("data");
+    String writeKey = "job_checkpttest_0001/checkpoint/1/";
+
+    Writable[] writableArr = new Writable[2];
+    writableArr[0] = new LongWritable(3L);
+    writableArr[1] = new LongWritable(5L);
+    ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+    arrWritable.set(writableArr);
+    syncClient.storeInformation(writeKey, arrWritable, true, null);
+
+    String writePath = "checkpoint/job_checkpttest_0001/3/1";
+    FSDataOutputStream out = dfs.create(new Path(writePath));
+    for (int i = 0; i < 5; ++i) {
+      out.writeUTF(txtMessage.getClass().getCanonicalName());
+      txtMessage.write(out);
     }
+    out.close();
 
+    @SuppressWarnings("unused")
+    BSPPeer<?, ?, ?, ?, Text> bspTask = new TestBSPPeer(job, config, taskId,
+        new Counters(), 3L, (BSPPeerSyncClient) syncClient, messenger,
+        TaskStatus.State.RECOVERING);
+
+    BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
+    assertEquals(5, bundleRead.getMessages().size());
+    String recoveredMsg = bundleRead.getMessages().get(0).toString();
+    assertEquals(recoveredMsg, "data");
+    dfs.delete(new Path("checkpoint"), true);
   }
+
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestZooKeeper.java Sun Aug  5 13:13:26 2012
@@ -20,20 +20,22 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
-import java.util.ArrayList;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.bsp.sync.ZKSyncBSPMasterClient;
+import org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl;
 import org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl;
 import org.apache.hama.util.BSPNetUtils;
-import org.apache.hama.zookeeper.QuorumPeer;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.mortbay.log.Log;
 
 public class TestZooKeeper extends TestCase {
 
@@ -41,6 +43,7 @@ public class TestZooKeeper extends TestC
 
   public TestZooKeeper() {
     configuration = new HamaConfiguration();
+    System.setProperty("user.dir", "/tmp");
     configuration.set("bsp.master.address", "localhost");
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
@@ -57,6 +60,7 @@ public class TestZooKeeper extends TestC
   public void testClearZKNodes() throws IOException, KeeperException,
       InterruptedException {
     final ZooKeeperSyncServerImpl server = new ZooKeeperSyncServerImpl();
+    boolean done = false;
     try {
       server.init(configuration);
       new Thread(new Runnable() {
@@ -71,55 +75,113 @@ public class TestZooKeeper extends TestC
         }
       }).start();
 
-      int timeout = configuration.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT,
-          6000);
-      String connectStr = QuorumPeer.getZKQuorumServersString(configuration);
-      String bspRoot = "/";
-      // Establishing a zk session.
-      ZooKeeper zk = new ZooKeeper(connectStr, timeout, null);
-
-      // Creating dummy bspRoot if it doesn't already exist.
-      Stat s = zk.exists(bspRoot, false);
-      if (s == null) {
-        zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
-            CreateMode.PERSISTENT);
-      }
-
-      // Creating dummy child nodes at depth 1.
-      String node1 = bspRoot + "task1";
-      String node2 = bspRoot + "task2";
-      zk.create(node1, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-      zk.create(node2, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-      // Creating dummy child node at depth 2.
-      String node11 = node1 + "superstep1";
-      zk.create(node11, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
-      ArrayList<String> list = (ArrayList<String>) zk.getChildren(bspRoot,
-          false);
-      assertEquals(2, list.size());
-      System.out.println(list.size());
-
-      // clear it
-      BSPMaster.clearZKNodes(zk, "/");
-
-      list = (ArrayList<String>) zk.getChildren(bspRoot, false);
-      System.out.println(list.size());
-      assertEquals(0, list.size());
-
-      try {
-        zk.getData(node11, false, null);
-        fail();
-      } catch (KeeperException.NoNodeException e) {
-        System.out.println("Node has been removed correctly!");
-      } finally {
-        zk.close();
-      }
+      Thread.sleep(1000);
+
+      String bspRoot = "/bsp";
+
+      ZooKeeperSyncClientImpl peerClient = (ZooKeeperSyncClientImpl) SyncServiceFactory
+          .getPeerSyncClient(configuration);
+
+      ZKSyncBSPMasterClient masterClient = (ZKSyncBSPMasterClient) SyncServiceFactory
+          .getMasterSyncClient(configuration);
+
+      masterClient.init(configuration);
+
+      Thread.sleep(100);
+
+      Log.info("Created master and client sync clients");
+
+      assertTrue(masterClient.hasKey(bspRoot));
+
+      Log.info("BSP root exists");
+
+      BSPJobID jobID = new BSPJobID("test1", 1);
+      masterClient.registerJob(jobID.toString());
+      TaskID taskId1 = new TaskID(jobID, 1);
+      TaskID taskId2 = new TaskID(jobID, 2);
+
+      TaskAttemptID task1 = new TaskAttemptID(taskId1, 1);
+      TaskAttemptID task2 = new TaskAttemptID(taskId2, 1);
+
+      int zkPort = BSPNetUtils.getFreePort(21815);
+      configuration.setInt(Constants.PEER_PORT, zkPort);
+      peerClient.init(configuration, jobID, task1);
+
+      peerClient.registerTask(jobID, "hamanode1", 5000L, task1);
+      peerClient.registerTask(jobID, "hamanode2", 5000L, task2);
+
+      peerClient.storeInformation(
+          peerClient.constructKey(jobID, "info", "level2"), new IntWritable(5),
+          true, null);
+
+      String[] names = peerClient.getAllPeerNames(task1);
+
+      Log.info("Found child count = " + names.length);
+
+      assertEquals(2, names.length);
+
+      Log.info("Passed the child count test");
+
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "1"),
+          true, null);
+      masterClient.addKey(masterClient.constructKey(jobID, "peer", "2"),
+          true, null);
+
+      String[] peerChild = masterClient.getChildKeySet(
+          masterClient.constructKey(jobID, "peer"), null);
+      Log.info("Found child count = " + peerChild.length);
+
+      assertEquals(2, peerChild.length);
+
+      Log.info(" Peer name " + peerChild[0]);
+      Log.info(" Peer name " + peerChild[1]);
+
+      Log.info("Passed the child key set test");
+
+      masterClient.deregisterJob(jobID.toString());
+      Log.info(masterClient.constructKey(jobID));
+
+      Thread.sleep(200);
+
+      assertEquals(false, masterClient.hasKey(masterClient.constructKey(jobID)));
+
+      Log.info("Passed the key presence test");
+
+      boolean result = masterClient
+          .getInformation(masterClient.constructKey(jobID, "info", "level3"),
+              new IntWritable());
+
+      assertEquals(false, result);
+      
+      Writable[] writableArr = new Writable[2];
+      writableArr[0] = new LongWritable(3L);
+      writableArr[1] = new LongWritable(5L);
+      ArrayWritable arrWritable = new ArrayWritable(LongWritable.class);
+      arrWritable.set(writableArr);
+      masterClient.storeInformation(
+          masterClient.constructKey(jobID, "info", "level3"), 
+          arrWritable, true, null);
+      
+      ArrayWritable valueHolder = new ArrayWritable(LongWritable.class);
+      
+      boolean getResult = masterClient.getInformation(
+          masterClient.constructKey(jobID, "info", "level3"), valueHolder);
+      
+      assertTrue(getResult);
+      
+      assertEquals(arrWritable.get()[0], valueHolder.get()[0]);
+      assertEquals(arrWritable.get()[1], valueHolder.get()[1]);
+      
+      Log.info("Passed array writable test");
+      done = true;
+
     } catch (Exception e) {
       e.printStackTrace();
+
     } finally {
       server.stopServer();
     }
+    assertEquals(true, done);
   }
 
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java?rev=1369575&r1=1369574&r2=1369575&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/sync/TestSyncServiceFactory.java Sun Aug  5 13:13:26 2012
@@ -17,20 +17,64 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.util.concurrent.Executors;
+
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.util.BSPNetUtils;
 
 public class TestSyncServiceFactory extends TestCase {
 
+  // Log under this test's own category rather than JUnit's TestCase.
+  public static final Log LOG = LogFactory.getLog(TestSyncServiceFactory.class);
+  /** Flips its text from "init" to "Changed" once onChange() is invoked. */
+  public static class ListenerTest extends ZKSyncEventListener {
+    // "init" until onChange() runs, "Changed" afterwards
+    private Text value = new Text("init");
+
+    public ListenerTest() {
+      super();
+    }
+
+    /** @return the current text held by this listener, as a String */
+    public String getValue() {
+      return String.valueOf(value);
+    }
+
+    @Override
+    public void onDelete() {
+      // no action taken for this event
+    }
+
+    @Override
+    public void onChange() {
+      LOG.info("ZK value changed event triggered.");
+      value.set("Changed");
+    }
+
+    @Override
+    public void onChildKeySetChange() {
+      // no action taken for this event
+    }
+  }
+
   public void testClientInstantiation() throws Exception {
 
     Configuration conf = new Configuration();
     // given null, should return zookeeper
-    SyncClient syncClient = SyncServiceFactory.getSyncClient(conf);
+    PeerSyncClient syncClient = SyncServiceFactory.getPeerSyncClient(conf);
     assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
   }
-  
+
   public void testServerInstantiation() throws Exception {
 
     Configuration conf = new Configuration();
@@ -39,4 +83,104 @@ public class TestSyncServiceFactory exte
     assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
   }
 
+  /** Runnable that calls start() on the wrapped SyncServer. */
+  private static class ZKServerThread implements Runnable {
+    SyncServer server;
+
+    ZKServerThread(SyncServer syncServer) {
+      this.server = syncServer;
+    }
+
+    @Override
+    public void run() {
+      try {
+        this.server.start();
+      } catch (Exception startFailure) {
+        // failures are logged here instead of being rethrown
+        LOG.error("Error running server.", startFailure);
+      }
+    }
+  }
+
+  /**
+   * Stores and overwrites a value through a running sync server and checks
+   * that the registered value-change listener is notified.
+   */
+  public void testZKSyncStore() throws Exception {
+    Configuration conf = new Configuration();
+    int zkPort = BSPNetUtils.getFreePort(21811);
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    conf.set("bsp.output.dir", "/tmp/hama-test_out");
+    conf.setInt(Constants.PEER_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkPort);
+    conf.set(Constants.ZOOKEEPER_SESSION_TIMEOUT, "12000");
+    System.setProperty("user.dir", "/tmp");
+    // no server implementation is configured, so the ZooKeeper one is expected
+    final SyncServer syncServer = SyncServiceFactory.getSyncServer(conf);
+    syncServer.init(conf);
+    assertTrue(syncServer instanceof ZooKeeperSyncServerImpl);
+
+    // Keep a handle on the pool so its worker thread can be released below.
+    final java.util.concurrent.ExecutorService serverPool = Executors
+        .newFixedThreadPool(1);
+    serverPool.submit(new ZKServerThread(syncServer));
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        try {
+          syncServer.stopServer();
+        } catch (Exception e) {
+          // too late to log!
+        }
+      }
+    });
+    try {
+      Thread.sleep(1000); // crude wait for the server submitted above
+      final PeerSyncClient syncClient = SyncServiceFactory
+          .getPeerSyncClient(conf);
+      assertTrue(syncClient instanceof ZooKeeperSyncClientImpl);
+      BSPJobID jobId = new BSPJobID("abc", 1);
+      TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId, 1), 1);
+      syncClient.init(conf, jobId, taskId);
+
+      IntWritable data = new IntWritable(5);
+      syncClient.storeInformation(
+          syncClient.constructKey(jobId, String.valueOf(1L), "test"), data,
+          true, null);
+
+      ListenerTest listenerTest = new ListenerTest();
+      syncClient.registerListener(
+          syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+          ZKSyncEventFactory.getValueChangeEvent(), listenerTest);
+
+      IntWritable valueHolder = new IntWritable();
+      boolean result = syncClient.getInformation(
+          syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+          valueHolder);
+      assertTrue(result);
+      assertEquals(data.get(), valueHolder.get());
+
+      data.set(6);
+      syncClient.storeInformation(
+          syncClient.constructKey(jobId, String.valueOf(1L), "test"), data,
+          true, null);
+      valueHolder = new IntWritable();
+      result = syncClient.getInformation(
+          syncClient.constructKey(jobId, String.valueOf(1L), "test"),
+          valueHolder);
+      assertTrue(result);
+      assertEquals(data.get(), valueHolder.get());
+
+      Thread.sleep(5000); // give the change notification time to arrive
+      assertEquals("Changed", listenerTest.getValue());
+    } finally {
+      try {
+        syncServer.stopServer(); // also runs when an assertion above fails
+      } finally {
+        serverPool.shutdownNow();
+      }
+    }
+  }
+
 }



Mime
View raw message