Date: Wed, 4 Apr 2012 04:25:25 +0000 (UTC)
From: "Nikhil S. Ketkar (Commented) (JIRA)"
To: mapreduce-issues@hadoop.apache.org
Subject: [jira] [Commented] (MAPREDUCE-3315) Master-Worker Application on YARN

    [ https://issues.apache.org/jira/browse/MAPREDUCE-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13246011#comment-13246011 ]

Nikhil S. Ketkar commented on MAPREDUCE-3315:
---------------------------------------------

Here is a brief description of the API in the current patch. It is quite preliminary, and I will improve it based on feedback.

To implement a new Master-Worker job, the user has to implement four classes: WorkUnit, ResultUnit, Master and Worker. WorkUnit and ResultUnit extend MWMessage, an abstract class provided by the Master-Worker framework. Similarly, Master extends MWApplicationMaster and Worker extends MWWorkerRunner. Let's look at each of these classes in turn.

Here is the code for the WorkUnit. Note that a single integer has been added as the payload; it represents the data that the Master populates and the Worker operates on. The framework passes MWMessage objects around and is unaware of any additional data they contain. It is the user's responsibility to populate and extract the payload (in the Master and Worker classes) and to provide methods that serialize and deserialize it.
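Because the framework only ever sees MWMessage, a payload presumably survives the trip between Master and Worker only if the read method consumes exactly what the write method emitted, in the same order. One quick local check is a round trip through java.io streams. The sketch assumes a hypothetical stand-in for MWMessage that models only the two hooks used in this patch (writeWorkUnit and readFieldsWorkUnit), plus a hypothetical two-field LabelledWorkUnit that makes the ordering requirement visible; neither class is part of the framework.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class PayloadRoundTrip {

    // Hypothetical stand-in for the framework's MWMessage: only the two
    // payload hooks used by the WorkUnit in this patch are modelled.
    public abstract static class MWMessage {
        public abstract void writeWorkUnit(DataOutput out) throws IOException;
        public abstract void readFieldsWorkUnit(DataInput in) throws IOException;
    }

    // Hypothetical two-field payload: fields must be read back in exactly
    // the order they were written, since the stream carries no field names.
    public static class LabelledWorkUnit extends MWMessage {
        int data;
        String label = "";

        @Override
        public void writeWorkUnit(DataOutput out) throws IOException {
            out.writeInt(data);
            out.writeUTF(label);
        }

        @Override
        public void readFieldsWorkUnit(DataInput in) throws IOException {
            data = in.readInt();
            label = in.readUTF();
        }
    }

    // Writes a message to a byte array, then reads those bytes into a fresh instance.
    public static LabelledWorkUnit roundTrip(LabelledWorkUnit original) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            original.writeWorkUnit(out);
        }
        LabelledWorkUnit copy = new LabelledWorkUnit();
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.readFieldsWorkUnit(in);
        }
        return copy;
    }

    public static void main(String[] args) throws IOException {
        LabelledWorkUnit unit = new LabelledWorkUnit();
        unit.data = 7;
        unit.label = "batch-a";
        LabelledWorkUnit copy = roundTrip(unit);
        System.out.println(copy.data + " " + copy.label); // prints "7 batch-a"
    }
}
```

Swapping the two reads in readFieldsWorkUnit would make the round trip fail, which is the kind of mismatch this check is meant to catch early. The WorkUnit from the patch follows; its single-int payload satisfies the same contract trivially.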
{code}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Message sent from the Master to a Worker; the payload is a single int.
public class WorkUnit extends MWMessage {
  int data;

  public int getData() {
    return data;
  }

  public void setData(int data) {
    this.data = data;
  }

  // Writes the payload; the framework itself is unaware of it.
  @Override
  public void writeWorkUnit(DataOutput out) throws IOException {
    out.writeInt(data);
  }

  // Reads the payload back in the same order it was written.
  @Override
  public void readFieldsWorkUnit(DataInput in) throws IOException {
    data = in.readInt();
  }
}
{code}

Similarly, here is the code for the ResultUnit. For our simple example it is essentially identical to the WorkUnit. As with the WorkUnit, the ResultUnit carries the payload, and it is the user's responsibility to populate and extract it and to provide the methods that serialize and deserialize it.

{code}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Message sent from a Worker back to the Master; the payload is a single int.
public class ResultUnit extends MWMessage {
  int data;

  public int getData() {
    return data;
  }

  public void setData(int data) {
    this.data = data;
  }

  // Same serialization hooks as WorkUnit (the method names come from MWMessage).
  @Override
  public void writeWorkUnit(DataOutput out) throws IOException {
    out.writeInt(data);
  }

  @Override
  public void readFieldsWorkUnit(DataInput in) throws IOException {
    data = in.readInt();
  }
}
{code}

Now let's look at the API for the Worker. Any Worker must extend the MWWorkerRunner class and override the doWork method, which receives a WorkUnit and returns a ResultUnit. In this simple example, the Worker simply copies the data from the WorkUnit into the ResultUnit.

{code}
public class MWWorker extends MWWorkerRunner {

  public static void main(String[] args) {
    MWWorker curr = new MWWorker();
    try {
      // Host and port are hard-coded in the current patch.
      curr.init("localhost", 16001);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  // Receives a WorkUnit and returns a ResultUnit carrying the same value.
  @Override
  public MWMessage doWork(MWMessage workUnit) {
    ResultUnit result = new ResultUnit();
    int got = ((WorkUnit) workUnit).getData();
    result.setData(got);
    return result;
  }
}
{code}

Now, on to the Master. Any Master extends the MWApplicationMaster class and overrides the manageWorkers method. MWApplicationMaster provides four methods that the Master can use. The addWorker method adds a worker, and the killWorker method kills one.
Together these allow the user to add and remove workers based on the workload. To assign work, the user calls addWork, which takes a WorkUnit as its parameter; this is a non-blocking call. To get a ResultUnit, the user calls waitForResult, which returns a ResultUnit; this is a blocking call.

{code}
import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.commons.cli.ParseException; // assumed: commons-cli ParseException
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MWMaster extends MWApplicationMaster {
  private static final Log LOG = LogFactory.getLog(MWMaster.class);

  public static void main(String[] args) throws InterruptedException,
      ParseException, IOException, URISyntaxException {
    MWMaster curr = new MWMaster();
    curr.initiate(args);
    curr.terminate();
  }

  @Override
  public void manageWorkers() {
    // Start two workers.
    addWorker();
    addWorker();
    // Hand out 100 work units; addWork does not block.
    for (int i = 0; i < 100; i++) {
      WorkUnit curr = new WorkUnit();
      curr.setData(i);
      addWork(curr);
    }
    // Collect 100 results; waitForResult blocks until one is available.
    for (int i = 0; i < 100; i++) {
      try {
        ResultUnit curr = (ResultUnit) waitForResult();
        LOG.info("Received result " + curr.getData());
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    // Shut both workers down.
    killWorker();
    killWorker();
  }
}
{code}

Currently, I have not provided a client API for submission. The client is part of the framework; its code can be found in the MWClient class. Here is how the example application is launched. There are four required parameters: the MasterWorker library jar (--masterworkerlib), which contains the client code; the MasterWorker application jar (--masterworkerapp), which contains the user's application; and the main classes for the Master and the Worker (--masterclass and --workerclass, respectively).
{code}
hadoop jar masterworker-0.0.1-SNAPSHOT.jar \
  org.apache.hadoop.yarn.applications.masterworker.MWClient \
  --masterworkerlib hadoop-yarn-applications-masterworker-core-3.0.0-SNAPSHOT.jar \
  --masterworkerapp hadoop-yarn-applications-masterworker-example-3.0.0-SNAPSHOT.jar \
  --masterclass org.apache.hadoop.yarn.applications.masterworkerexample.MWMaster \
  --workerclass org.apache.hadoop.yarn.applications.masterworkerexample.MWWorker
{code}

> Master-Worker Application on YARN
> ---------------------------------
>
>                 Key: MAPREDUCE-3315
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3315
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>            Reporter: Sharad Agarwal
>            Assignee: Sharad Agarwal
>             Fix For: 0.24.0
>
>         Attachments: MAPREDUCE-3315.patch
>
>
> Currently master worker scenarios are forced fit into Map-Reduce. Now with YARN, these can be first class and would benefit real/near realtime workloads and be more effective in using the cluster resources.
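The Master API described in this comment splits work submission (addWork, non-blocking) from result collection (waitForResult, blocking). As a rough single-JVM analogue of that contract and of the manageWorkers example, the sketch below uses java.util.concurrent blocking queues. It is not the framework's implementation: workers are plain threads rather than YARN containers, killWorker is modelled with a hypothetical poison-pill sentinel, and InProcessMasterWorker and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-process analogue of the Master API; nothing here is framework code.
public class InProcessMasterWorker {

    // Sentinel that tells one worker thread to exit (stands in for killWorker).
    private static final int POISON = Integer.MIN_VALUE;

    private final BlockingQueue<Integer> work = new LinkedBlockingQueue<>();
    private final BlockingQueue<Integer> results = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    // Analogue of addWorker(): start a thread that echoes each work item back
    // as a result, mirroring the example MWWorker.doWork.
    public void addWorker() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    int item = work.take();
                    if (item == POISON) {
                        return;
                    }
                    results.put(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        workers.add(t);
        t.start();
    }

    // Analogue of killWorker(): ask one (unspecified) worker to stop.
    public void killWorker() throws InterruptedException {
        work.put(POISON);
    }

    // Analogue of addWork(): never waits, because the work queue is unbounded.
    public void addWork(int item) {
        work.offer(item);
    }

    // Analogue of waitForResult(): blocks until some worker has produced a result.
    public int waitForResult() throws InterruptedException {
        return results.take();
    }

    // Waits for every worker thread to finish.
    public void join() throws InterruptedException {
        for (Thread t : workers) {
            t.join();
        }
    }

    // Mirrors MWMaster.manageWorkers: two workers, 100 work units, 100 results.
    public static long run() throws InterruptedException {
        InProcessMasterWorker master = new InProcessMasterWorker();
        master.addWorker();
        master.addWorker();
        for (int i = 0; i < 100; i++) {
            master.addWork(i);
        }
        long sum = 0;
        for (int i = 0; i < 100; i++) {
            sum += master.waitForResult();
        }
        master.killWorker();
        master.killWorker();
        master.join();
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("sum of results: " + run()); // prints "sum of results: 4950"
    }
}
```

Results arrive in no particular order because either worker may finish first, which is why the example sums them (0 + 1 + ... + 99 = 4950) rather than checking their sequence. Each worker exits after taking exactly one sentinel, so two killWorker calls stop both threads.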