incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1162689 - in /incubator/hama/trunk/core: conf/ src/main/java/org/apache/hama/ src/main/java/org/apache/hama/bsp/ src/main/java/org/apache/hama/ipc/ src/test/java/org/apache/hama/ src/test/java/org/apache/hama/bsp/
Date Mon, 29 Aug 2011 09:12:52 GMT
Author: edwardyoon
Date: Mon Aug 29 09:12:51 2011
New Revision: 1162689

URL: http://svn.apache.org/viewvc?rev=1162689&view=rev
Log:
Commit HAMA-413 patch.

Modified:
    incubator/hama/trunk/core/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java

Modified: incubator/hama/trunk/core/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/conf/hama-default.xml?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/conf/hama-default.xml (original)
+++ incubator/hama/trunk/core/conf/hama-default.xml Mon Aug 29 09:12:51 2011
@@ -77,7 +77,7 @@
   </property>
   <property>
     <name>bsp.child.java.opts</name>
-    <value>-Xmx1024m</value>
+    <value>-Xmx512m</value>
     <description>Java opts for the groom server child processes.  
     The following symbol, if present, will be interpolated: @taskid@ is replaced 
     by current TaskID. Any other occurrences of '@' will go unchanged.
@@ -93,6 +93,12 @@
     <value>20</value>
     <description>Number of tasks that run in parallel when in local mode.</description>
   </property>
+  <property>
+    <name>bsp.tasks.maximum</name>
+    <value>3</value>
+    <description>The maximum number of BSP tasks that will be run simultaneously 
+    by a groom server.</description>
+  </property>
 
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Aug 29 09:12:51 2011
@@ -52,7 +52,9 @@ public interface Constants {
   public static final String GROOM_SERVER_IMPL= "hama.groomserver.impl";
   
   /** When we encode strings, we always specify UTF8 encoding */
-  static final String UTF8_ENCODING = "UTF-8";
+  public static final String UTF8_ENCODING = "UTF-8";
+  
+  public static final String MAX_TASKS_PER_GROOM = "bsp.tasks.maximum";
   
   ///////////////////////////////////////
   // Constants for ZooKeeper

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon Aug 29 09:12:51 2011
@@ -45,22 +45,34 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.http.HttpServer;
+import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
 import org.apache.hama.ipc.MasterProtocol;
-import org.apache.hama.ipc.GroomProtocol;
+import org.apache.hama.zookeeper.QuorumPeer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
 
 /**
  * BSPMaster is responsible to control all the groom servers and to manage bsp
  * jobs.
  */
 public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
-    GroomServerManager {
+    GroomServerManager, Watcher {
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
 
   private HamaConfiguration conf;
 
+  private ZooKeeper zk = null;
+  private String bspRoot = null;
+  
   /**
    * Constants for BSPMaster's status.
    */
@@ -163,7 +175,6 @@ public class BSPMaster implements JobSub
             } else if (jip.getStatus().getRunState() == JobStatus.KILLED) {
               GroomProtocol worker = findGroomServer(tmpStatus);
               Directive d1 = new DispatchTasksDirective(
-                  currentGroomServerPeers(),
                   new GroomServerAction[] { new KillTaskAction(ts.getTaskId()) });
               try {
                 worker.dispatch(d1);
@@ -307,11 +318,11 @@ public class BSPMaster implements JobSub
     }
     Throwable e = null;
     try {
-      GroomProtocol wc = (GroomProtocol) RPC.waitForProxy(
-          GroomProtocol.class, GroomProtocol.versionID,
-          resolveWorkerAddress(status.getRpcServer()), this.conf);
+      GroomProtocol wc = (GroomProtocol) RPC.waitForProxy(GroomProtocol.class,
+          GroomProtocol.versionID, resolveWorkerAddress(status.getRpcServer()),
+          this.conf);
       if (null == wc) {
-        LOG.warn("Fail to create Worker client at host " + status.getPeerName());
+        LOG.warn("Fail to create Worker client at host");
         return false;
       }
       // TODO: need to check if peer name has changed
@@ -415,10 +426,88 @@ public class BSPMaster implements JobSub
     BSPMaster result = new BSPMaster(conf, identifier);
     result.taskScheduler.setGroomServerManager(result);
     result.taskScheduler.start();
+    
+    // init zk root and child nodes
+    result.initZK(conf);
 
     return result;
   }
 
+  /**
+   * Connects to ZooKeeper and creates the BSP root znode, or clears the
+   * children of the root when it already exists.
+   * @param conf supplies the quorum, session timeout and root znode path
+   */
+  private void initZK(HamaConfiguration conf) {
+    try {
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+    } catch (IOException e) {
+      LOG.error("Exception during reinitialization!", e);
+    }
+
+    bspRoot = conf.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT);
+    Stat s = null;
+    if (zk != null) { // zk is left null when the client could not be created
+      try {
+        s = zk.exists(bspRoot, false);
+      } catch (Exception e) {
+        LOG.error("Fail to check the existence of " + bspRoot, e);
+      }
+
+      if (s == null) {
+        try {
+          zk.create(bspRoot, new byte[0], Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT);
+        } catch (KeeperException e) {
+          LOG.error("Fail to create " + bspRoot, e);
+        } catch (InterruptedException e) {
+          LOG.error(e);
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        this.clearZKNodes();
+      }
+    }
+  }
+
+  public void clearZKNodes() { // deletes every direct child znode of bspRoot
+    try {
+      for (String node : zk.getChildren(bspRoot, false)) {
+        zk.delete(bspRoot + "/" + node, -1); // -1 skips the version check
+      }
+    } catch (KeeperException e) {
+      LOG.error("Fail to clear the children of " + bspRoot, e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void createJobRoot(String string) { // persistent znode "/" + string
+    try {
+      zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    } catch (KeeperException e) {
+      LOG.error("Fail to create job root /" + string, e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while creating job root /" + string, e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void deleteJobRoot(String string) { // children first: ZK won't delete a non-empty znode
+    try {
+      for (String node : zk.getChildren("/" + string, false)) {
+        zk.delete("/" + string + "/" + node, -1);
+      }
+      // -1 skips the version check, so the znode is removed whatever its version
+      zk.delete("/" + string, -1);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (KeeperException e) {
+      LOG.error("Fail to delete job root /" + string, e);
+    }
+  }
+  
   public static InetSocketAddress getAddress(Configuration conf) {
     String hamaMasterStr = conf.get("bsp.master.address", "localhost");
     int defaultPort = conf.getInt("bsp.master.port", 40000);
@@ -505,25 +594,25 @@ public class BSPMaster implements JobSub
 
   @Override
   public ClusterStatus getClusterStatus(boolean detailed) {
-    Map<String, String> groomPeersMap = null;
+    Map<String, String> groomsMap = null;
 
     // give the caller a snapshot of the cluster status
     int numGroomServers = groomServers.size();
     if (detailed) {
-      groomPeersMap = new HashMap<String, String>();
+      groomsMap = new HashMap<String, String>();
       for (Map.Entry<GroomServerStatus, GroomProtocol> entry : groomServers
           .entrySet()) {
         GroomServerStatus s = entry.getKey();
-        groomPeersMap.put(s.getGroomName(), s.getPeerName());
+        groomsMap.put(s.getGroomName(), s.getGroomHostName());
       }
+
     }
 
-    // TODO currently we only have one task slot per groom server
-    this.totalTaskCapacity = numGroomServers;
+    int tasksPerGroom = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+    this.totalTaskCapacity = tasksPerGroom * numGroomServers;
 
     if (detailed) {
-      return new ClusterStatus(groomPeersMap, totalTasks, totalTaskCapacity,
-          state);
+      return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state);
     } else {
       return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
           state);
@@ -555,15 +644,6 @@ public class BSPMaster implements JobSub
     jobInProgressListeners.remove(listener);
   }
 
-  @Override
-  public Map<String, String> currentGroomServerPeers() {
-    Map<String, String> tmp = new HashMap<String, String>();
-    for (GroomServerStatus status : groomServers.keySet()) {
-      tmp.put(status.getGroomName(), status.getPeerName());
-    }
-    return tmp;
-  }
-
   public String getBSPMasterName() {
     return host + ":" + port;
   }
@@ -715,10 +795,21 @@ public class BSPMaster implements JobSub
   }
 
   public void shutdown() {
+    try {
+      if (this.zk != null) this.zk.close(); // null if initZK() never created a client
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
     this.masterServer.stop();
   }
 
   public BSPMaster.State currentState() {
     return this.state;
   }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // ZooKeeper events delivered to the master are currently ignored.
+
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Mon Aug 29 09:12:51 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.bsp;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
@@ -24,20 +26,16 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,8 +43,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hama.checkpoint.CheckpointRunner;
 import org.apache.hama.Constants;
+import org.apache.hama.checkpoint.CheckpointRunner;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.util.Bytes;
 import org.apache.hama.zookeeper.QuorumPeer;
@@ -54,9 +52,8 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.ZooDefs.Ids;
 
 /**
  * This class represents a BSP peer.
@@ -75,18 +72,12 @@ public class BSPPeer implements Watcher,
   private final String bspRoot;
   private final String quorumServers;
 
-  private final Map<InetSocketAddress, BSPPeerInterface> peers = 
-    new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
-  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = 
-    new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueue = 
-    new ConcurrentLinkedQueue<BSPMessage>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = 
-    new ConcurrentLinkedQueue<BSPMessage>();
-  private final Map<String, InetSocketAddress> peerSocketCache = 
-    new ConcurrentHashMap<String, InetSocketAddress>();
+  private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>();
+  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+  private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String, InetSocketAddress>();
 
-  private SortedSet<String> allPeerNames = new TreeSet<String>();
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
 
@@ -97,15 +88,16 @@ public class BSPPeer implements Watcher,
 
   public static final class BSPSerializableMessage implements Writable {
     final AtomicReference<String> path = new AtomicReference<String>();
-    final AtomicReference<BSPMessageBundle> bundle = 
-      new AtomicReference<BSPMessageBundle>();
+    final AtomicReference<BSPMessageBundle> bundle = new AtomicReference<BSPMessageBundle>();
 
-    public BSPSerializableMessage(){}
+    public BSPSerializableMessage() {
+    }
 
-    public BSPSerializableMessage(final String path, final BSPMessageBundle bundle) {
-      if(null == path) 
+    public BSPSerializableMessage(final String path,
+        final BSPMessageBundle bundle) {
+      if (null == path)
         throw new NullPointerException("No path provided for checkpointing.");
-      if(null == bundle) 
+      if (null == bundle)
         throw new NullPointerException("No data provided for checkpointing.");
       this.path.set(path);
       this.bundle.set(bundle);
@@ -114,21 +106,21 @@ public class BSPPeer implements Watcher,
     public final String checkpointedPath() {
       return this.path.get();
     }
-    
-    public final BSPMessageBundle messageBundle(){
+
+    public final BSPMessageBundle messageBundle() {
       return this.bundle.get();
     }
 
-    @Override 
+    @Override
     public final void write(DataOutput out) throws IOException {
       out.writeUTF(this.path.get());
       this.bundle.get().write(out);
     }
 
-    @Override 
+    @Override
     public final void readFields(DataInput in) throws IOException {
       this.path.set(in.readUTF());
-      BSPMessageBundle pack = new BSPMessageBundle(); 
+      BSPMessageBundle pack = new BSPMessageBundle();
       pack.readFields(in);
       this.bundle.set(pack);
     }
@@ -139,45 +131,45 @@ public class BSPPeer implements Watcher,
     final Socket client;
     final ScheduledExecutorService sched;
 
-    public BSPMessageSerializer(final int port) { 
+    public BSPMessageSerializer(final int port) {
       Socket tmp = null;
       int cnt = 0;
       do {
         tmp = init(port);
-        cnt ++;
-        try {   
-          Thread.sleep(1000); 
-        } catch(InterruptedException ie) { 
-          LOG.warn("Thread is interrupted.", ie); 
+        cnt++;
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ie) {
+          LOG.warn("Thread is interrupted.", ie);
           Thread.currentThread().interrupt();
         }
-      } while(null == tmp && 10 > cnt);
+      } while (null == tmp && 10 > cnt);
       this.client = tmp;
-      if(null == this.client)
+      if (null == this.client)
         throw new NullPointerException("Client socket is null.");
-      this.sched = Executors.newScheduledThreadPool(
-        conf.getInt("bsp.checkpoint.serializer_thread", 10));
-      LOG.info(BSPMessageSerializer.class.getName()+
-      " is ready to serialize message.");
+      this.sched = Executors.newScheduledThreadPool(conf.getInt(
+          "bsp.checkpoint.serializer_thread", 10));
+      LOG.info(BSPMessageSerializer.class.getName()
+          + " is ready to serialize message.");
     }
 
     private Socket init(final int port) {
-       Socket tmp = null;
-       try {
-         tmp = new Socket("localhost", port);
-       } catch(UnknownHostException uhe) {
-         LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
-       } catch(IOException ioe) {
-         LOG.warn("Fail to create socket.", ioe);
-       }
-       return tmp;
-    }
-   
-    void serialize(final BSPSerializableMessage tmp) throws IOException { 
-      if(LOG.isDebugEnabled())
-        LOG.debug("Messages are saved to "+tmp.checkpointedPath());
+      Socket tmp = null;
+      try {
+        tmp = new Socket("localhost", port);
+      } catch (UnknownHostException uhe) {
+        LOG.error("Unable to connect to BSPMessageDeserializer.", uhe);
+      } catch (IOException ioe) {
+        LOG.warn("Fail to create socket.", ioe);
+      }
+      return tmp;
+    }
+
+    void serialize(final BSPSerializableMessage tmp) throws IOException {
+      if (LOG.isDebugEnabled())
+        LOG.debug("Messages are saved to " + tmp.checkpointedPath());
       final DataOutput out = new DataOutputStream(client.getOutputStream());
-      this.sched.schedule(new Callable() {
+      this.sched.schedule(new Callable<Object>() {
         public Object call() throws Exception {
           tmp.write(out);
           return null;
@@ -189,12 +181,12 @@ public class BSPPeer implements Watcher,
       try {
         this.client.close();
         this.sched.shutdown();
-      } catch(IOException io) {
+      } catch (IOException io) {
         LOG.error("Fail to close client socket.", io);
       }
     }
 
-  }// message serializer 
+  }// message serializer
 
   /**
    * Protected default constructor for LocalBSPRunner.
@@ -210,7 +202,7 @@ public class BSPPeer implements Watcher,
    * BSPPeer Constructor.
    * 
    * BSPPeer acts on behalf of clients performing bsp() tasks.
-   *  
+   * 
    * @param conf is the configuration file containing bsp peer host, port, etc.
    * @param umbilical is the bsp protocol used to contact its parent process.
    * @param taskid is the id that current process holds.
@@ -228,55 +220,37 @@ public class BSPPeer implements Watcher,
     bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
         Constants.DEFAULT_ZOOKEEPER_ROOT);
     quorumServers = QuorumPeer.getZKQuorumServersString(conf);
-    if(LOG.isDebugEnabled()) LOG.debug("Quorum  " + quorumServers);
+    if (LOG.isDebugEnabled())
+      LOG.debug("Quorum  " + quorumServers);
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
     BSPMessageSerializer msgSerializer = null;
-    if(this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      msgSerializer = 
-        new BSPMessageSerializer(conf.getInt("bsp.checkpoint.port", 
-        Integer.parseInt(CheckpointRunner.DEFAULT_PORT)));
+    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      msgSerializer = new BSPMessageSerializer(conf.getInt(
+          "bsp.checkpoint.port", Integer
+              .parseInt(CheckpointRunner.DEFAULT_PORT)));
     }
     this.messageSerializer = msgSerializer;
   }
 
   public void reinitialize() {
     try {
-      if(LOG.isDebugEnabled()) LOG.debug("reinitialize(): " + getPeerName());
-      this.server = RPC.getServer(this, peerAddress.getHostName(),
-          peerAddress.getPort(), conf);
+      if (LOG.isDebugEnabled())
+        LOG.debug("reinitialize(): " + getPeerName());
+      this.server = RPC.getServer(this, peerAddress.getHostName(), peerAddress
+          .getPort(), conf);
       server.start();
       LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
           + peerAddress.getPort());
     } catch (IOException e) {
       LOG.error("Fail to start RPC server!", e);
     }
- 
+
     try {
       this.zk = new ZooKeeper(quorumServers, conf.getInt(
           Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
-    } catch(IOException e) {
+    } catch (IOException e) {
       LOG.error("Fail while reinitializing zookeeeper!", e);
     }
-  
-    Stat s = null;
-    if (zk != null) {
-      try {
-        s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false);
-      } catch (Exception e) {
-        LOG.error(s, e);
-      }
-
-      if (s == null) {
-        try {
-          zk.create(Constants.DEFAULT_ZOOKEEPER_ROOT, new byte[0],
-              Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        } catch (KeeperException e) {
-          LOG.error(e);
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      }
-    }
   }
 
   @Override
@@ -316,11 +290,10 @@ public class BSPPeer implements Watcher,
 
   private String checkpointedPath() {
     String backup = conf.get("bsp.checkpoint.prefix_path", "/checkpoint/");
-    String ckptPath = 
-      backup + jobConf.getJobID().toString() + "/" + getSuperstepCount() + 
-      "/" + this.taskid.toString();
-    if(LOG.isDebugEnabled()) 
-      LOG.debug("Messages are to be saved to "+ckptPath);
+    String ckptPath = backup + jobConf.getJobID().toString() + "/"
+        + getSuperstepCount() + "/" + this.taskid.toString();
+    if (LOG.isDebugEnabled())
+      LOG.debug("Messages are to be saved to " + ckptPath);
     return ckptPath;
   }
 
@@ -332,8 +305,8 @@ public class BSPPeer implements Watcher,
   public void sync() throws IOException, KeeperException, InterruptedException {
     enterBarrier();
     long startTime = System.currentTimeMillis();
-    Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = 
-        this.outgoingQueues.entrySet().iterator();
+    Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
+        .entrySet().iterator();
 
     while (it.hasNext()) {
       Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
@@ -348,11 +321,11 @@ public class BSPPeer implements Watcher,
       for (BSPMessage message : messages) {
         bundle.addMessage(message);
       }
-      
-      // checkpointing 
-      if(null != this.messageSerializer) {
+
+      // checkpointing
+      if (null != this.messageSerializer) {
         this.messageSerializer.serialize(new BSPSerializableMessage(
-          checkpointedPath(), bundle));
+            checkpointedPath(), bundle));
       }
 
       peer.put(bundle);
@@ -365,7 +338,7 @@ public class BSPPeer implements Watcher,
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
     umbilical.incrementSuperstepCount(taskid);
-    
+
     startTime = System.currentTimeMillis();
     // Clear outgoing queues.
     clearOutgoingQueues();
@@ -392,9 +365,8 @@ public class BSPPeer implements Watcher,
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
     LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
         + this.getSuperstepCount());
-    zk.create(bspRoot + "/" + getPeerName(),
-        Bytes.toBytes(this.getSuperstepCount()), Ids.OPEN_ACL_UNSAFE,
-        CreateMode.EPHEMERAL);
+    zk.create(getNodeName(), Bytes.toBytes(this.getSuperstepCount()),
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 
     while (true) {
       synchronized (mutex) {
@@ -410,7 +382,7 @@ public class BSPPeer implements Watcher,
   }
 
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
-    zk.delete(bspRoot + "/" + getPeerName(), 0);
+    zk.delete(getNodeName(), 0);
     while (true) {
       synchronized (mutex) {
         List<String> list = zk.getChildren(bspRoot, true);
@@ -424,6 +396,10 @@ public class BSPPeer implements Watcher,
     }
   }
 
+  private String getNodeName() { // this peer's barrier znode (enterBarrier/leaveBarrier)
+    return bspRoot + "/" + taskid.getJobID().toString() + "_" + getPeerName();
+  }
+
   @Override
   public void process(WatchedEvent event) {
     synchronized (mutex) {
@@ -438,8 +414,16 @@ public class BSPPeer implements Watcher,
 
   @Override
   public void close() throws IOException {
-    server.stop();
-    if(null != messageSerializer) this.messageSerializer.close();
+    this.clear();
+    try {
+      if (zk != null) zk.close(); // unset if reinitialize() never created a client
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    if (server != null)
+      server.stop();
+    if (null != messageSerializer)
+      this.messageSerializer.close();
   }
 
   @Override
@@ -487,23 +471,22 @@ public class BSPPeer implements Watcher,
 
   private InetSocketAddress getAddress(String peerName) {
     String[] peerAddrParts = peerName.split(":");
-    return new InetSocketAddress(peerAddrParts[0],
-        Integer.parseInt(peerAddrParts[1]));
+    return new InetSocketAddress(peerAddrParts[0], Integer
+        .parseInt(peerAddrParts[1]));
   }
 
   @Override
   public String[] getAllPeerNames() {
-    return allPeerNames.toArray(new String[0]);
-  }
-
-  /**
-   * To be invoked by the Groom Server with a list of peers received from an
-   * heartbeat response (BSPMaster).
-   * 
-   * @param allPeers
-   */
-  void setAllPeerNames(Collection<String> allPeerNames) {
-    this.allPeerNames = new TreeSet<String>(allPeerNames);
+    String[] result = new String[0]; // never hand callers null on a ZK failure
+    try {
+      result = zk.getChildren("/" + jobConf.getJobID().toString(), this).toArray(new String[0]);
+      java.util.Arrays.sort(result); // ZK children are unordered; keep the old sorted contract
+    } catch (KeeperException e) {
+      LOG.error("Fail to read the peer names of job " + jobConf.getJobID(), e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    return result;
   }
 
   /**

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/ClusterStatus.java Mon Aug 29 09:12:51 2011
@@ -91,7 +91,7 @@ public class ClusterStatus implements Wr
   }
   
   /**
-   * Get the names of groom servers, and their peers, in the cluster.
+   * Get the names of groom servers, and their hostnames, in the cluster.
    * 
    * @return the active groom servers in the cluster.
    */  

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/DispatchTasksDirective.java Mon Aug 29 09:12:51 2011
@@ -20,10 +20,6 @@ package org.apache.hama.bsp;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,24 +33,17 @@ public final class DispatchTasksDirectiv
 
   public static final Log LOG = LogFactory.getLog(DispatchTasksDirective.class);
 
-  private Map<String, String> groomServerPeers;
   private GroomServerAction[] actions;
 
   public DispatchTasksDirective() {
     super();
   }
 
-  public DispatchTasksDirective(Map<String, String> groomServerPeers,
-      GroomServerAction[] actions) {
+  public DispatchTasksDirective(GroomServerAction[] actions) {
     super(Directive.Type.Request);
-    this.groomServerPeers = groomServerPeers;
     this.actions = actions;
   }
 
-  public Map<String, String> getGroomServerPeers() {
-    return this.groomServerPeers;
-  }
-
   public GroomServerAction[] getActions() {
     return this.actions;
   }
@@ -71,18 +60,6 @@ public final class DispatchTasksDirectiv
         action.write(out);
       }
     }
-    String[] groomServerNames = groomServerPeers.keySet()
-        .toArray(new String[0]);
-    WritableUtils.writeCompressedStringArray(out, groomServerNames);
-
-    List<String> groomServerAddresses = new ArrayList<String>(
-        groomServerNames.length);
-    for (String groomName : groomServerNames) {
-      groomServerAddresses.add(groomServerPeers.get(groomName));
-    }
-    WritableUtils.writeCompressedStringArray(out, groomServerAddresses
-        .toArray(new String[0]));
-
   }
 
   @Override
@@ -100,12 +77,5 @@ public final class DispatchTasksDirectiv
     } else {
       this.actions = null;
     }
-    String[] groomServerNames = WritableUtils.readCompressedStringArray(in);
-    String[] groomServerAddresses = WritableUtils.readCompressedStringArray(in);
-    groomServerPeers = new HashMap<String, String>(groomServerNames.length);
-
-    for (int i = 0; i < groomServerNames.length; i++) {
-      groomServerPeers.put(groomServerNames[i], groomServerAddresses[i]);
-    }
   }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Aug 29 09:12:51 2011
@@ -49,17 +49,24 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hama.checkpoint.Checkpointer;
-import org.apache.hama.checkpoint.CheckpointRunner;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.checkpoint.CheckpointRunner;
+import org.apache.hama.checkpoint.Checkpointer;
 import org.apache.hama.ipc.BSPPeerProtocol;
-import org.apache.hama.ipc.MasterProtocol;
 import org.apache.hama.ipc.GroomProtocol;
+import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.log4j.LogManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
 
 /**
  * A Groom Server (shortly referred to as groom) is a process that performs bsp
@@ -69,7 +76,8 @@ import org.apache.log4j.LogManager;
  * storages. Basically, a groom server and a data node should be run on one
  * physical node.
  */
-public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol {
+public class GroomServer implements Runnable, GroomProtocol, BSPPeerProtocol,
+    Watcher {
   public static final Log LOG = LogFactory.getLog(GroomServer.class);
   static final String SUBDIR = "groomServer";
 
@@ -82,6 +90,8 @@ public class GroomServer implements Runn
     NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
   };
 
+  private static ZooKeeper zk = null;
+
   // Running States and its related things
   volatile boolean initialized = false;
   volatile boolean running = true;
@@ -103,11 +113,12 @@ public class GroomServer implements Runn
 
   // Job
   private int failures;
-  private int maxCurrentTasks = 1;
+  private int maxCurrentTasks;
   Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
   /** Map from taskId -> TaskInProgress. */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
+  Map<TaskAttemptID, Integer> assignedPeerNames = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
   // new nexus between GroomServer and BSPMaster
@@ -122,8 +133,6 @@ public class GroomServer implements Runn
   InetSocketAddress taskReportAddress;
   Server taskReportServer = null;
 
-  private PeerNames allPeerNames = null;
-
   // private BlockingQueue<GroomServerAction> tasksToCleanup = new
   // LinkedBlockingQueue<GroomServerAction>();
 
@@ -135,15 +144,37 @@ public class GroomServer implements Runn
       GroomServerAction[] actions = ((DispatchTasksDirective) directive)
           .getActions();
 
-      allPeerNames = new PeerNames(((DispatchTasksDirective) directive)
-          .getGroomServerPeers().values());
-
       if (LOG.isDebugEnabled()) {
         LOG.debug("Got Response from BSPMaster with "
             + ((actions != null) ? actions.length : 0) + " actions");
       }
 
       if (actions != null) {
+        assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
+        int i = 0;
+
+        // add peers to BSPMaster.
+        // TODO find another way to manage all activate peers.
+        for (GroomServerAction action : actions) {
+          Task t = ((LaunchTaskAction) action).getTask();
+
+          int peerPort = (Constants.DEFAULT_PEER_PORT + i);
+
+          try {
+            zk.create("/" + t.getJobID().toString() + "/" + groomHostName + ":"
+                + peerPort, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+          } catch (KeeperException e) {
+            e.printStackTrace();
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+
+          assignedPeerNames.put(t.getTaskID(), peerPort);
+
+          i++;
+        }
+
         for (GroomServerAction action : actions) {
           if (action instanceof LaunchTaskAction) {
             startNewTask((LaunchTaskAction) action);
@@ -171,8 +202,7 @@ public class GroomServer implements Runn
 
   private class Instructor extends Thread {
     final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
-    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = 
-      new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
+    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();
 
     public void bind(Class<? extends Directive> instruction,
         DirectiveHandler handler) {
@@ -211,11 +241,18 @@ public class GroomServer implements Runn
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
 
     CheckpointRunner ckptRunner = null;
-    if(this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      ckptRunner = 
-        new CheckpointRunner(CheckpointRunner.buildCommands(this.conf));
+    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
+      ckptRunner = new CheckpointRunner(CheckpointRunner
+          .buildCommands(this.conf));
     }
     this.checkpointRunner = ckptRunner;
+
+    try {
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+    } catch (IOException e) {
+      LOG.error("Exception during reinitialization!", e);
+    }
   }
 
   public synchronized void initialize() throws IOException {
@@ -224,9 +261,8 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(
-          conf.get("bsp.dns.interface", "default"),
-          conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+          "default"), conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -239,6 +275,7 @@ public class GroomServer implements Runn
     this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
+    this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
 
     int rpcPort = -1;
     String rpcAddr = null;
@@ -293,8 +330,8 @@ public class GroomServer implements Runn
       throw new IllegalArgumentException("Error rpc address " + rpcAddr
           + " port" + rpcPort);
     if (!this.masterClient.register(new GroomServerStatus(groomServerName,
-        getBspPeerName(), cloneAndResetRunningTaskStatuses(), failures,
-        maxCurrentTasks, this.rpcServer))) {
+        cloneAndResetRunningTaskStatuses(), failures, maxCurrentTasks,
+        this.rpcServer, groomHostName))) {
       LOG.error("There is a problem in establishing communication"
           + " link with BSPMaster");
       throw new IOException("There is a problem in establishing"
@@ -305,9 +342,9 @@ public class GroomServer implements Runn
     this.instructor.bind(DispatchTasksDirective.class,
         new DispatchTasksHandler());
     instructor.start();
-    if(this.conf.getBoolean("bsp.checkpoint.enabled", false) && 
-       null != this.checkpointRunner && !this.checkpointRunner.isAlive()) {
-      this.checkpointRunner.start(); 
+    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)
+        && null != this.checkpointRunner && !this.checkpointRunner.isAlive()) {
+      this.checkpointRunner.start();
     }
     this.running = true;
     this.initialized = true;
@@ -320,7 +357,8 @@ public class GroomServer implements Runn
 
   @Override
   public void dispatch(Directive directive) throws IOException {
-    if (!instructor.isAlive()) throw new IOException();
+    if (!instructor.isAlive())
+      throw new IOException();
     instructor.put(directive);
   }
 
@@ -380,33 +418,6 @@ public class GroomServer implements Runn
   public State offerService() throws Exception {
     while (running && !shuttingDown) {
       try {
-
-        // Reports to a BSPMaster
-        for (Map.Entry<TaskAttemptID, TaskInProgress> e : runningTasks
-            .entrySet()) {
-          Thread.sleep(REPORT_INTERVAL);
-          TaskInProgress tip = e.getValue();
-          TaskStatus taskStatus = tip.getStatus();
-
-          if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
-            taskStatus.setProgress(taskStatus.getSuperstepCount());
-
-            if (!tip.runner.isAlive()) {
-              if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
-                taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-              }
-              taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
-            }
-          }
-
-          doReport(taskStatus);
-        }
-
-        Thread.sleep(REPORT_INTERVAL);
-      } catch (InterruptedException ie) {
-      }
-
-      try {
         if (justInited) {
           String dir = masterClient.getSystemDir();
           if (dir == null) {
@@ -430,6 +441,7 @@ public class GroomServer implements Runn
             + StringUtils.stringifyException(except);
         LOG.error(msg);
       }
+      Thread.sleep(REPORT_INTERVAL);
     }
     return State.NORMAL;
   }
@@ -464,15 +476,15 @@ public class GroomServer implements Runn
    */
   public void doReport(TaskStatus taskStatus) {
     GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
-        getBspPeerName(), updateTaskStatus(taskStatus), failures,
-        maxCurrentTasks, rpcServer);
+        updateTaskStatus(taskStatus), failures, maxCurrentTasks, rpcServer,
+        groomHostName);
     try {
       boolean ret = masterClient.report(new ReportGroomStatusDirective(
           groomStatus));
       if (!ret) {
         LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
-            + " groom name: " + groomStatus.getGroomName() + " peer name:"
-            + groomStatus.getPeerName() + " rpc server:" + rpcServer);
+            + " groom name: " + groomStatus.getGroomName() + " rpc server:"
+            + rpcServer);
       }
     } catch (IOException ioe) {
       LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
@@ -536,6 +548,10 @@ public class GroomServer implements Runn
           RunJar.unJar(new File(localJarFile.toString()), workDir);
         }
         rjob.localized = true;
+      } else {
+        HamaConfiguration conf = new HamaConfiguration();
+        conf.addResource(rjob.getJobFile());
+        jobConf = new BSPJob(conf, rjob.getJobId().toString());
       }
     }
 
@@ -658,13 +674,19 @@ public class GroomServer implements Runn
   }
 
   public synchronized void close() throws IOException {
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
     this.running = false;
     this.initialized = false;
     cleanupStorage();
     this.workerServer.stop();
     RPC.stopProxy(masterClient);
-    if(this.conf.getBoolean("bsp.checkpoint.enabled", false) && 
-       null != this.checkpointRunner && this.checkpointRunner.isAlive()) {
+    if (this.conf.getBoolean("bsp.checkpoint.enabled", false)
+        && null != this.checkpointRunner && this.checkpointRunner.isAlive()) {
       this.checkpointRunner.stop();
     }
     if (taskReportServer != null) {
@@ -726,9 +748,6 @@ public class GroomServer implements Runn
         localJobConf.setJar(localJarFile.toString());
       }
 
-      LOG.debug("localizeTask : " + localJobConf.getJar());
-      LOG.debug("localizeTask : " + localJobFile.toString());
-
       task.setConf(localJobConf);
     }
 
@@ -819,35 +838,28 @@ public class GroomServer implements Runn
   }
 
   /**
-   * @return bsp peer information in the form of "address:port".
-   */
-  public String getBspPeerName() {
-    // TODO Later, peers list should be returned.
-    return this.groomHostName + ":" + Constants.DEFAULT_PEER_PORT;
-  }
- 
-  /** 
    * Checkpointer child process.
    */
   public static final class CheckpointerChild {
 
     public static void main(String[] args) throws Throwable {
-      if(LOG.isDebugEnabled()) LOG.debug("Starting Checkpointer child process.");
+      if (LOG.isDebugEnabled())
+        LOG.debug("Starting Checkpointer child process.");
       HamaConfiguration defaultConf = new HamaConfiguration();
-      int ret = 0;
-      if(null != args && 1 == args.length) {
+      // int ret = 0;
+      if (null != args && 1 == args.length) {
         int port = Integer.parseInt(args[0]);
-        defaultConf.setInt("bsp.checkpoint.port", 
-          Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
-        if(LOG.isDebugEnabled()) 
-          LOG.debug("Supplied checkpointer port value:"+port);
+        defaultConf.setInt("bsp.checkpoint.port", Integer
+            .parseInt(CheckpointRunner.DEFAULT_PORT));
+        if (LOG.isDebugEnabled())
+          LOG.debug("Supplied checkpointer port value:" + port);
         Checkpointer ckpt = new Checkpointer(defaultConf);
         ckpt.start();
         ckpt.join();
         LOG.info("Checkpoint finishes its execution.");
-      }else {
+      } else {
         throw new IllegalArgumentException(
-        "Port value is not provided for checkpointing service.");
+            "Port value is not provided for checkpointing service.");
       }
     }
   }
@@ -858,7 +870,8 @@ public class GroomServer implements Runn
   public static class BSPPeerChild {
 
     public static void main(String[] args) throws Throwable {
-      if(LOG.isDebugEnabled()) LOG.debug("BSPPeerChild starting");
+      if (LOG.isDebugEnabled())
+        LOG.debug("BSPPeerChild starting");
 
       HamaConfiguration defaultConf = new HamaConfiguration();
       // report address
@@ -873,20 +886,20 @@ public class GroomServer implements Runn
           defaultConf);
 
       Task task = umbilical.getTask(taskid);
+      int peerPort = umbilical.getAssignedPortNum(taskid);
 
       defaultConf.addResource(new Path(task.getJobFile()));
       BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
 
       defaultConf.set(Constants.PEER_HOST, args[3]);
-      if(null != args && 5 == args.length ) {
+      if (null != args && 5 == args.length) {
         defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
-      } 
-      defaultConf.setInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+      }
+      defaultConf.setInt(Constants.PEER_PORT, peerPort);
 
       BSPPeer bspPeer = new BSPPeer(defaultConf, taskid, umbilical);
       bspPeer.reinitialize();
       bspPeer.setJobConf(job);
-      bspPeer.setAllPeerNames(umbilical.getAllPeerNames().getAllPeerNames());
 
       TaskStatus taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(),
           0, TaskStatus.State.RUNNING, "running", host,
@@ -945,7 +958,20 @@ public class GroomServer implements Runn
   @Override
   public void done(TaskAttemptID taskid, boolean shouldBePromoted)
       throws IOException {
-    // TODO Auto-generated method stub
+    TaskInProgress tip = runningTasks.get(taskid);
+    TaskStatus taskStatus = tip.getStatus();
+
+    if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+      taskStatus.setProgress(taskStatus.getSuperstepCount());
+
+      if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
+        taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+        taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
+      }
+    }
+
+    // TODO reduce the reporting times.
+    doReport(taskStatus);
   }
 
   @Override
@@ -954,7 +980,14 @@ public class GroomServer implements Runn
   }
 
   @Override
-  public PeerNames getAllPeerNames() {
-    return allPeerNames;
+  public int getAssignedPortNum(TaskAttemptID taskid) {
+    return assignedPeerNames.get(taskid);
   }
+
+  @Override
+  public void process(WatchedEvent event) {
+    // TODO Auto-generated method stub
+
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerManager.java Mon Aug 29 09:12:51 2011
@@ -18,7 +18,6 @@
 package org.apache.hama.bsp;
 
 import java.util.Collection;
-import java.util.Map;
 
 import org.apache.hama.ipc.GroomProtocol;
 
@@ -70,10 +69,4 @@ interface GroomServerManager {
    * @param the JobInProgressListener to be removed.
    */
   void removeJobInProgressListener(JobInProgressListener listener);
-
-  /**
-   * Current GroomServer Peers.
-   * @return GroomName and PeerName(host:port) in pair. 
-   */
-  Map<String, String> currentGroomServerPeers();
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServerStatus.java Mon Aug 29 09:12:51 2011
@@ -50,8 +50,8 @@ public class GroomServerStatus implement
   }
 
   String groomName;
-  String peerName;
   String rpcServer;
+  String hostName;
   int failures;
   List<TaskStatus> taskReports;
 
@@ -63,34 +63,29 @@ public class GroomServerStatus implement
     taskReports = new CopyOnWriteArrayList<TaskStatus>();
   }
 
-  public GroomServerStatus(String groomName, String peerName,
+  public GroomServerStatus(String groomName, 
       List<TaskStatus> taskReports, int failures, int maxTasks) {
-    this(groomName, peerName, taskReports, failures, maxTasks, "");
+    this(groomName, taskReports, failures, maxTasks, "", "");
   }
 
-  public GroomServerStatus(String groomName, String peerName,
-      List<TaskStatus> taskReports, int failures, int maxTasks, String rpc) {
+  public GroomServerStatus(String groomName, 
+      List<TaskStatus> taskReports, int failures, int maxTasks, String rpc, String hostName) {
     this.groomName = groomName;
-    this.peerName = peerName;
     this.taskReports = new ArrayList<TaskStatus>(taskReports);
     this.failures = failures;
     this.maxTasks = maxTasks;
     this.rpcServer = rpc;
+    this.hostName = hostName;
   }
 
   public String getGroomName() {
     return groomName;
   }
 
-  /**
-   * The host (and port) from where the groom server can be reached.
-   * 
-   * @return The groom server address in the form of "hostname:port"
-   */
-  public String getPeerName() {
-    return peerName;
+  public String getGroomHostName() {
+    return hostName;
   }
-
+  
   public String getRpcServer() {
     return rpcServer;
   }
@@ -147,8 +142,8 @@ public class GroomServerStatus implement
   public int hashCode() {
     int result = 17;
     result = 37 * result + groomName.hashCode();
-    result = 37 * result + peerName.hashCode();
     result = 37 * result + rpcServer.hashCode();
+    result = 37 * result + hostName.hashCode();
     /*
      * result = 37*result + (int)failures; result = 37*result +
      * taskReports.hashCode(); result = 37*result +
@@ -169,8 +164,6 @@ public class GroomServerStatus implement
     GroomServerStatus s = (GroomServerStatus) o;
     if (!s.groomName.equals(groomName))
       return false;
-    if (!s.peerName.equals(peerName))
-      return false;
     if (!s.rpcServer.equals(rpcServer))
       return false;
     /*
@@ -189,8 +182,9 @@ public class GroomServerStatus implement
   @Override
   public void readFields(DataInput in) throws IOException {
     this.groomName = Text.readString(in);
-    this.peerName = Text.readString(in);
     this.rpcServer = Text.readString(in);
+    this.hostName = Text.readString(in);
+    
     this.failures = in.readInt();
     this.maxTasks = in.readInt();
     taskReports.clear();
@@ -211,8 +205,9 @@ public class GroomServerStatus implement
   @Override
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, groomName);
-    Text.writeString(out, peerName);
     Text.writeString(out, rpcServer);
+    Text.writeString(out, hostName);
+    
     out.writeInt(failures);
     out.writeInt(maxTasks);
     out.writeInt(taskReports.size());

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Aug 29 09:12:51 2011
@@ -80,6 +80,7 @@ class JobInProgress {
     this.localFs = FileSystem.getLocal(conf);
     this.jobFile = jobFile;
     this.master = master;
+    
     this.status = new JobStatus(jobId, null, 0L, 0L,
         JobStatus.State.PREP.value());
     this.startTime = System.currentTimeMillis();
@@ -191,6 +192,10 @@ class JobInProgress {
     this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
         0L, 0L, JobStatus.RUNNING);
 
+    // delete all nodes before start 
+    master.clearZKNodes();
+    master.createJobRoot(this.getJobID().toString());
+    
     tasksInited = true;
     LOG.debug("Job is initialized.");
   }
@@ -245,7 +250,10 @@ class JobInProgress {
       this.status.setFinishTime(this.finishTime);
 
       LOG.debug("Job successfully done.");
-
+      
+      // delete job root
+      master.deleteJobRoot(this.getJobID().toString());
+      
       garbageCollect();
     }
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/PeerNames.java Mon Aug 29 09:12:51 2011
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.bsp;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * 
- */
-public class PeerNames implements Writable {
-  Collection<String> allPeers;
-  
-  public PeerNames() {
-    this.allPeers = new ArrayList<String>();
-  }
-  
-  public PeerNames(Collection<String> allPeers) {
-    this.allPeers = allPeers;
-  }
-  
-  public Collection<String> getAllPeerNames() {
-    return allPeers;
-  }
-  
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(allPeers.size());
-    for (String peerName : allPeers) {
-      Text.writeString(out, peerName);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int peersNum = in.readInt();
-    for (int i = 0; i < peersNum; i++) {
-      allPeers.add(Text.readString(in));
-    }
-  }
-
-}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Aug 29 09:12:51 2011
@@ -20,7 +20,9 @@ package org.apache.hama.bsp;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -129,7 +131,16 @@ class SimpleTaskScheduler extends TaskSc
 
     public void run() {
       // obtain tasks
-      Task t = jip.obtainNewTask(this.stus, groomNum);
+      List<GroomServerAction> actions = new ArrayList<GroomServerAction>();
+      Task t = null;
+      int cnt = 0;
+      while((t = jip.obtainNewTask(this.stus, groomNum) ) != null) {
+        actions.add(new LaunchTaskAction(t));
+        cnt++;
+
+        if(cnt > (this.stus.getMaxTasks() - 1))
+          break;
+      }
       
       // assembly into actions
       // List<Task> tasks = new ArrayList<Task>();
@@ -137,9 +148,7 @@ class SimpleTaskScheduler extends TaskSc
         GroomProtocol worker = groomServerManager.findGroomServer(this.stus);
         try {
           // dispatch() to the groom server
-          Directive d1 = new DispatchTasksDirective(groomServerManager
-              .currentGroomServerPeers(), new GroomServerAction[] { 
-              new LaunchTaskAction(t)});
+          Directive d1 = new DispatchTasksDirective(actions.toArray(new GroomServerAction[0]));
           worker.dispatch(d1);
         } catch (IOException ioe) {
           LOG.error("Fail to dispatch tasks to GroomServer "

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Aug 29 09:12:51 2011
@@ -17,23 +17,22 @@
  */
 package org.apache.hama.bsp;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import static java.util.concurrent.TimeUnit.*;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Mon Aug 29 09:12:51 2011
@@ -21,7 +21,6 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.hama.Constants;
-import org.apache.hama.bsp.PeerNames;
 import org.apache.hama.bsp.Task;
 import org.apache.hama.bsp.TaskAttemptID;
 
@@ -56,8 +55,9 @@ public interface BSPPeerProtocol extends
   void incrementSuperstepCount(TaskAttemptID taskid) throws IOException;
 
   /**
-   * @return the all BSPPeer names.
+   * @param taskid
+   * @return assigned port number
    */
-  PeerNames getAllPeerNames();
+  int getAssignedPortNum(TaskAttemptID taskid);
 
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java Mon Aug 29 09:12:51 2011
@@ -19,8 +19,8 @@ package org.apache.hama.ipc;
 
 import java.io.IOException;
 
-import org.apache.hama.bsp.GroomServerStatus;
 import org.apache.hama.bsp.Directive;
+import org.apache.hama.bsp.GroomServerStatus;
 
 /**
  * A new protocol for GroomServers communicate with BSPMaster. This
@@ -44,6 +44,6 @@ public interface MasterProtocol extends 
    */
   boolean report(Directive directive) throws IOException;
 
-  public String getSystemDir();  
+  public String getSystemDir();
 
 }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/HamaClusterTestCase.java Mon Aug 29 09:12:51 2011
@@ -31,7 +31,7 @@ public abstract class HamaClusterTestCas
   protected MiniBSPCluster bspCluster;
   protected MiniZooKeeperCluster zooKeeperCluster;
   protected boolean startDfs;
-  protected int numOfGroom = 2;
+  protected int numOfGroom = 1;
 
   /** default constructor */
   public HamaClusterTestCase() {

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1162689&r1=1162688&r2=1162689&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Mon Aug 29 09:12:51 2011
@@ -61,10 +61,8 @@ public class TestBSPMasterGroomServer ex
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
     ClusterStatus cluster = jobClient.getClusterStatus(false);
-    assertEquals(this.numOfGroom, cluster.getMaxTasks());
-    
-    // TODO test the multi-tasks 
-    bsp.setNumBspTask(1);
+    assertEquals(this.numOfGroom, cluster.getGroomServers());
+    bsp.setNumBspTask(2);
     
     FileSystem fileSys = FileSystem.get(conf);
 
@@ -76,12 +74,14 @@ public class TestBSPMasterGroomServer ex
 
   private static void checkOutput(FileSystem fileSys, ClusterStatus cluster,
       HamaConfiguration conf) throws Exception {
-    for (int i = 0; i < 1; i++) { // TODO test the multi-tasks
+    for (int i = 0; i < 2; i++) {
       SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, new Path(
           TMP_OUTPUT + i), conf);
       LongWritable timestamp = new LongWritable();
       Text message = new Text();
       reader.next(timestamp, message);
+      
+      LOG.info("output: " + message);
       assertTrue("Check if `Hello BSP' gets printed.", message.toString()
           .indexOf("Hello BSP from") >= 0);
       reader.close();



Mime
View raw message