hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1565212 - in /hama/trunk: core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/ipc/ core/src/test/java/org/apache/hama/bsp/ yarn/src/main/java/org/apache/hama/bsp/
Date Thu, 06 Feb 2014 12:49:11 GMT
Author: edwardyoon
Date: Thu Feb  6 12:49:10 2014
New Revision: 1565212

URL: http://svn.apache.org/r1565212
Log:
Handling BindException.

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
    hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb  6 12:49:10 2014
@@ -164,6 +164,7 @@ public final class BSPPeerImpl<K1, V1, K
         Constants.DEFAULT_PEER_HOST);
     int bindPort = conf
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+    
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
 
     // This function call may change the current peer address

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Feb  6 12:49:10 2014
@@ -125,6 +125,7 @@ public class GroomServer implements Runn
   /** Map from taskId -> TaskInProgress. */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
   Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
+  Map<TaskAttemptID, Integer> assignedPeerNames = null;
   Map<BSPJobID, RunningJob> runningJobs = null;
 
   // new nexus between GroomServer and BSPMaster
@@ -157,9 +158,17 @@ public class GroomServer implements Runn
       }
 
       if (actions != null) {
+        int prevPort = Constants.DEFAULT_PEER_PORT;
 
         for (GroomServerAction action : actions) {
           if (action instanceof LaunchTaskAction) {
+            Task t = ((LaunchTaskAction) action).getTask();
+
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
+
             LOG.info("Launch " + actions.length + " tasks.");
             startNewTask((LaunchTaskAction) action);
           } else if (action instanceof KillTaskAction) {
@@ -172,6 +181,7 @@ public class GroomServer implements Runn
                 tip.killAndCleanup(false);
                 tasks.remove(killAction.getTaskID());
                 runningTasks.remove(killAction.getTaskID());
+                assignedPeerNames.remove(killAction.getTaskID());
               } catch (IOException ioe) {
                 throw new DirectiveException("Error when killing a "
                     + "TaskInProgress.", ioe);
@@ -181,6 +191,12 @@ public class GroomServer implements Runn
             LOG.info("Recovery action task.");
             RecoverTaskAction recoverAction = (RecoverTaskAction) action;
             Task t = recoverAction.getTask();
+
+            synchronized (assignedPeerNames) {
+              prevPort = BSPNetUtils.getNextAvailable(prevPort);
+              assignedPeerNames.put(t.getTaskID(), prevPort);
+            }
+
             LOG.info("Recovery action task." + t.getTaskID());
             try {
               startRecoveryTask(recoverAction);
@@ -312,6 +328,8 @@ public class GroomServer implements Runn
     this.conf.set(Constants.PEER_HOST, localHostname);
     this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
     this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+    this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
+        2 * this.maxCurrentTasks);
 
     int rpcPort = -1;
     String rpcAddr = null;
@@ -326,8 +344,6 @@ public class GroomServer implements Runn
       this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
       this.workerServer.start();
       this.rpcServer = rpcAddr + ":" + rpcPort;
-
-      LOG.info("Worker rpc server --> " + rpcServer);
     }
 
     server = new HttpServer("groomserver", rpcAddr, conf.getInt(
@@ -362,7 +378,7 @@ public class GroomServer implements Runn
     this.taskReportAddress = taskReportServer.getListenerAddress();
     this.conf.set("bsp.groom.report.address", taskReportAddress.getHostName()
         + ":" + taskReportAddress.getPort());
-    LOG.info("GroomServer up at: " + this.taskReportAddress);
+    LOG.info("TaskReportServer up at: " + this.taskReportAddress);
 
     this.groomHostName = rpcAddr;
     this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
@@ -1198,8 +1214,7 @@ public class GroomServer implements Runn
           defaultConf);
 
       final BSPTask task = (BSPTask) umbilical.getTask(taskid);
-      int peerPort = Constants.DEFAULT_PEER_PORT;
-      peerPort = BSPNetUtils.getNextAvailable(peerPort);
+      int peerPort = umbilical.getAssignedPortNum(taskid);
 
       defaultConf.addResource(new Path(task.getJobFile()));
       BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
@@ -1212,7 +1227,7 @@ public class GroomServer implements Runn
 
       long superstep = Long.parseLong(args[4]);
       TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
-      LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
+      LOG.debug("Starting peer for step " + superstep + " state = " + state);
 
       try {
         // use job-specified working directory
@@ -1334,6 +1349,11 @@ public class GroomServer implements Runn
   }
 
   @Override
+  public int getAssignedPortNum(TaskAttemptID taskid) {
+    return assignedPeerNames.get(taskid);
+  }
+
+  @Override
   public void process(WatchedEvent event) {
     // do nothing
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Feb  6 12:49:10 2014
@@ -408,6 +408,12 @@ public class LocalBSPRunner implements J
         throws IOException, InterruptedException {
       return true;
     }
+
+    @Override
+    public int getAssignedPortNum(TaskAttemptID taskid) {
+      // TODO Auto-generated method stub
+      return 0;
+    }
   }
 
   public static class LocalSyncClient extends BSPPeerSyncClient {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Thu Feb  6 12:49:10 2014
@@ -18,6 +18,7 @@
 package org.apache.hama.bsp.message;
 
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 
@@ -25,7 +26,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
@@ -75,20 +75,31 @@ public final class HamaMessageManagerImp
   private final void startRPCServer(Configuration conf,
       InetSocketAddress peerAddress) {
     try {
-      String bindAddress = conf.get(Constants.PEER_HOST,
-          Constants.DEFAULT_PEER_HOST);
-      InetSocketAddress selfAddress = new InetSocketAddress(bindAddress, 0);
-
-      // TODO Make number of RPC Server threads configurable
-      this.server = RPC.getServer(this, selfAddress.getHostName(),
-          selfAddress.getPort(), conf.getInt("hama.default.messenger.handler.threads.num",
5), false, conf);
+      startServer(peerAddress.getHostName(), peerAddress.getPort());
+    } catch (IOException ioe) {
+      LOG.error("Fail to start RPC server!", ioe);
+      throw new RuntimeException("RPC Server could not be launched!");
+    }
+  }
+
+  private void startServer(String hostName, int port) throws IOException {
+    int retry = 0;
+    try {
+      this.server = RPC.getServer(this, hostName, port,
+          conf.getInt("hama.default.messenger.handler.threads.num", 5), false,
+          conf);
+
       server.start();
-      
-      LOG.info(" BSPPeer address:" + server.getListenerAddress().getHostName()
+      LOG.info("BSPPeer address:" + server.getListenerAddress().getHostName()
           + " port:" + server.getListenerAddress().getPort());
-    } catch (IOException e) {
-      LOG.error("Fail to start RPC server!", e);
-      throw new RuntimeException("RPC Server could not be launched!");
+    } catch (BindException e) {
+      LOG.warn("Address already in use. Retrying " + hostName + ":" + port + 1);
+      startServer(hostName, port + 1);
+      retry++;
+
+      if (retry > 5) {
+        throw new RuntimeException("RPC Server could not be launched!");
+      }
     }
   }
 
@@ -111,7 +122,7 @@ public final class HamaMessageManagerImp
       if (compressor != null
           && (bundle.getApproximateSize() > conf.getLong(
               "hama.messenger.compression.threshold", 1048576))) {
-        
+
         BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
         bspPeerConnection.put(compMsgBundle);
         peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L);

Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Thu Feb  6 12:49:10 2014
@@ -67,4 +67,6 @@ public interface BSPPeerProtocol extends
   boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
       throws IOException, InterruptedException;
 
+  int getAssignedPortNum(TaskAttemptID taskid);
+
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java Thu Feb  6 12:49:10 2014
@@ -159,6 +159,12 @@ public class TestBSPTaskFaults extends T
         lastPingTime = 0L;
       }
     }
+
+    @Override
+    public int getAssignedPortNum(TaskAttemptID taskid) {
+      // TODO Auto-generated method stub
+      return 0;
+    }
   }
 
   @SuppressWarnings("unused")

Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Thu Feb  6 12:49:10 2014
@@ -405,4 +405,9 @@ public class BSPApplicationMaster implem
 
   }
 
+  @Override
+  public int getAssignedPortNum(TaskAttemptID taskid) {
+    return 0;
+  }
+
 }



Mime
View raw message