hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1411991 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Date Wed, 21 Nov 2012 06:04:27 GMT
Author: edwardyoon
Date: Wed Nov 21 06:04:26 2012
New Revision: 1411991

URL: http://svn.apache.org/viewvc?rev=1411991&view=rev
Log:
Monitoring of tasks is too sensitive.

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411991&r1=1411990&r2=1411991&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed Nov 21 06:04:26 2012
@@ -207,10 +207,9 @@ public class GroomServer implements Runn
             try {
               startRecoveryTask(recoverAction);
             } catch (IOException e) {
-              throw new DirectiveException(
-                  new StringBuffer().append("Error starting the recovery task")
-                  .append(t.getTaskID()).toString(),
-                  e);
+              throw new DirectiveException(new StringBuffer()
+                  .append("Error starting the recovery task")
+                  .append(t.getTaskID()).toString(), e);
             }
           }
         }
@@ -617,17 +616,17 @@ public class GroomServer implements Runn
       }
 
       Iterator<TaskAttemptID> taskIterator = tasks.keySet().iterator();
-      while(taskIterator.hasNext()){
+      while (taskIterator.hasNext()) {
         TaskAttemptID taskAttId = taskIterator.next();
-        if(taskAttId.getTaskID().equals(t.getTaskID().getTaskID())){
-          if(LOG.isDebugEnabled()){
+        if (taskAttId.getTaskID().equals(t.getTaskID().getTaskID())) {
+          if (LOG.isDebugEnabled()) {
             LOG.debug("Removing tasks with id = " + t.getTaskID().getTaskID());
           }
           taskIterator.remove();
           runningTasks.remove(taskAttId);
         }
       }
-      
+
       tasks.put(t.getTaskID(), tip);
       runningTasks.put(t.getTaskID(), tip);
     }
@@ -637,14 +636,14 @@ public class GroomServer implements Runn
       String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
           .stringifyException(e));
       LOG.warn(msg);
-      
+
       try {
         tip.killAndCleanup(true);
       } catch (IOException ie2) {
         LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n"
             + StringUtils.stringifyException(ie2));
       }
-      throw new IOException("Errro localizing the job.",e);
+      throw new IOException("Errro localizing the job.", e);
     }
   }
 
@@ -807,20 +806,17 @@ public class GroomServer implements Runn
             + " monitorPeriod = "
             + monitorPeriod
             + " check = "
-            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && 
-                (((tip.lastPingedTimestamp == 0 && 
-                ((currentTime - tip.startTime) > 10 * monitorPeriod)) || 
-                ((tip.lastPingedTimestamp > 0) && 
-                    (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp
== 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp
> 0) && (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod)))));
 
       // Task is out of contact if it has not pinged since more than
       // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
       // to get started.
+      
+      // TODO Please refactor this conditions
+      // NOTE: (currentTime - tip.lastPingedTimestamp) > 6 * monitorPeriod 
+ 
       if (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING)
-          && (((tip.lastPingedTimestamp == 0 
-          && ((currentTime - tip.startTime) > 10 * monitorPeriod)) 
-            || ((tip.lastPingedTimestamp > 0) 
-                && (currentTime - tip.lastPingedTimestamp) > monitorPeriod))))
{
+          && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime)
> 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp)
> 6 * monitorPeriod)))) {
 
         LOG.info("adding purge task: " + tip.getTask().getTaskID());
 
@@ -1048,7 +1044,7 @@ public class GroomServer implements Runn
 
       // runner could be null if task-cleanup attempt is not localized yet
       if (runner != null) {
-        if(LOG.isDebugEnabled()){
+        if (LOG.isDebugEnabled()) {
           LOG.debug("Killing process for " + this.task.getTaskID());
         }
         runner.killBsp();
@@ -1058,7 +1054,7 @@ public class GroomServer implements Runn
 
     public synchronized void killRunner() throws IOException {
       if (runner != null) {
-        if(LOG.isDebugEnabled()){
+        if (LOG.isDebugEnabled()) {
           LOG.debug("Killing process for " + this.task.getTaskID());
         }
         runner.killBsp();
@@ -1251,12 +1247,11 @@ public class GroomServer implements Runn
         defaultConf.setInt("bsp.checkpoint.port", Integer.parseInt(args[4]));
       }
       defaultConf.setInt(Constants.PEER_PORT, peerPort);
-      
+
       long superstep = Long.parseLong(args[4]);
       TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
       LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
 
-
       try {
         // use job-specified working directory
         FileSystem.get(job.getConfiguration()).setWorkingDirectory(



Mime
View raw message