incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1162501 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/checkpoint/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/checkpoint/
Date Sun, 28 Aug 2011 11:42:17 GMT
Author: edwardyoon
Date: Sun Aug 28 11:42:16 2011
New Revision: 1162501

URL: http://svn.apache.org/viewvc?rev=1162501&view=rev
Log:
Add CheckPointer and saving messages for future fault-tolerant systems.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/
    incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1162501&r1=1162500&r2=1162501&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sun Aug 28 11:42:16 2011
@@ -4,6 +4,8 @@ Release 0.4 - Unreleased
 
   NEW FEATURES
 
+   HAMA-398: Add CheckPointer and saving messages for future fault-tolerant systems (ChiaHung Lin via edwardyoon)
+
   BUG FIXES
 
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1162501&r1=1162500&r2=1162501&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Sun Aug 28 11:42:16 2011
@@ -205,13 +205,7 @@ public class BSPMaster implements JobSub
       while (true) {
         try {
           Directive directive = this.buffer.take();
-          if (directive instanceof ReportGroomStatusDirective) {
-            ((DirectiveHandler) handlers.get(ReportGroomStatusDirective.class))
-                .handle(directive);
-          } else {
-            throw new RuntimeException("Directive is not supported."
-                + directive);
-          }
+          handlers.get(directive.getClass()).handle(directive);
         } catch (InterruptedException ie) {
           LOG.error("Unable to retrieve directive from the queue.", ie);
           Thread.currentThread().interrupt();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1162501&r1=1162500&r2=1162501&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Sun Aug 28 11:42:16 2011
@@ -17,8 +17,13 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -26,14 +31,21 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import static java.util.concurrent.TimeUnit.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.checkpoint.CheckpointRunner;
 import org.apache.hama.Constants;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.util.Bytes;
@@ -53,21 +65,26 @@ public class BSPPeer implements Watcher,
 
   public static final Log LOG = LogFactory.getLog(BSPPeer.class);
 
-  private Configuration conf;
+  private final Configuration conf;
   private BSPJob jobConf;
 
-  private Server server = null;
+  private volatile Server server = null;
   private ZooKeeper zk = null;
   private volatile Integer mutex = 0;
 
   private final String bspRoot;
   private final String quorumServers;
 
-  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
-  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
-  private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+  private final Map<InetSocketAddress, BSPPeerInterface> peers = 
+    new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = 
+    new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueue = 
+    new ConcurrentLinkedQueue<BSPMessage>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = 
+    new ConcurrentLinkedQueue<BSPMessage>();
+  private final Map<String, InetSocketAddress> peerSocketCache = 
+    new ConcurrentHashMap<String, InetSocketAddress>();
 
   private SortedSet<String> allPeerNames = new TreeSet<String>();
   private InetSocketAddress peerAddress;
@@ -76,19 +93,127 @@ public class BSPPeer implements Watcher,
   private TaskAttemptID taskid;
   private BSPPeerProtocol umbilical;
 
+  private final BSPMessageSerializer messageSerializer;
+
+  public static final class BSPSerializableMessage implements Writable {
+    final AtomicReference<String> path = new AtomicReference<String>();
+    final AtomicReference<BSPMessageBundle> bundle = 
+      new AtomicReference<BSPMessageBundle>();
+
+    public BSPSerializableMessage(){}
+
+    public BSPSerializableMessage(final String path, final BSPMessageBundle bundle) {
+      if(null == path) 
+        throw new NullPointerException("No path provided for checkpointing.");
+      if(null == bundle) 
+        throw new NullPointerException("No data provided for checkpointing.");
+      this.path.set(path);
+      this.bundle.set(bundle);
+    }
+
+    public final String checkpointedPath() {
+      return this.path.get();
+    }
+    
+    public final BSPMessageBundle messageBundle(){
+      return this.bundle.get();
+    }
+
+    @Override 
+    public final void write(DataOutput out) throws IOException {
+      out.writeUTF(this.path.get());
+      this.bundle.get().write(out);
+    }
+
+    @Override 
+    public final void readFields(DataInput in) throws IOException {
+      this.path.set(in.readUTF());
+      BSPMessageBundle pack = new BSPMessageBundle(); 
+      pack.readFields(in);
+      this.bundle.set(pack);
+    }
+
+  }// serializable message
+
+  final class BSPMessageSerializer {
+    final Socket client;
+    final ScheduledExecutorService sched;
+
+    public BSPMessageSerializer(final int port) { 
+      Socket tmp = null;
+      int cnt = 0;
+      do {
+        tmp = init(port);
+        cnt ++;
+        try {   
+          Thread.sleep(1000); 
+        } catch(InterruptedException ie) { 
+          LOG.warn("Thread is interrupted.", ie); 
+          Thread.currentThread().interrupt();
+        }
+      } while(null == tmp && 10 > cnt);
+      this.client = tmp;
+      if(null == this.client)
+        throw new NullPointerException("Client socket is null.");
+      this.sched = Executors.newScheduledThreadPool(
+        conf.getInt("bsp.checkpoint.serializer_thread", 10));
+      LOG.info(BSPMessageSerializer.class.getName()+
+      " is ready to serialize message.");
+    }
+
+    private Socket init(final int port) {
+       Socket tmp = null;
+       try {
+         tmp = new Socket("localhost", port);
+       } catch(UnknownHostException uhe) {
+         LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+       } catch(IOException ioe) {
+         LOG.warn("Fail to create socket.", ioe);
+       }
+       return tmp;
+    }
+   
+    void serialize(final BSPSerializableMessage tmp) throws IOException { 
+      if(LOG.isDebugEnabled())
+        LOG.debug("Messages are saved to "+tmp.checkpointedPath());
+      final DataOutput out = new DataOutputStream(client.getOutputStream());
+      this.sched.schedule(new Callable() {
+        public Object call() throws Exception {
+          tmp.write(out);
+          return null;
+        }
+      }, 0, SECONDS);
+    }
+
+    public void close() {
+      try {
+        this.client.close();
+        this.sched.shutdown();
+      } catch(IOException io) {
+        LOG.error("Fail to close client socket.", io);
+      }
+    }
+
+  }// message serializer 
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
   protected BSPPeer() {
     bspRoot = null;
     quorumServers = null;
+    messageSerializer = null;
+    conf = null;
   }
 
   /**
-   * Constructor
+   * BSPPeer Constructor.
    * 
-   * @param umbilical
-   * @param taskid
+   * BSPPeer acts on behalf of clients performing bsp() tasks.
+   *  
+   * @param conf is the configuration file containing bsp peer host, port, etc.
+   * @param umbilical is the bsp protocol used to contact its parent process.
+   * @param taskid is the id that current process holds.
    */
   public BSPPeer(Configuration conf, TaskAttemptID taskid,
       BSPPeerProtocol umbilical) throws IOException {
@@ -103,33 +228,36 @@ public class BSPPeer implements Watcher,
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     quorumServers = QuorumPeer.getZKQuorumServersString(conf);
-    LOG.debug("Quorum  " + quorumServers);
-
-    // TODO: may require to dynamic reflect the underlying
-    // network e.g. ip address, port.
+    if(LOG.isDebugEnabled()) LOG.debug("Quorum  " + quorumServers);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
-    reinitialize();
+    BSPMessageSerializer msgSerializer = null;
+    if(this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      msgSerializer = 
+        new BSPMessageSerializer(conf.getInt("bsp.checkpoint.port", 
+        Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+    }
+    this.messageSerializer = msgSerializer;
   }
 
   public void reinitialize() {
     try {
-      LOG.debug("reinitialize(): " + getPeerName());
-      server = RPC.getServer(this, peerAddress.getHostName(),
+      if(LOG.isDebugEnabled()) LOG.debug("reinitialize(): " + getPeerName());
+      this.server = RPC.getServer(this, peerAddress.getHostName(),
           peerAddress.getPort(), conf);
       server.start();
       LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
           + peerAddress.getPort());
     } catch (IOException e) {
-      LOG.error("Exception during reinitialization!", e);
+      LOG.error("Fail to start RPC server!", e);
     }
-
+ 
     try {
-      zk = new ZooKeeper(quorumServers, conf.getInt(
+      this.zk = new ZooKeeper(quorumServers, conf.getInt(
           Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
-    } catch (IOException e) {
-      LOG.error("Exception during reinitialization!", e);
+    } catch(IOException e) {
+      LOG.error("Fail while reinitializing zookeeeper!", e);
     }
-
+  
     Stat s = null;
     if (zk != null) {
       try {
@@ -149,7 +277,6 @@ public class BSPPeer implements Watcher,
         }
       }
     }
-
   }
 
   @Override
@@ -187,6 +314,16 @@ public class BSPPeer implements Watcher,
     }
   }
 
+  private String checkpointedPath() {
+    String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
+    String ckptPath = 
+      backup + jobConf.getJobID().toString() + "/" + getSuperstepCount() + 
+      "/" + this.taskid.toString();
+    if(LOG.isDebugEnabled()) 
+      LOG.debug("Messages are to be saved to "+ckptPath);
+    return ckptPath;
+  }
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.BSPPeerInterface#sync()
@@ -195,8 +332,8 @@ public class BSPPeer implements Watcher,
   public void sync() throws IOException, KeeperException, InterruptedException {
     enterBarrier();
     long startTime = System.currentTimeMillis();
-    Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
-        .entrySet().iterator();
+    Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = 
+        this.outgoingQueues.entrySet().iterator();
 
     while (it.hasNext()) {
       Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
@@ -211,6 +348,13 @@ public class BSPPeer implements Watcher,
       for (BSPMessage message : messages) {
         bundle.addMessage(message);
       }
+      
+      // checkpointing 
+      if(null != this.messageSerializer) {
+        this.messageSerializer.serialize(new BSPSerializableMessage(
+          checkpointedPath(), bundle));
+      }
+
       peer.put(bundle);
     }
 
@@ -295,6 +439,7 @@ public class BSPPeer implements Watcher,
   @Override
   public void close() throws IOException {
     server.stop();
+    if(null != messageSerializer) this.messageSerializer.close();
   }
 
   @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1162501&r1=1162500&r2=1162501&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Sun Aug 28 11:42:16 2011
@@ -52,6 +52,8 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hama.checkpoint.Checkpointer;
+import org.apache.hama.checkpoint.CheckpointRunner;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -73,7 +75,7 @@ public class GroomServer implements Runn
 
   private volatile static int REPORT_INTERVAL = 1 * 1000;
 
-  Configuration conf;
+  final Configuration conf;
 
   // Constants
   static enum State {
@@ -125,6 +127,8 @@ public class GroomServer implements Runn
   // private BlockingQueue<GroomServerAction> tasksToCleanup = new
   // LinkedBlockingQueue<GroomServerAction>();
 
+  private final CheckpointRunner checkpointRunner;
+
   private class DispatchTasksHandler implements DirectiveHandler {
 
     public void handle(Directive directive) throws DirectiveException {
@@ -167,7 +171,8 @@ public class GroomServer implements Runn
 
   private class Instructor extends Thread {
     final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
-    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = 
+      new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
 
     public void bind(Class<? extends Directive> instruction,
         DirectiveHandler handler) {
@@ -187,13 +192,7 @@ public class GroomServer implements Runn
       while (true) {
         try {
           Directive directive = buffer.take();
-          if (directive instanceof DispatchTasksDirective) {
-            ((DirectiveHandler) handlers.get(DispatchTasksDirective.class))
-                .handle(directive);
-          } else {
-            throw new RuntimeException("Directive is not supported."
-                + directive);
-          }
+          handlers.get(directive.getClass()).handle(directive);
         } catch (InterruptedException ie) {
           LOG.error("Unable to retrieve directive from the queue.", ie);
           Thread.currentThread().interrupt();
@@ -207,10 +206,16 @@ public class GroomServer implements Runn
   public GroomServer(Configuration conf) throws IOException {
     LOG.info("groom start");
     this.conf = conf;
-
     bspMasterAddr = BSPMaster.getAddress(conf);
     // FileSystem local = FileSystem.getLocal(conf);
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
+
+    CheckpointRunner ckptRunner = null;
+    if(this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      ckptRunner = 
+        new CheckpointRunner(CheckpointRunner.buildCommands(this.conf));
+    }
+    this.checkpointRunner = ckptRunner;
   }
 
   public synchronized void initialize() throws IOException {
@@ -300,6 +305,10 @@ public class GroomServer implements Runn
     this.instructor.bind(DispatchTasksDirective.class,
         new DispatchTasksHandler());
     instructor.start();
+    if(this.conf.getBoolean("bsp.checkpoint.enabled", false) && 
+       null != this.checkpointRunner && !this.checkpointRunner.isAlive()) {
+      this.checkpointRunner.start(); 
+    }
     this.running = true;
     this.initialized = true;
   }
@@ -311,9 +320,7 @@ public class GroomServer implements Runn
 
   @Override
   public void dispatch(Directive directive) throws IOException {
-    if (!instructor.isAlive())
-      throw new IOException();
-
+    if (!instructor.isAlive()) throw new IOException();
     instructor.put(directive);
   }
 
@@ -656,7 +663,10 @@ public class GroomServer implements Runn
     cleanupStorage();
     this.workerServer.stop();
     RPC.stopProxy(masterClient);
-
+    if(this.conf.getBoolean("bsp.checkpoint.enabled", false) && 
+       null != this.checkpointRunner && this.checkpointRunner.isAlive()) {
+      this.checkpointRunner.stop();
+    }
     if (taskReportServer != null) {
       taskReportServer.stop();
       taskReportServer = null;
@@ -742,7 +752,7 @@ public class GroomServer implements Runn
      */
     public synchronized void killAndCleanup(boolean wasFailure)
         throws IOException {
-      runner.kill();
+      runner.killBsp();
     }
 
     /**
@@ -815,14 +825,40 @@ public class GroomServer implements Runn
     // TODO Later, peers list should be returned.
     return this.groomHostName + ":" + Constants.DEFAULT_PEER_PORT;
   }
+ 
+  /** 
+   * Checkpointer child process.
+   */
+  public static final class CheckpointerChild {
+
+    public static void main(String[] args) throws Throwable {
+      if(LOG.isDebugEnabled()) LOG.debug("Starting Checkpointer child process.");
+      HamaConfiguration defaultConf = new HamaConfiguration();
+      int ret = 0;
+      if(null != args && 1 == args.length) {
+        int port = Integer.parseInt(args[0]);
+        defaultConf.setInt("bsp.checkpoint.port", 
+          Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+        if(LOG.isDebugEnabled()) 
+          LOG.debug("Supplied checkpointer port value:"+port);
+        Checkpointer ckpt = new Checkpointer(defaultConf);
+        ckpt.start();
+        ckpt.join();
+        LOG.info("Checkpoint finishes its execution.");
+      }else {
+        throw new IllegalArgumentException(
+        "Port value is not provided for checkpointing service.");
+      }
+    }
+  }
 
   /**
-   * The main() for child processes.
+   * The main() for BSPPeer child processes.
    */
-  public static class Child {
+  public static class BSPPeerChild {
 
     public static void main(String[] args) throws Throwable {
-      LOG.debug("Child starting");
+      if(LOG.isDebugEnabled()) LOG.debug("BSPPeerChild starting");
 
       HamaConfiguration defaultConf = new HamaConfiguration();
       // report address
@@ -842,9 +878,13 @@ public class GroomServer implements Runn
       BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
 
       defaultConf.set(Constants.PEER_HOST, args[3]);
+      if(null != args && 5 == args.length ) {
+        defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
+      } 
       defaultConf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
 
       BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
+      bspPeer.reinitialize();
       bspPeer.setJobConf(job);
       bspPeer.setAllPeerNames(umbilical.getAllPeerNames().getAllPeerNames());
 
@@ -917,4 +957,4 @@ public class GroomServer implements Runn
   public PeerNames getAllPeerNames() {
     return allPeerNames;
   }
-}
\ No newline at end of file
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1162501&r1=1162500&r2=1162501&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Sun Aug 28 11:42:16 2011
@@ -22,12 +22,23 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.util.Vector;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import static java.util.concurrent.TimeUnit.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.RunJar;
+import org.apache.hama.checkpoint.CheckpointRunner;
 
 /** 
  * Base class that runs a task in a separate process. 
@@ -36,11 +47,73 @@ public class TaskRunner extends Thread {
 
   public static final Log LOG = LogFactory.getLog(TaskRunner.class);
 
-  boolean killed = false;
-  private Process process;
-  private Task task;
-  private BSPJob conf;
-  private GroomServer groomServer;
+  boolean bspKilled = false;
+  private Process bspProcess; 
+
+  private final Task task;
+  private final BSPJob conf;
+  private final GroomServer groomServer;
+
+  class BspChildRunner implements Callable {
+    private final List<String> commands; 
+    private final File workDir;
+    private final ScheduledExecutorService sched;
+    private final AtomicReference<ScheduledFuture> future;
+
+    BspChildRunner(List<String> commands, File workDir){ 
+      this.commands = commands;
+      this.workDir = workDir;
+      this.sched = Executors.newScheduledThreadPool(1);
+      this.future = new AtomicReference<ScheduledFuture>();
+    }
+
+    void start() {
+      this.future.set(this.sched.schedule(this, 0, SECONDS));
+      LOG.info("Start building BSPPeer process.");
+    }
+
+    void stop() {
+      killBsp(); 
+      this.sched.schedule(this, 0, SECONDS);
+      LOG.info("Stop BSPPeer process.");
+    }
+ 
+    void join() throws InterruptedException, ExecutionException {
+      this.future.get().get();
+    }
+
+    public Object call() throws Exception {
+      ProcessBuilder builder = new ProcessBuilder(commands);
+      builder.directory(workDir);
+      try {
+        bspProcess = builder.start();
+        new Thread() {
+          public void run() {
+            logStream(bspProcess.getErrorStream()); // copy log output
+          }
+        }.start();
+
+        new Thread() {
+          public void run() {
+            logStream(bspProcess.getInputStream());
+          }
+        }.start();
+
+        int exit_code = bspProcess.waitFor();
+        if (!bspKilled && exit_code != 0) {
+          throw new IOException("BSP task process exit with nonzero status of "
+              + exit_code + ".");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Thread is interrupted when execeuting Checkpointer process.", e);
+      } catch(IOException ioe) {
+        LOG.error("Error when executing BSPPeer process.", ioe);
+      } finally {
+        killBsp();
+      }
+      return null;
+    }
+  }
 
   public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
     this.task = bspTask;
@@ -61,108 +134,127 @@ public class TaskRunner extends Thread {
     return true;
   }
 
-  public void run() {
-    try {
-      String sep = System.getProperty("path.separator");
-      File workDir = new File(new File(task.getJobFile()).getParent(), "work");
-      boolean isCreated = workDir.mkdirs();
-      if(!isCreated) {
-        LOG.debug("TaskRunner.workDir : " + workDir);
-      }
+  private File createWorkDirectory(){
+    File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+    boolean isCreated = workDir.mkdirs();
+    if(LOG.isDebugEnabled() && !isCreated) {
+      LOG.debug("TaskRunner.workDir : " + workDir);
+    }
+    return workDir;
+  }
 
-      StringBuffer classPath = new StringBuffer();
-      // start with same classpath as parent process
-      classPath.append(System.getProperty("java.class.path"));
-      classPath.append(sep);
+  private String assembleClasspath(BSPJob jobConf, File workDir) {
+    StringBuffer classPath = new StringBuffer();
+    // start with same classpath as parent process
+    classPath.append(System.getProperty("java.class.path"));
+    classPath.append(System.getProperty("path.separator"));
 
-      String jar = conf.getJar();
-      if (jar != null) { // if jar exists, it into workDir
+    String jar = jobConf.getJar();
+    if (jar != null) { // if jar exists, it into workDir
+      try {
         RunJar.unJar(new File(jar), workDir);
-        File[] libs = new File(workDir, "lib").listFiles();
-        if (libs != null) {
-          for (int i = 0; i < libs.length; i++) {
-            classPath.append(sep); // add libs from jar to classpath
-            classPath.append(libs[i]);
-          }
+      } catch(IOException ioe) {
+        LOG.error("Unable to uncompressing file to "+workDir.toString(), ioe);
+      }
+      File[] libs = new File(workDir, "lib").listFiles();
+      if (libs != null) {
+        for (int i = 0; i < libs.length; i++) {
+          // add libs from jar to classpath
+          classPath.append(System.getProperty("path.separator")); 
+          classPath.append(libs[i]);
         }
-        classPath.append(sep);
-        classPath.append(new File(workDir, "classes"));
-        classPath.append(sep);
-        classPath.append(workDir);
       }
+      classPath.append(System.getProperty("path.separator"));
+      classPath.append(new File(workDir, "classes"));
+      classPath.append(System.getProperty("path.separator"));
+      classPath.append(workDir);
+    }
+    return classPath.toString();
+  } 
 
-      // Build exec child jmv args.
-      Vector<String> vargs = new Vector<String>();
-      File jvm = // use same jvm as parent
-      new File(new File(System.getProperty("java.home"), "bin"), "java");
-      vargs.add(jvm.toString());
-
-      // bsp.child.java.opts
-      String javaOpts = conf.getConf().get("bsp.child.java.opts", "-Xmx200m");
-      javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
-      
-      String[] javaOptsSplit = javaOpts.split(" ");
-      for (int i = 0; i < javaOptsSplit.length; i++) {
-        vargs.add(javaOptsSplit[i]);
-      }
+  private List<String> buildJvmArgs(BSPJob jobConf, String classPath, Class child){
+    // Build exec child jmv args.
+    List<String> vargs = new ArrayList<String>(); 
+    File jvm = // use same jvm as parent
+    new File(new File(System.getProperty("java.home"), "bin"), "java");
+    vargs.add(jvm.toString());
+
+    // bsp.child.java.opts
+    String javaOpts = jobConf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+    javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+    
+    String[] javaOptsSplit = javaOpts.split(" ");
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
 
-      // Add classpath.
-      vargs.add("-classpath");
-      vargs.add(classPath.toString());
-      // Add main class and its arguments
-      vargs.add(GroomServer.Child.class.getName()); // main of Child
+    // Add classpath.
+    vargs.add("-classpath");
+    vargs.add(classPath.toString());
+    // Add main class and its arguments
+    if(LOG.isDebugEnabled())
+      LOG.debug("Executing child Process "+child.getName());
+    vargs.add(child.getName()); // main of bsp or checkpointer Child
 
+
+    if(GroomServer.BSPPeerChild.class.equals(child)) {
       InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
       vargs.add(addr.getHostName());
       vargs.add(Integer.toString(addr.getPort()));
       vargs.add(task.getTaskID().toString());
       vargs.add(groomServer.groomHostName);
+    }
 
-      // Run java
-      runChild((String[]) vargs.toArray(new String[0]), workDir);
-    } catch (IOException e) {
-      LOG.error(e);
+    if(jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
+      String ckptPort = 
+        jobConf.getConf().get("bsp.checkpoint.port", 
+        CheckpointRunner.DEFAULT_PORT);
+      if(LOG.isDebugEnabled())
+        LOG.debug("Checkpointer's port:"+ckptPort);
+      vargs.add(ckptPort);
     }
+
+    return vargs;
   }
 
   /**
-   * Run the child process
+   * Build working environment and launch BSPPeer and Checkpointer processes.
+   * And transmit data from BSPPeer's inputstream to Checkpointer's  
+   * OutputStream. 
    */
-  private void runChild(String[] args, File dir) throws IOException {
-    this.process = Runtime.getRuntime().exec(args, null, dir);
-    try {
-      new Thread() {
-        public void run() {
-          logStream(process.getErrorStream()); // copy log output
-        }
-      }.start();
-
-      logStream(process.getInputStream()); // normally empty
-
-      int exit_code = process.waitFor();
-      if (!killed && exit_code != 0) {
-        throw new IOException("Task process exit with nonzero status of "
-            + exit_code + ".");
-      }
+  public void run() {
+    File workDir = createWorkDirectory(); 
+    String classPath = assembleClasspath(conf, workDir);
+    if(LOG.isDebugEnabled())
+      LOG.debug("Spawned child's classpath "+classPath);
+    List<String> bspArgs = 
+      buildJvmArgs(conf, classPath, GroomServer.BSPPeerChild.class);
 
-    } catch (InterruptedException e) {
-      throw new IOException(e.toString());
-    } finally {
-      kill();
+    BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
+    bspPeer.start();
+    try {
+      bspPeer.join();
+    } catch(InterruptedException ie) {
+      LOG.error("BSPPeer child process is interrupted.", ie);
+    } catch(ExecutionException ee) {
+      LOG.error("Failure occurs when retrieving tasks result.", ee);
     }
+    LOG.info("Finishes executing BSPPeer child process.");
   }
 
   /**
-   * Kill the child process
+   * Kill bsppeer child process.
    */
-  public void kill() {
-    if (process != null) {
-      process.destroy();
+  public void killBsp() {
+    if (bspProcess != null) {
+      bspProcess.destroy();
     }
-    killed = true;
+    bspKilled = true;
   }
 
   /**
+   * Log process's input/ output stream.
+   * @param output stream to be logged.
    */
   private void logStream(InputStream output) {
     try {

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java?rev=1162501&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/CheckpointRunner.java Sun Aug 28 11:42:16 2011
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.GroomServer.CheckpointerChild;
+import org.apache.hama.bsp.GroomServer;
+import static java.util.concurrent.TimeUnit.*;
+
+
+/**
+ * Launches the checkpointer as a child JVM and supervises it.
+ *
+ * The command line is produced by {@link #buildCommands(Configuration)};
+ * {@link #start()} schedules this runner on an internal executor, which
+ * spawns the process, forwards its stdout/stderr to the log and waits for it
+ * to exit. {@link #stop()} / {@link #kill()} destroy the child process.
+ */
+public final class CheckpointRunner implements Callable<Object> {
+
+  public static final Log LOG = LogFactory.getLog(CheckpointRunner.class);
+  public static final String DEFAULT_PORT = "1590";
+  
+  private final List<String> commands;
+  private final ScheduledExecutorService sched;
+  private final AtomicReference<Process> process;
+  /** True between start() and kill(); false means "not started or killed". */
+  private final AtomicBoolean isAlive = new AtomicBoolean(false);
+
+  /**
+   * @param commands full command line (executable first) used to spawn the
+   *        checkpointer child process.
+   */
+  public CheckpointRunner(List<String> commands) {
+    this.commands = commands;
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Command for executing Checkpoint runner:"+
+      Arrays.toString(this.commands.toArray()));
+    }
+    this.sched = Executors.newScheduledThreadPool(10);
+    this.process = new AtomicReference<Process>();
+  }
+
+  /**
+   * Assemble the command line for the checkpointer child JVM: the java
+   * binary of the current JRE, the options from
+   * "bsp.checkpoint.child.java.opts", the current class path, the
+   * CheckpointerChild main class and the port from "bsp.checkpoint.port".
+   *
+   * @param config configuration the options and port are read from.
+   * @return the command line as a list of arguments.
+   */
+  public static final List<String> buildCommands(final Configuration config) {
+    List<String> vargs = new ArrayList<String>();
+    File jvm =
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    vargs.add(jvm.toString());
+
+    String javaOpts = config.get("bsp.checkpoint.child.java.opts", "-Xmx50m");
+    // Split on any run of whitespace and drop empty tokens: an empty
+    // argument would be taken by the JVM launcher as the main class name.
+    for (String opt : javaOpts.trim().split("\\s+")) {
+      if (!opt.isEmpty()) {
+        vargs.add(opt);
+      }
+    }
+    vargs.add("-classpath");
+    vargs.add(System.getProperty("java.class.path"));
+    vargs.add(CheckpointerChild.class.getName());
+    String port = config.get("bsp.checkpoint.port", DEFAULT_PORT);
+    if(LOG.isDebugEnabled())
+      LOG.debug("Checkpointer's port:"+port);
+    vargs.add(port);
+
+    return vargs;
+  }
+
+  /**
+   * Schedule the child process for immediate execution.
+   *
+   * @throws IllegalStateException if this runner is already running.
+   */
+  public void start() {
+    if(!isAlive.compareAndSet(false, true)) {
+      throw new IllegalStateException(this.getClass().getName()+
+      " is already running.");
+    }
+    this.sched.schedule(this, 0, SECONDS);
+    LOG.info("Start building checkpointer process.");
+  }
+
+  /**
+   * Kill the child process and shut the internal executor down.
+   */
+  public void stop() {
+    kill();
+    this.sched.shutdown();
+    LOG.info("Stop checkpointer process.");
+  }
+
+  /**
+   * @return the spawned child process, or null if it has not been created.
+   */
+  public Process getProcess() {
+    return this.process.get();
+  }
+
+  /**
+   * Destroy the child process, if any, and mark this runner as not alive.
+   */
+  public void kill() {
+    // Clear the flag before destroying the process so that the thread
+    // blocked in waitFor() sees the exit as a deliberate kill.
+    isAlive.set(false);
+    Process child = this.process.get();
+    if (child != null) {
+      child.destroy();
+    }
+  }
+
+  /**
+   * @return true if start() has been called and the runner was not killed.
+   */
+  public boolean isAlive() {
+    return isAlive.get();
+  }
+
+  /**
+   * Spawn the child process, log its output streams and block until it
+   * exits. Failures are logged, not propagated.
+   *
+   * @return always null.
+   */
+  public Object call() throws Exception {
+    ProcessBuilder builder = new ProcessBuilder(commands);
+    try{
+      final Process child = builder.start();
+      this.process.set(child);
+      new Thread() {
+        @Override
+        public void run() {
+          logStream(child.getErrorStream());
+        }
+      }.start();
+
+      new Thread() {
+        @Override
+        public void run() {
+          logStream(child.getInputStream());
+        }
+      }.start();
+
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        @Override
+        public void run() {
+          LOG.info("Destroying checkpointer process.");
+          child.destroy();
+        }
+      });
+
+      int exit_code = child.waitFor();
+      // Only an exit that was NOT requested through kill() is a failure.
+      if (isAlive() && exit_code != 0) {
+        throw new IOException("Checkpointer process exit with nonzero status of "
+            + exit_code + ".");
+      }
+    } catch(InterruptedException e){
+      LOG.warn("Thread is interrupted when execeuting Checkpointer process.", e);
+      // Restore the flag so the executor can observe the interruption.
+      Thread.currentThread().interrupt();
+    } catch(IOException ioe) {
+      LOG.error("Error when executing Checkpointer process.", ioe);
+    } finally {
+      kill();
+    }
+    return null;
+  }
+
+  /**
+   * Copy a stream of the child process line by line to the log, then close it.
+   * @param output stream to be logged.
+   */
+  private void logStream(InputStream output) {
+    try {
+      BufferedReader in = new BufferedReader(new InputStreamReader(output));
+      String line;
+      while ((line = in.readLine()) != null) {
+        LOG.info(line);
+      }
+    } catch (IOException e) {
+      LOG.warn("Error reading checkpoint process's inputstream.", e);
+    } finally {
+      try {
+        output.close();
+      } catch (IOException e) {
+        LOG.warn("Error closing checkpoint's inputstream.", e);
+      }
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java?rev=1162501&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/checkpoint/Checkpointer.java Sun Aug 28 11:42:16 2011
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import static java.util.concurrent.TimeUnit.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.GroomServerRunner;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class is responsible for checkpointing messages to the configured
+ * Hadoop file system.
+ *
+ * A {@link BSPMessageDeserializer} accepts socket connections and turns the
+ * incoming bytes into BSPSerializableMessage objects on a queue; the
+ * checkpointer task drains that queue and writes each message bundle to the
+ * path carried by the message.
+ */
+public final class Checkpointer implements Callable<Boolean> {
+
+  public static Log LOG = LogFactory.getLog(Checkpointer.class);
+
+  private final ScheduledExecutorService scheduler = 
+    Executors.newScheduledThreadPool(1);
+  private final FileSystem dfs; 
+  private final AtomicBoolean ckptState = new AtomicBoolean(false);
+  private final BSPMessageDeserializer messageDeserializer;
+  private final AtomicReference<ScheduledFuture<Boolean>> future =  
+    new AtomicReference<ScheduledFuture<Boolean>>(); 
+
+  /** 
+   * Reading from socket inputstream as DataInput.
+   *
+   * Listens on a server socket; for every accepted connection one
+   * BSPSerializableMessage is read on a worker thread and put on the
+   * message queue.
+   */
+  public static final class BSPMessageDeserializer implements Callable<Object> {
+    final BlockingQueue<BSPSerializableMessage> messageQueue = 
+      new LinkedBlockingQueue<BSPSerializableMessage>();
+    final ScheduledExecutorService sched; 
+    final ScheduledExecutorService workers; 
+    final AtomicBoolean serializerState = new AtomicBoolean(false);
+    final ServerSocket server;
+    
+    /**
+     * @param port port the server socket is bound to.
+     * @throws IOException if the server socket can not be opened.
+     */
+    public BSPMessageDeserializer(final int port) throws IOException { 
+      this.sched = Executors.newScheduledThreadPool(1);
+      this.workers = Executors.newScheduledThreadPool(10);
+      this.server = new ServerSocket(port); 
+      LOG.info("Deserializer's port is opened at "+port);
+    }
+
+    /**
+     * @return the local port the server socket listens on.
+     */
+    public int port() {
+      return this.server.getLocalPort(); 
+    }
+
+    /**
+     * Start accepting connections.
+     *
+     * @throws IllegalStateException if the deserializer is already started.
+     */
+    public void start() {
+      if(!serializerState.compareAndSet(false, true)) {
+        throw new IllegalStateException("BSPMessageDeserializer has been "+
+        "started up.");
+      }
+      this.sched.schedule(this, 0, SECONDS);
+      LOG.info("BSPMessageDeserializer is started.");
+    }
+
+    /**
+     * Close the server socket and shut both executors down.
+     */
+    public void stop() {
+      // Flip the state first: closing the socket makes a blocked accept()
+      // throw, and the accept loop uses the state to recognise that as a
+      // regular shutdown rather than an error.
+      this.serializerState.set(false);
+      try {
+        this.server.close();
+      } catch(IOException ioe) {
+        LOG.error("Unable to close message serializer server socket.", ioe);
+      }
+      this.sched.shutdown();
+      this.workers.shutdown();
+      LOG.info("BSPMessageDeserializer is stopped.");
+    }
+
+    /**
+     * @return true while the deserializer is started and not stopped.
+     */
+    public boolean state(){
+      return this.serializerState.get();
+    }
+
+    /**
+     * Message is enqueued for communicating data sent from BSPPeer.
+     */
+    public BlockingQueue<BSPSerializableMessage> messageQueue() {
+      return this.messageQueue;
+    }
+
+    /**
+     * Accept loop: hand every accepted connection to a worker that reads
+     * one message from it and enqueues the message.
+     *
+     * @return always null.
+     */
+    public Object call() throws Exception {
+      try {
+        while(state()) {
+          Socket connection = server.accept();
+          // NOTE(review): the accepted socket is never closed and only one
+          // message is read per connection. Whether the sender reuses the
+          // connection is not visible here, so this is left unchanged -
+          // confirm against BSPPeer.BSPMessageSerializer.
+          final DataInput in = new DataInputStream(connection.getInputStream());
+          this.workers.schedule(new Callable<Object>() {
+            public Object call() throws Exception {
+              try {
+                BSPSerializableMessage tmp = new BSPSerializableMessage();
+                tmp.readFields(in);
+                messageQueue().put(tmp);
+              } catch(IOException ioe) {
+                // Nobody inspects the worker's future, so log here instead
+                // of losing the failure silently.
+                LOG.error("Error when reconstructing BSPSerializableMessage.",
+                  ioe);
+              }
+              return null;
+            }
+          }, 0, SECONDS);
+        }
+      } catch(EOFException eofe) {
+        LOG.info("Closing checkpointer's input stream.", eofe);
+      } catch(IOException ioe) {
+        if(state()) {
+          LOG.error("Error when reconstructing BSPSerializableMessage.", ioe);
+        } else {
+          // stop() closed the server socket while accept() was blocked.
+          LOG.info("BSPMessageDeserializer server socket is closed.");
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * @param conf configuration providing the file system and the
+   *        "bsp.checkpoint.port" value.
+   * @throws IOException if the file system or the server socket can not be
+   *         obtained.
+   */
+  public Checkpointer(final Configuration conf) throws IOException {
+    this.dfs = FileSystem.get(conf); 
+    if(null == this.dfs) 
+      throw new NullPointerException("HDFS instance not found.");
+    int port = conf.getInt("bsp.checkpoint.port", 
+      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+    if(LOG.isDebugEnabled()) 
+      LOG.debug("Checkpoint port value:"+port); 
+    this.messageDeserializer = new BSPMessageDeserializer(port);
+  }
+
+  /**
+   * Activate the checkpoint thread.
+   *
+   * @throws IllegalStateException if the checkpointer is already started.
+   */
+  public void start(){
+    if(!ckptState.compareAndSet(false, true)) {
+      throw new IllegalStateException("Checkpointer has been started up.");
+    }
+    this.messageDeserializer.start();
+    this.future.set(this.scheduler.schedule(this, 0, SECONDS));
+    LOG.info("Checkpointer is started.");
+  }
+
+  /**
+   * Stop checkpoint thread.
+   */
+  public void stop(){
+    this.messageDeserializer.stop();
+    this.scheduler.shutdown();
+    this.ckptState.set(false);
+    LOG.info("Checkpointer is stopped.");
+  }
+  
+  /**
+   * Check if checkpointer is running.
+   * @return true if checkpointer is running; false otherwise.
+   */
+  public boolean isAlive(){
+    return !this.scheduler.isShutdown() && this.ckptState.get();
+  }
+
+  /**
+   * Block until the checkpoint task scheduled by start() has finished.
+   */
+  public void join() throws InterruptedException, ExecutionException {
+    this.future.get().get();
+  }
+
+  /**
+   * Drain the message queue and write every message bundle to the path the
+   * message carries, until the checkpointer is stopped.
+   *
+   * @return Boolean.TRUE when the loop ends normally.
+   * @throws NullPointerException if a message carries no destination path.
+   */
+  public Boolean call() throws Exception {
+    BlockingQueue<BSPSerializableMessage> queue = 
+      this.messageDeserializer.messageQueue();
+    try {
+      while(isAlive()) {
+        // Poll with a timeout instead of take(): a blocking take() would
+        // never notice stop() and join() would wait forever.
+        BSPSerializableMessage msg = queue.poll(1, SECONDS);
+        if(null == msg) {
+          continue;
+        }
+        String path = msg.checkpointedPath();
+        if(null == path || path.isEmpty()) 
+          throw new NullPointerException("File dest is not provided.");
+        FSDataOutputStream out = this.dfs.create(new Path(path)); 
+        try {
+          msg.messageBundle().write(out);
+        } finally {
+          // Close even when write() fails so the stream is not leaked.
+          try {
+            out.close();
+          } catch(IOException e) {
+            LOG.error("Fail to close hdfs output stream.", e);
+          }
+        }
+      }
+    } finally {
+      // Release socket and executors on normal exit and on failure.
+      LOG.info("Stop checkpointing.");
+      this.stop();
+    }
+    return Boolean.TRUE;
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java?rev=1162501&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/BSPSerializerWrapper.java Sun Aug 28 11:42:16 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+
+/**
+ * Test helper that creates a BSPPeer.BSPMessageSerializer (an inner class of
+ * BSPPeer) and exposes its serialize operation.
+ */
+public final class BSPSerializerWrapper {
+
+  private final BSPPeer.BSPMessageSerializer serializer;
+
+  /**
+   * @param conf configuration used to build the enclosing BSPPeer; its
+   *        "bsp.checkpoint.port" value, if set, overrides the given port.
+   * @param port fallback port when the configuration holds none.
+   */
+  public BSPSerializerWrapper(Configuration conf, int port) throws IOException {
+    // The inner serializer needs an enclosing BSPPeer instance.
+    final BSPPeer enclosingPeer = new BSPPeer(conf, null, null);
+    final int checkpointPort = conf.getInt("bsp.checkpoint.port", port);
+    this.serializer = enclosingPeer.new BSPMessageSerializer(checkpointPort);
+  }
+
+  /**
+   * Pass the message on to the wrapped serializer.
+   * @param tmp message to be serialized.
+   */
+  public final void serialize(final BSPSerializableMessage tmp) 
+      throws IOException {
+    serializer.serialize(tmp);
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java?rev=1162501&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/checkpoint/TestCheckpoint.java Sun Aug 28 11:42:16 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.checkpoint;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.util.List;
+
+import junit.framework.TestCase;
+import static junit.framework.Assert.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BSPPeer.BSPSerializableMessage;
+import org.apache.hama.bsp.BSPSerializerWrapper;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.hama.HamaConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Starts a checkpointer child process, sends it one message bundle through
+ * the serializer and verifies that the bundle can be read back from the
+ * file system.
+ */
+public class TestCheckpoint extends TestCase {
+
+  public static final Log LOG = LogFactory.getLog(TestCheckpoint.class);
+
+  private CheckpointRunner runner;
+  private BSPSerializerWrapper serializer;
+  static final String TEST_STRING = "Test String";
+  private FileSystem hdfs;
+  static final DoubleMessage estimate = 
+    new DoubleMessage("192.168.1.123:61000", 3.1415926d);
+
+  /**
+   * Spawn the checkpointer process and connect a serializer to it.
+   */
+  public void setUp() throws Exception {
+    Configuration conf = new HamaConfiguration();
+    this.hdfs = FileSystem.get(conf);
+    assertNotNull("File system object should exist.", this.hdfs);
+    this.runner =  
+      new CheckpointRunner(CheckpointRunner.buildCommands(conf));
+    assertNotNull("Checkpoint instance should exist.", this.runner);
+    this.runner.start();
+    // Give the child JVM time to come up before connecting to it.
+    Thread.sleep(1000*1);
+    Process process = this.runner.getProcess();
+    assertNotNull("Checkpoint process should be created.", process);
+    this.serializer = new BSPSerializerWrapper(conf, 
+      Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+  }
+
+  private BSPMessageBundle createMessageBundle() {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    bundle.addMessage(estimate);
+    return bundle;
+  }
+
+  private String checkpointedPath() {
+      return "/tmp/" + "job_201108221205_000" + "/" + "0" +
+      "/" + "attempt_201108221205_0001_000000_0";
+  }
+
+  /**
+   * A serialized bundle must show up as a file holding the same message.
+   */
+  public void testCheckpoint() throws Exception {
+    this.serializer.serialize(new BSPSerializableMessage(
+    checkpointedPath(), createMessageBundle()));
+    // Wait for the checkpointer process to write the file.
+    Thread.sleep(1000); 
+    Path path = new Path(checkpointedPath());
+    boolean exists = this.hdfs.exists(path);
+    assertTrue("Check if file is actually written to hdfs.", exists); 
+    BSPMessageBundle bundle = new BSPMessageBundle(); 
+    DataInputStream in = new DataInputStream(this.hdfs.open(path));
+    try {
+      bundle.readFields(in);
+    } finally {
+      in.close();
+    }
+    List<BSPMessage> messages = bundle.getMessages();
+    assertEquals("Only one message exists.", 1,  messages.size());
+    for(BSPMessage message: messages) {
+      // JUnit's assertEquals takes (message, expected, actual).
+      String peer = (String)message.getTag();
+      assertEquals("BSPPeer value in form of <ip>:<port>.", 
+        estimate.getTag(), peer);
+      Double pi = (Double)message.getData();
+      assertEquals("Message content.", estimate.getData(), pi);
+    }
+  }
+
+  /**
+   * Stop the checkpointer and remove the checkpoint file so that a stale
+   * file can not satisfy the existence check of a later run.
+   */
+  public void tearDown() throws Exception {
+    try {
+      if(null != this.runner) {
+        this.runner.stop();
+      }
+    } finally {
+      if(null != this.hdfs) {
+        this.hdfs.delete(new Path(checkpointedPath()), false);
+      }
+    }
+  }
+  
+}



Mime
View raw message