hama-dev mailing list archives

From "Suraj Menon (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HAMA-780) New launched child processes by fault tolerance may not be able to contact each other
Date Sun, 21 Jul 2013 16:14:48 GMT

    [ https://issues.apache.org/jira/browse/HAMA-780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13714731#comment-13714731 ]

Suraj Menon commented on HAMA-780:
----------------------------------

Read through the patch; looks good. The assumption here was that the node would get deleted
because the task node is registered as an ephemeral ZooKeeper node. Apparently, that is not
a correct assumption :) Thanks.
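
For context, here is a minimal sketch of how an ephemeral znode behaves (the quorum address, path, and payload are illustrative only, not Hama's actual registration code): the node is removed only when the creating client's ZooKeeper session ends, so it can outlive a released port as long as the old child process is still running.

{code}
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class EphemeralRegistrationSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical quorum address and peer path, for illustration only.
    ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

    // EPHEMERAL: the znode lives as long as this client's session does.
    // If the old child process is still alive (only its port is released),
    // its session is still open and its znode is NOT deleted.
    zk.create("/bsp/job_201307122024_0005/peers/hostname:61003",
        "attempt_201307122024_0005_000002_0".getBytes("UTF-8"),
        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
  }
}
{code}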
                
> New launched child processes by fault tolerance may not be able to contact each other
> -------------------------------------------------------------------------------------
>
>                 Key: HAMA-780
>                 URL: https://issues.apache.org/jira/browse/HAMA-780
>             Project: Hama
>          Issue Type: Bug
>          Components: bsp core
>    Affects Versions: 0.6.2
>            Reporter: MaoYuan Xian
>            Assignee: MaoYuan Xian
>         Attachments: HAMA-780.patch
>
>
> When fault tolerance is enabled, the recovery process sometimes fails because the newly launched child processes cannot send messages to each other.
> I finally found the cause:
> On one hand, when a new child process is launched for recovery, its port is set via the following logic:
> {code}
>       final BSPTask task = (BSPTask) umbilical.getTask(taskid);
>       int peerPort = umbilical.getAssignedPortNum(taskid);
>       ...
>       defaultConf.setInt(Constants.PEER_PORT, peerPort);
> {code}
> This logic finds the lowest available port for the newly started process:
> {code}
>   public static int getNextAvailable(int fromPort) {
>     ...
>     for (int i = fromPort + 1; i <= MAX_PORT_NUMBER; i++) {
>       if (available(i)) {
>         return i;
>       }
>     }
>     ...
>   }
> {code}
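> The available(port) helper is elided above; a typical implementation (a sketch under the usual pattern, not necessarily Hama's exact code) probes the port by trying to bind to it:
> {code}
> import java.io.IOException;
> import java.net.DatagramSocket;
> import java.net.ServerSocket;
>
> public final class PortProbe {
>   // Sketch: a port counts as free only if both a TCP server socket and a
>   // UDP datagram socket can bind to it.
>   public static boolean available(int port) {
>     try (ServerSocket ss = new ServerSocket(port);
>          DatagramSocket ds = new DatagramSocket(port)) {
>       return true;
>     } catch (IOException e) {
>       // Bind failed: another process (e.g. an old child task) still holds the port.
>       return false;
>     }
>   }
> }
> {code}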
> Consider a use case:
> Run one job with 3 child tasks, listening on hostname:61001, hostname:61002, and hostname:61003.
> Suppose the task listening on hostname:61002 fails (because of a disk problem, or because it is killed by the system's memory protection program); port 61002 is now released.
> Recovery starts and triggers three new processes, which are assigned the addresses hostname:61002, hostname:61004, and hostname:61005 (61001 and 61003 are still held by the old child tasks until they quit).
> During this recovery phase, the /bsp/job_id/peers directory in ZooKeeper looks something like:
> {quote}
> hostname:61001, hostname:61002, hostname:61005, hostname:61003, hostname:61004
> {quote}
> On the other hand, the newly launched child processes try to find each other via ZooKeeper when they are launched (in BSPPeerImpl.java):
> {code}
>   private final void initPeerNames() {
>     if (allPeers == null) {
>       allPeers = syncClient.getAllPeerNames(taskId);
>     }
>   }
> {code}
> {code}
>   public String[] getAllPeerNames(TaskAttemptID taskId) {
>     if (allPeers == null) {
>       TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
>       try {
>         List<String> var = zk.getChildren(
>             constructKey(taskId.getJobID(), "peers"), this);
>         allPeers = var.toArray(new String[var.size()]);
>         for (String s : allPeers) {
>           ...
>           boolean result = getValueFromBytes(data, thatTask);
>           if (result) {
>             LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
>                 + thatTask.getTaskID().getId() + " : " + s);
>             sortedMap.put(thatTask.getTaskID().getId(), s);
>           }
>         }
>       } catch (Exception e) {
>         LOG.error(e);
>         throw new RuntimeException("All peer names could not be retrieved!");
>       }
>     ...
>   }
> {code}
> Looking at the log, we find the following:
> {quote}
> 13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from zookeeper: attempt_201307122024_0005_000001_0 ID:1 : hostname:61001
> 13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from zookeeper: attempt_201307122024_0005_000000_1 ID:0 : hostname:61002
> 13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from zookeeper: attempt_201307122024_0005_000002_1 ID:2 : hostname:61005
> 13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from zookeeper: attempt_201307122024_0005_000002_0 ID:2 : hostname:61003
> 13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from zookeeper: attempt_201307122024_0005_000001_1 ID:1 : hostname:61004
> {quote}
> The newly added peer hostname:61005 is listed before hostname:61003, which makes the sortedMap in ZooKeeperSyncClientImpl end up with the entry <2, hostname:61003>: both attempts carry task ID 2, so the later sortedMap.put(thatTask.getTaskID().getId(), s) call overwrites the earlier one.
> The next round of peer communication then malfunctions, because messages that should be sent to "hostname:61005" are sent to "hostname:61003" instead.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
