incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1173973 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/ipc/
Date Thu, 22 Sep 2011 07:21:57 GMT
Author: edwardyoon
Date: Thu Sep 22 07:21:57 2011
New Revision: 1173973

URL: http://svn.apache.org/viewvc?rev=1173973&view=rev
Log:
Add statusUpdate() method to BSPPeerProtocol.

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1173973&r1=1173972&r2=1173973&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Sep 22 07:21:57 2011
@@ -9,6 +9,7 @@ Release 0.4 - Unreleased
 
   BUG FIXES
 
+    HAMA-432: Add statusUpdate() method to BSPPeerProtocol (edwardyoon)
     HAMA-437: PiEstimator is not working in Local Mode (Thomas Jungblut)
     HAMA-387: Fixed barrier synchronization problem (ChiaHung Lin via edwardyoon)
     HAMA-436: Web Interface does not update Superstep Count (Thomas Jungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1173973&r1=1173972&r2=1173973&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Sep 22 07:21:57 2011
@@ -60,7 +60,7 @@ import org.apache.zookeeper.data.Stat;
  * This class represents a BSP peer.
  */
 public class BSPPeer implements Watcher, BSPPeerInterface {
-  
+
   public static final Log LOG = LogFactory.getLog(BSPPeer.class);
 
   private final Configuration conf;
@@ -314,7 +314,12 @@ public class BSPPeer implements Watcher,
 
       BSPPeerInterface peer = peers.get(entry.getKey());
       if (peer == null) {
-        peer = getBSPPeerConnection(entry.getKey());
+        try {
+          peer = getBSPPeerConnection(entry.getKey());
+        } catch (NullPointerException ne) {
+          umbilical.fatalError(taskid, entry.getKey().getHostName()
+              + " doesn't exists.");
+        }
       }
       Iterable<BSPMessage> messages = entry.getValue();
       BSPMessageBundle bundle = new BSPMessageBundle();
@@ -333,7 +338,7 @@ public class BSPPeer implements Watcher,
 
     leaveBarrier();
     currentTaskStatus.incrementSuperstepCount();
-    umbilical.incrementSuperstepCount(taskid);
+    umbilical.statusUpdate(taskid, currentTaskStatus);
 
     // Clear outgoing queues.
     clearOutgoingQueues();
@@ -348,122 +353,129 @@ public class BSPPeer implements Watcher,
     localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
   }
 
-  private void createZnode(final String path) throws KeeperException, 
+  private void createZnode(final String path) throws KeeperException,
       InterruptedException {
     createZnode(path, CreateMode.PERSISTENT);
   }
 
-  private void createEphemeralZnode(final String path) throws KeeperException, 
+  private void createEphemeralZnode(final String path) throws KeeperException,
       InterruptedException {
     createZnode(path, CreateMode.EPHEMERAL);
   }
 
-  private void createZnode(final String path, final CreateMode mode) throws KeeperException,
-      InterruptedException {
+  private void createZnode(final String path, final CreateMode mode)
+      throws KeeperException, InterruptedException {
     Stat s = zk.exists(path, false);
-    if(null == s) {
+    if (null == s) {
       try {
         zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
-      } catch(KeeperException.NodeExistsException nee) {
-        LOG.warn("Ignore because znode may be already created at "+path, nee);
+      } catch (KeeperException.NodeExistsException nee) {
+        LOG.warn("Ignore because znode may be already created at " + path, nee);
       }
     }
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + 
-      this.getSuperstepCount());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
+          + this.getSuperstepCount());
     }
 
-    createZnode(bspRoot); 
+    createZnode(bspRoot);
 
-    final String pathToJobIdZnode = 
-      bspRoot + "/" + taskid.getJobID().toString();
+    final String pathToJobIdZnode = bspRoot + "/"
+        + taskid.getJobID().toString();
     createZnode(pathToJobIdZnode);
 
-    final String pathToSuperstepZnode = 
-      pathToJobIdZnode + "/" + getSuperstepCount();
-    createZnode(pathToSuperstepZnode); 
-    
-    zk.exists(pathToSuperstepZnode+"/ready", new Watcher() {
+    final String pathToSuperstepZnode = pathToJobIdZnode + "/"
+        + getSuperstepCount();
+    createZnode(pathToSuperstepZnode);
+
+    zk.exists(pathToSuperstepZnode + "/ready", new Watcher() {
       @Override
       public void process(WatchedEvent event) {
-        synchronized(mutex) {
+        synchronized (mutex) {
           try {
-            Stat s = zk.exists(pathToSuperstepZnode+"/ready", false);
-            if(null != s) {
-              zk.delete(pathToSuperstepZnode+"/ready", 0);
+            Stat s = zk.exists(pathToSuperstepZnode + "/ready", false);
+            if (null != s) {
+              zk.delete(pathToSuperstepZnode + "/ready", 0);
             }
-          } catch(KeeperException.NoNodeException nne) {
+          } catch (KeeperException.NoNodeException nne) {
             LOG.warn("Ignore because znode may be deleted.", nne);
-          } catch(Exception e) {
+          } catch (Exception e) {
             throw new RuntimeException(e);
           }
           mutex.notifyAll();
         }
       }
-    }); 
+    });
     zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 
-    synchronized(mutex) {
+    synchronized (mutex) {
       List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
-      if(LOG.isDebugEnabled()) 
-        LOG.debug("enterBarrier() znode size within "+pathToSuperstepZnode+" is "+
-        znodes.size()+". Znodes include " +znodes);
+      if (LOG.isDebugEnabled())
+        LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+            + " is " + znodes.size() + ". Znodes include " + znodes);
       if (znodes.size() < jobConf.getNumBspTask()) {
         mutex.wait();
       } else {
-        createEphemeralZnode(pathToSuperstepZnode+"/ready");
+        createEphemeralZnode(pathToSuperstepZnode + "/ready");
       }
     }
     return true;
   }
 
   protected boolean leaveBarrier() throws KeeperException, InterruptedException {
-    final String pathToSuperstepZnode = 
-      bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount();
-    while(true) {
+    final String pathToSuperstepZnode = bspRoot + "/"
+        + taskid.getJobID().toString() + "/" + getSuperstepCount();
+    while (true) {
       synchronized (mutex) {
-        final List<String> znodes = zk.getChildren(pathToSuperstepZnode, false); 
+        final List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
         final int size = znodes.size();
-        if(null == znodes || znodes.isEmpty()) return true;
-        if(1 == size) {
-          zk.delete(getNodeName(), 0); 
+        if (null == znodes || znodes.isEmpty())
+          return true;
+        if (1 == size) {
+          zk.delete(getNodeName(), 0);
           return true;
         }
         Collections.sort(znodes);
         final String lowest = znodes.get(0);
-        final String highest = znodes.get(size-1);
-        if (getNodeName().equals(pathToSuperstepZnode+"/"+lowest)) { 
-          Stat s = zk.exists(pathToSuperstepZnode+"/"+highest, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-              synchronized(mutex) {
-                mutex.notifyAll();
-              }
-            }
-          });
-          if(null != s) mutex.wait();
-        }else{
+        final String highest = znodes.get(size - 1);
+        if (getNodeName().equals(pathToSuperstepZnode + "/" + lowest)) {
+          Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
+              new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                  synchronized (mutex) {
+                    mutex.notifyAll();
+                  }
+                }
+              });
+          if (null != s)
+            mutex.wait();
+        } else {
           Stat s1 = zk.exists(getNodeName(), false);
-          if(null != s1) zk.delete(getNodeName(), 0);
-          Stat s2 = zk.exists(pathToSuperstepZnode+"/"+lowest, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-              synchronized(mutex) {
-                mutex.notifyAll();
-              }
-            }
-          });
-          if(null != s2) mutex.wait();
+          if (null != s1)
+            zk.delete(getNodeName(), 0);
+          Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
+              new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                  synchronized (mutex) {
+                    mutex.notifyAll();
+                  }
+                }
+              });
+          if (null != s2)
+            mutex.wait();
         }
       }
     }
   }
 
   private String getNodeName() {
-    return bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount() + "/" + taskid.toString() ;
+    return bspRoot + "/" + taskid.getJobID().toString() + "/"
+        + getSuperstepCount() + "/" + taskid.toString();
   }
 
   @Override
@@ -509,7 +521,8 @@ public class BSPPeer implements Watcher,
     return BSPPeerInterface.versionID;
   }
 
-  protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr) {
+  protected BSPPeerInterface getBSPPeerConnection(InetSocketAddress addr)
+      throws NullPointerException {
     BSPPeerInterface peer;
     synchronized (this.peers) {
       peer = peers.get(addr);
@@ -519,7 +532,7 @@ public class BSPPeer implements Watcher,
           peer = (BSPPeerInterface) RPC.getProxy(BSPPeerInterface.class,
               BSPPeerInterface.versionID, addr, this.conf);
         } catch (IOException e) {
-
+          LOG.error(e);
         }
         this.peers.put(addr, peer);
       }
@@ -537,8 +550,10 @@ public class BSPPeer implements Watcher,
 
   private InetSocketAddress getAddress(String peerName) {
     String[] peerAddrParts = peerName.split(":");
-    if(peerAddrParts.length != 2){
-      throw new ArrayIndexOutOfBoundsException("Peername must consist of exactly ONE \":\"! Given peername was: " + peerName);
+    if (peerAddrParts.length != 2) {
+      throw new ArrayIndexOutOfBoundsException(
+          "Peername must consist of exactly ONE \":\"! Given peername was: "
+              + peerName);
     }
     return new InetSocketAddress(peerAddrParts[0], Integer
         .parseInt(peerAddrParts[1]));
@@ -548,13 +563,14 @@ public class BSPPeer implements Watcher,
   public String[] getAllPeerNames() {
     String[] result = null;
     try {
-      result = zk.getChildren("/" + jobConf.getJobID().toString(), this).toArray(new String[0]);
+      result = zk.getChildren("/" + jobConf.getJobID().toString(), this)
+          .toArray(new String[0]);
     } catch (KeeperException e) {
       e.printStackTrace();
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
-    
+
     return result;
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1173973&r1=1173972&r2=1173973&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Sep 22 07:21:57 2011
@@ -249,14 +249,14 @@ public class GroomServer implements Runn
 
     CheckpointRunner ckptRunner = null;
     if (this.conf.getBoolean("bsp.checkpoint.enabled", false)) {
-      ckptRunner = new CheckpointRunner(
-          CheckpointRunner.buildCommands(this.conf));
+      ckptRunner = new CheckpointRunner(CheckpointRunner
+          .buildCommands(this.conf));
     }
     this.checkpointRunner = ckptRunner;
 
     try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
-          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
     } catch (IOException e) {
       LOG.error("Exception during reinitialization!", e);
     }
@@ -268,9 +268,8 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(
-          conf.get("bsp.dns.interface", "default"),
-          conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+          "default"), conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(conf.getStrings("bsp.local.dir"));
@@ -840,6 +839,21 @@ public class GroomServer implements Runn
     public int hashCode() {
       return task.getTaskID().hashCode();
     }
+
+    public void reportProgress(TaskStatus taskStatus2) {
+      LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + "% "
+          + taskStatus.getStateString());
+
+      if (this.done) {
+        LOG.info(task.getTaskID()
+            + " Ignoring status-update since "
+            + ((this.done) ? "task is 'done'" : ("runState: " + this.taskStatus
+                .getRunState())));
+        return;
+      }
+
+      this.taskStatus.statusUpdate(taskStatus);
+    }
   }
 
   public boolean isRunning() {
@@ -882,8 +896,8 @@ public class GroomServer implements Runn
       // int ret = 0;
       if (null != args && 1 == args.length) {
         int port = Integer.parseInt(args[0]);
-        defaultConf.setInt("bsp.checkpoint.port",
-            Integer.parseInt(CheckpointRunner.DEFAULT_PORT));
+        defaultConf.setInt("bsp.checkpoint.port", Integer
+            .parseInt(CheckpointRunner.DEFAULT_PORT));
         if (LOG.isDebugEnabled())
           LOG.debug("Supplied checkpointer port value:" + port);
         Checkpointer ckpt = new Checkpointer(defaultConf);
@@ -977,17 +991,35 @@ public class GroomServer implements Runn
     }
   }
 
-  public void incrementSuperstepCount(TaskAttemptID taskid) throws IOException {
-    TaskInProgress tip = tasks.get(taskid);
-    tip.getStatus().incrementSuperstepCount();
-  }
-
   @Override
   public boolean ping(TaskAttemptID taskid) throws IOException {
     // TODO Auto-generated method stub
     return false;
   }
 
+  /**
+   * A child task had a fatal error. Kill the task.
+   */
+  @Override
+  public void fatalError(TaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.fatal("Task: " + taskId + " - Killed : " + message);
+    // TODO kill the task.
+  }
+
+  @Override
+  public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    TaskInProgress tip = tasks.get(taskId);
+    if (tip != null) {
+      tip.reportProgress(taskStatus);
+      return true;
+    } else {
+      LOG.warn("Progress from unknown child task: " + taskId);
+      return false;
+    }
+  }
+
   @Override
   public void done(TaskAttemptID taskid, boolean shouldBePromoted)
       throws IOException {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1173973&r1=1173972&r2=1173973&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Thu Sep 22 07:21:57 2011
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.WritableUtil
  * Describes the current status of a task. This is not intended to be a
  * comprehensive piece of data.
  */
-class TaskStatus implements Writable, Cloneable {
+public class TaskStatus implements Writable, Cloneable {
   static final Log LOG = LogFactory.getLog(TaskStatus.class);
 
   // enumeration for reporting current phase of a task.
@@ -94,6 +94,10 @@ class TaskStatus implements Writable, Cl
     return progress;
   }
 
+  public void setSuperstepCount(long superstepCount) {
+    this.superstepCount = superstepCount;  
+  }
+  
   public void setProgress(float progress) {
     this.progress = progress;
   }
@@ -186,6 +190,7 @@ class TaskStatus implements Writable, Cl
    * @param status updated status
    */
   synchronized void statusUpdate(TaskStatus status) {
+    this.superstepCount = status.getSuperstepCount();
     this.progress = status.getProgress();
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
@@ -206,14 +211,16 @@ class TaskStatus implements Writable, Cl
    * This update is done in BSPMaster when a cleanup attempt of task reports its
    * status. Then update only specific fields, not all.
    * 
+   * @param superstepCount
    * @param runState
    * @param progress
    * @param state
    * @param phase
    * @param finishTime
    */
-  synchronized void statusUpdate(State runState, float progress, String state,
+  synchronized void statusUpdate(long superstepCount, State runState, float progress, String state,
       Phase phase, long finishTime) {
+    setSuperstepCount(superstepCount);
     setRunState(runState);
     setProgress(progress);
     setStateString(state);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1173973&r1=1173972&r2=1173973&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Thu Sep 22 07:21:57 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.Task;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskStatus;
 
 /**
  * Protocol that task child process uses to contact its parent process.
@@ -52,7 +53,20 @@ public interface BSPPeerProtocol extends
   /** Report that the task encounted a local filesystem error. */
   void fsError(TaskAttemptID taskId, String message) throws IOException;
 
-  void incrementSuperstepCount(TaskAttemptID taskid) throws IOException;
+  /** Report that the task encounted a fatal error. */
+  void fatalError(TaskAttemptID taskId, String message) throws IOException;
+  
+  /**
+   * Report child's progress to parent.
+   * 
+   * @param taskId task-id of the child
+   * @param taskStatus status of the child
+   * @throws IOException
+   * @throws InterruptedException
+   * @return True if the task is known
+   */
+  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+      throws IOException, InterruptedException;
 
   /**
    * @param taskid



Mime
View raw message