From: acmurthy@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r1082677 [15/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date: Thu, 17 Mar 2011 20:21:54 -0000
Message-Id: <20110317202208.3B4C72388C2D@eris.apache.org>

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,303 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.List; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.v2.lib.TypeConverter; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityInfo; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; +import org.apache.hadoop.yarn.security.SchedulerSecurityInfo; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.yarn.ApplicationMaster; +import org.apache.hadoop.yarn.ApplicationState; +import org.apache.hadoop.yarn.YarnRemoteException; +import org.apache.hadoop.mapreduce.v2.api.JobReport; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.TaskReport; + +public class ClientServiceDelegate { + private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class); + private Configuration conf; + private ApplicationID appId; + private final ResourceMgrDelegate rm; + private MRClientProtocol realProxy = null; + private String serviceAddr = ""; + private String serviceHttpAddr = ""; + + ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, + ApplicationID appId) throws AvroRemoteException { + this.conf = conf; + this.rm = rm; + this.appId = appId; + if (appId != null) { + refreshProxy(); + } + } + + private void refreshProxy() throws AvroRemoteException { + ApplicationMaster appMaster = rm.getApplicationMaster(appId); + if (ApplicationState.COMPLETED.equals(appMaster.state)) { + serviceAddr = conf.get("jobhistory.server.hostname") + ":" + + conf.get("jobhistory.server.port"); + LOG.debug("Reconnecting to job history server " + serviceAddr); + } else { + /* TODO check to confirm its really launched */ + serviceAddr = appMaster.host + ":" + appMaster.rpcPort; + serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort; + } + try { + instantiateProxy(serviceAddr); + } catch (IOException e) { + throw new YarnException(e); + } + } + + void instantiateProxy(ApplicationID applicationId, ApplicationMaster appMaster) + throws IOException { + try { + this.appId = applicationId; + LOG.info("Trying to connect to the ApplicationManager of" + + " application " + applicationId + " running at " + appMaster); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + serviceAddr = appMaster.host + ":" + + appMaster.rpcPort; + serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort; + if (UserGroupInformation.isSecurityEnabled()) { + String clientTokenEncoded = appMaster.clientToken.toString(); + Token clientToken = new Token(); + clientToken.decodeFromUrlString(clientTokenEncoded); + clientToken.setService(new Text(appMaster.host.toString() + ":" + + appMaster.rpcPort)); + currentUser.addToken(clientToken); + } + instantiateProxy(serviceAddr); + LOG.info("Connection to the ApplicationManager 
established."); + } catch (IOException e) { + throw (new IOException(e)); + } + } + + private void instantiateProxy(final String serviceAddr) throws IOException { + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + realProxy = currentUser.doAs(new PrivilegedAction() { + @Override + public MRClientProtocol run() { + Configuration myConf = new Configuration(conf); + myConf.setClass( + CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME, + SchedulerSecurityInfo.class, SecurityInfo.class); + YarnRPC rpc = YarnRPC.create(myConf); + return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class, + NetUtils.createSocketAddr(serviceAddr), myConf); + } + }); + } + + public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException, + InterruptedException { + appId = TypeConverter.toYarn(arg0).appID; + org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0); + if (realProxy == null) refreshProxy(); + try { + return TypeConverter.fromYarn(realProxy.getCounters(jobID)); + } catch(Exception e) { + LOG.debug("Failing to contact application master", e); + refreshProxy(); + return TypeConverter.fromYarn(realProxy.getCounters(jobID)); + } + } + + public String getJobHistoryDir() throws IOException, InterruptedException { + //TODO fix this + return ""; + } + + public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, + int arg2) throws IOException, InterruptedException { + appId = TypeConverter.toYarn(arg0).appID; + if (realProxy == null) refreshProxy(); + + org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0); + List list = null; + try { + list = realProxy.getTaskAttemptCompletionEvents(jobID, + arg1, arg2); + } catch(Exception e) { + LOG.debug("Failed to contact application master ", e); + refreshProxy(); + list = realProxy.getTaskAttemptCompletionEvents(jobID, + arg1, arg2); + } + return TypeConverter.fromYarn( + list.toArray(new org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[0])); + } + + public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID + arg0) + throws IOException, + InterruptedException { + + List list = null; + org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID = TypeConverter.toYarn(arg0); + appId = TypeConverter.toYarn(arg0.getJobID()).appID; + if (realProxy == null) refreshProxy(); + + try { + list = realProxy.getDiagnostics(attemptID); + } catch(Exception e) { + LOG.debug("Failed to contact application master ", e); + refreshProxy(); + list = realProxy.getDiagnostics(attemptID); + } + String[] result = new String[list.size()]; + int i = 0; + for (CharSequence c : list) { + result[i++] = c.toString(); + } + return result; + } + + //this method is here due to package restriction of + //TaskReport constructor + public static org.apache.hadoop.mapred.TaskReport[] fromYarn( + List reports) { + org.apache.hadoop.mapred.TaskReport[] result = + new org.apache.hadoop.mapred.TaskReport[reports.size()]; + int i = 0; + for (TaskReport report : reports) { + List diag = report.diagnostics; + String[] diagnosticArr = new String[diag.size()]; + int j = 0; + for (CharSequence c : diag) { + diagnosticArr[j++] = c.toString(); + } + org.apache.hadoop.mapred.TaskReport oldReport = + new org.apache.hadoop.mapred.TaskReport( + TypeConverter.fromYarn(report.id), report.progress, + report.state.toString(), + diagnosticArr, TypeConverter.fromYarn(report.state), + report.startTime, report.finishTime, + new org.apache.hadoop.mapred.Counters( + 
TypeConverter.fromYarn(report.counters)));
+      result[i++] = oldReport;
+    }
+    return result;
+  }
+
+  public JobReport getJobReport(org.apache.hadoop.mapreduce.v2.api.JobID jobID)
+      throws AvroRemoteException, YarnRemoteException {
+    appId = jobID.appID;
+    if (realProxy == null) refreshProxy();
+    try {
+      return realProxy.getJobReport(jobID);
+    } catch (Exception e) {
+      refreshProxy();
+      return realProxy.getJobReport(jobID);
+    }
+  }
+
+  public JobStatus getJobStatus(org.apache.hadoop.mapreduce.v2.api.JobID jobId)
+      throws AvroRemoteException, YarnRemoteException {
+    appId = jobId.appID;
+    if (realProxy == null) refreshProxy();
+    String stagingDir = conf.get("yarn.apps.stagingDir");
+    String jobFile = stagingDir + "/" + jobId.toString();
+    return TypeConverter.fromYarn(getJobReport(jobId), jobFile, serviceHttpAddr);
+  }
+
+  public JobStatus getJobStatus(JobID jobID) throws YarnRemoteException,
+      AvroRemoteException {
+    return getJobStatus(TypeConverter.toYarn(jobID));
+  }
+
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+      throws YarnRemoteException, AvroRemoteException {
+    List<TaskReport> taskReports = null;
+    org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+    appId = nJobID.appID;
+    if (realProxy == null) refreshProxy();
+    try {
+      taskReports = realProxy.getTaskReports(nJobID,
+          TypeConverter.toYarn(taskType));
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      taskReports = realProxy.getTaskReports(nJobID,
+          TypeConverter.toYarn(taskType));
+    }
+    // toArray() with no argument yields an Object[] that cannot be cast to
+    // TaskReport[]; pass a typed array instead.
+    return (org.apache.hadoop.mapreduce.TaskReport[]) TypeConverter.fromYarn(
+        taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+  }
+
+  public Void killJob(JobID jobID) throws YarnRemoteException,
+      AvroRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+    appId = nJobID.appID;
+    if (realProxy == null) refreshProxy();
+    try {
+      realProxy.killJob(nJobID);
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      realProxy.killJob(nJobID);
+    }
+    return null;
+  }
+
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean killed)
+      throws YarnRemoteException, AvroRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID
+        = TypeConverter.toYarn(taskAttemptID);
+    appId = attemptID.taskID.jobID.appID;
+    if (realProxy == null) refreshProxy();
+    try {
+      realProxy.killTaskAttempt(attemptID);
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      realProxy.killTaskAttempt(attemptID);
+    }
+    return true;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,226 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license
agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.QueueAclsInfo; +import org.apache.hadoop.mapreduce.QueueInfo; +import org.apache.hadoop.mapreduce.TaskTrackerInfo; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.lib.TypeConverter; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityInfo; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.conf.YARNApplicationConstants; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.yarn.ApplicationMaster; +import org.apache.hadoop.yarn.ApplicationState; +import org.apache.hadoop.yarn.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.ClientRMProtocol; +import org.apache.hadoop.yarn.YarnClusterMetrics; + +// TODO: This should be part of something like yarn-client. 
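+// A rough usage sketch, assuming the caller builds its own
+// ApplicationSubmissionContext (the ordering is inferred from
+// YARNRunner.submitJob below, not a fixed contract):
+//
+//   ResourceMgrDelegate rm = new ResourceMgrDelegate(conf); // connects to the RM
+//   JobID jobId = rm.getNewJobID();            // also caches the ApplicationID
+//   ApplicationID appId = rm.submitApplication(appContext);
+//   ApplicationMaster am = rm.getApplicationMaster(appId); // polls until RUNNING or terminal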
+public class ResourceMgrDelegate { + private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class); + + private Configuration conf; + ClientRMProtocol applicationsManager; + private ApplicationID applicationId; + + public ResourceMgrDelegate(Configuration conf) throws UnsupportedFileSystemException { + this.conf = conf; + YarnRPC rpc = YarnRPC.create(conf); + InetSocketAddress rmAddress = + NetUtils.createSocketAddr(conf.get( + YarnConfiguration.APPSMANAGER_ADDRESS, + YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS)); + LOG.info("Connecting to ResourceManager at " + rmAddress); + Configuration appsManagerServerConf = new Configuration(this.conf); + appsManagerServerConf.setClass( + CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME, + ClientRMSecurityInfo.class, SecurityInfo.class); + applicationsManager = + (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, + rmAddress, appsManagerServerConf); + LOG.info("Connected to ResourceManager at " + rmAddress); + } + + public void cancelDelegationToken(Token arg0) + throws IOException, InterruptedException { + return; + } + + + public TaskTrackerInfo[] getActiveTrackers() throws IOException, + InterruptedException { + return null; + } + + + public JobStatus[] getAllJobs() throws IOException, InterruptedException { + return null; + } + + + public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, + InterruptedException { + throw new IOException("Not implemented"); + } + + + public QueueInfo[] getChildQueues(String arg0) throws IOException, + InterruptedException { + throw new IOException("Not implemented"); + } + + + public ClusterMetrics getClusterMetrics() throws IOException, + InterruptedException { + YarnClusterMetrics metrics = applicationsManager.getClusterMetrics(); + ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, + metrics.numNodeManagers * 10, metrics.numNodeManagers * 2, 1, + metrics.numNodeManagers, 0, 0); + return oldMetrics; + } + + + public Token getDelegationToken(Text arg0) + throws IOException, InterruptedException { + throw new IOException("Not Implemented"); + } + + + public String getFilesystemName() throws IOException, InterruptedException { + return FileSystem.get(conf).getUri().toString(); + } + + public JobID getNewJobID() throws IOException, InterruptedException { + applicationId = applicationsManager.getNewApplicationId(); + return TypeConverter.fromYarn(applicationId); + } + + + public QueueInfo getQueue(String arg0) throws IOException, + InterruptedException { + throw new IOException("Not implemented"); + } + + + public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, + InterruptedException { + throw new IOException("Not implemented"); + } + + + public QueueInfo[] getQueues() throws IOException, InterruptedException { + throw new IOException("Not implemented"); + } + + + public QueueInfo[] getRootQueues() throws IOException, InterruptedException { + throw new IOException("Not Implemented"); + } + + + public String getStagingAreaDir() throws IOException, InterruptedException { +// Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR); + Path path = new Path(conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY)); + LOG.info("DEBUG --- getStagingAreaDir: dir=" + path); + return path.toString(); + } + + + public String getSystemDir() throws IOException, InterruptedException { + Path sysDir = new Path( + YARNApplicationConstants.JOB_SUBMIT_DIR); + FileContext.getFileContext(conf).delete(sysDir, true); + return sysDir.toString(); + } + + + public long 
getTaskTrackerExpiryInterval() throws IOException, + InterruptedException { + return 0; + } + + public void setJobPriority(JobID arg0, String arg1) throws IOException, + InterruptedException { + return; + } + + + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return 0; + } + + public long renewDelegationToken(Token arg0) + throws IOException, InterruptedException { + throw new IOException("Not implemented"); + } + + + public ApplicationID submitApplication(ApplicationSubmissionContext appContext) + throws IOException { + appContext.applicationId = applicationId; + applicationsManager.submitApplication(appContext); + LOG.info("Submitted application " + applicationId + " to ResourceManager"); + return applicationId; + } + + public ApplicationMaster getApplicationMaster(ApplicationID appId) + throws AvroRemoteException { + ApplicationMaster appMaster = + applicationsManager.getApplicationMaster(appId); + while (appMaster.state != ApplicationState.RUNNING && + appMaster.state != ApplicationState.KILLED && + appMaster.state != ApplicationState.FAILED && + appMaster.state != ApplicationState.COMPLETED) { + appMaster = applicationsManager.getApplicationMaster(appId); + try { + LOG.info("Waiting for appMaster to start.."); + Thread.sleep(2000); + } catch(InterruptedException ie) { + //DO NOTHING + } + } + return appMaster; + } + + public ApplicationID getApplicationId() { + return applicationId; + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,533 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.QueueAclsInfo; +import org.apache.hadoop.mapreduce.QueueInfo; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskCompletionEvent; +import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapreduce.TaskTrackerInfo; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YARNApplicationConstants; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.AvroUtil; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.yarn.ApplicationMaster; +import org.apache.hadoop.yarn.ApplicationState; +import org.apache.hadoop.yarn.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.LocalResource; +import org.apache.hadoop.yarn.LocalResourceType; +import org.apache.hadoop.yarn.LocalResourceVisibility; +import org.apache.hadoop.yarn.Resource; +import org.apache.hadoop.yarn.URL; + +/** + * This class enables the current JobClient (0.22 hadoop) to run on YARN. 
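+ * <p>
+ * Cluster-level calls (queues, metrics, tracker info, delegation tokens,
+ * new job ids) are forwarded to {@link ResourceMgrDelegate}, which talks
+ * to the ResourceManager; per-job calls (status, counters, task reports,
+ * diagnostics, kills) go through {@link ClientServiceDelegate}, which
+ * talks to the MapReduce ApplicationMaster or, once the job has
+ * completed, to the job history server.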
+ */
+public class YARNRunner implements ClientProtocol {
+
+  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+  public static final String YARN_AM_RESOURCE_KEY = "yarn.am.mapreduce.resource.mb";
+  private static final int DEFAULT_YARN_AM_RESOURCE = 1024;
+
+  private ResourceMgrDelegate resMgrDelegate;
+  private ClientServiceDelegate clientServiceDelegate;
+  private YarnConfiguration conf;
+
+  /**
+   * YARNRunner encapsulates the client interface of YARN.
+   * @param conf the configuration object for the client
+   * @param appID the application id, if one has already been assigned
+   */
+  public YARNRunner(Configuration conf, ApplicationID appID)
+      throws AvroRemoteException {
+    this.conf = new YarnConfiguration(conf);
+    try {
+      this.resMgrDelegate = new ResourceMgrDelegate(conf);
+      this.clientServiceDelegate = new ClientServiceDelegate(conf,
+          resMgrDelegate, appID);
+    } catch (UnsupportedFileSystemException ufe) {
+      throw new RuntimeException("Error in instantiating YarnClient", ufe);
+    }
+  }
+
+  /**
+   * YARNRunner encapsulates the client interface of YARN.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf) throws AvroRemoteException {
+    this(conf, null);
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    resMgrDelegate.cancelDelegationToken(arg0);
+  }
+
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return resMgrDelegate.getAllJobs();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getBlacklistedTrackers();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String arg0) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getChildQueues(arg0);
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getClusterMetrics();
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
+      throws IOException, InterruptedException {
+    return resMgrDelegate.getDelegationToken(arg0);
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return resMgrDelegate.getFilesystemName();
+  }
+
+  @Override
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    return resMgrDelegate.getNewJobID();
+  }
+
+  @Override
+  public QueueInfo getQueue(String arg0) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueue(arg0);
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueueAclsForCurrentUser();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getQueues();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getRootQueues();
+  }
+
+  @Override
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getStagingAreaDir();
+  }
+
+  @Override
+  public String getSystemDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getSystemDir();
+  }
+
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getTaskTrackerExpiryInterval();
+  }
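+  // The submission path below, in outline: persist the job's security
+  // tokens under jobSubmitDir, build an ApplicationSubmissionContext
+  // (AM memory from yarn.am.mapreduce.resource.mb, job classpath, the
+  // MRAppMaster java command line), register distributed-cache entries
+  // as LocalResources, submit the context to the ResourceManager, wait
+  // until the ApplicationMaster is running or terminal, then point the
+  // client proxy at it and return the job's status.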
+ + @Override + public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) + throws IOException, InterruptedException{ + + // Upload only in security mode: TODO + Path applicationTokensFile = + new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE); + try { + ts.writeTokenStorageFile(applicationTokensFile, conf); + } catch (IOException e) { + throw new YarnException(e); + } + + // XXX Remove + Path submitJobDir = new Path(jobSubmitDir); + FileContext defaultFS = FileContext.getFileContext(conf); + Path submitJobFile = + defaultFS.makeQualified(JobSubmissionFiles.getJobConfPath(submitJobDir)); + FSDataInputStream in = defaultFS.open(submitJobFile); + conf.addResource(in); + // --- + + // Construct necessary information to start the MR AM + ApplicationSubmissionContext appContext = + getApplicationSubmissionContext(conf, jobSubmitDir, ts); + setupDistributedCache(conf, appContext); + + // XXX Remove + in.close(); + // --- + + // Submit to ResourceManager + ApplicationID applicationId = resMgrDelegate.submitApplication(appContext); + + ApplicationMaster appMaster = + resMgrDelegate.getApplicationMaster(applicationId); + if (appMaster.state == ApplicationState.FAILED || appMaster.state == + ApplicationState.KILLED) { + throw new AvroRemoteException("failed to run job"); + } + clientServiceDelegate.instantiateProxy(applicationId, appMaster); + return clientServiceDelegate.getJobStatus(jobId); + } + + private LocalResource createApplicationResource(FileContext fs, Path p) + throws IOException { + LocalResource rsrc = new LocalResource(); + FileStatus rsrcStat = fs.getFileStatus(p); + rsrc.resource = AvroUtil.getYarnUrlFromPath(rsrcStat.getPath()); + rsrc.size = rsrcStat.getLen(); + rsrc.timestamp = rsrcStat.getModificationTime(); + rsrc.type = LocalResourceType.FILE; + rsrc.state = LocalResourceVisibility.APPLICATION; + return rsrc; + } + + private ApplicationSubmissionContext getApplicationSubmissionContext( + Configuration jobConf, + String jobSubmitDir, Credentials ts) throws IOException { + ApplicationSubmissionContext appContext = + new ApplicationSubmissionContext(); + ApplicationID applicationId = resMgrDelegate.getApplicationId(); + appContext.applicationId = applicationId; + Resource capability = new Resource(); + capability.memory = + conf.getInt(YARN_AM_RESOURCE_KEY, DEFAULT_YARN_AM_RESOURCE); + LOG.info("Master capability = " + capability); + appContext.masterCapability = capability; + + FileContext defaultFS = FileContext.getFileContext(conf); + Path jobConfPath = new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE); + + URL yarnUrlForJobSubmitDir = + AvroUtil.getYarnUrlFromPath(defaultFS.makeQualified(new Path( + jobSubmitDir))); + appContext.resources = new HashMap(); + LOG.debug("Creating setup context, jobSubmitDir url is " + + yarnUrlForJobSubmitDir); + + appContext.resources.put(YARNApplicationConstants.JOB_SUBMIT_DIR, + yarnUrlForJobSubmitDir); + + appContext.resources_todo = new HashMap(); + appContext.resources_todo.put(YARNApplicationConstants.JOB_CONF_FILE, + createApplicationResource(defaultFS, + jobConfPath)); + appContext.resources_todo.put(YARNApplicationConstants.JOB_JAR, + createApplicationResource(defaultFS, + new Path(jobSubmitDir, YARNApplicationConstants.JOB_JAR))); + // TODO gross hack + for (String s : new String[] { "job.split", "job.splitmetainfo", + YarnConfiguration.APPLICATION_TOKENS_FILE }) { + appContext.resources_todo.put( + YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s, + createApplicationResource(defaultFS, 
+ new Path(jobSubmitDir, s)));
+    }
+
+    // TODO: Only if security is on.
+    List<CharSequence> fsTokens = new ArrayList<CharSequence>();
+    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+      fsTokens.add(token.encodeToUrlString());
+    }
+
+    // TODO - Remove this!
+    appContext.fsTokens = fsTokens;
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    appContext.fsTokens_todo =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    // Add queue information
+    appContext.queue =
+        jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
+
+    // Add job name
+    appContext.applicationName = jobConf.get(JobContext.JOB_NAME, "N/A");
+
+    // Add the command line
+    String javaHome = "$JAVA_HOME";
+    Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+    vargs.add(javaHome + "/bin/java");
+    vargs.add(conf.get(YARNApplicationConstants.MR_APPMASTER_COMMAND_OPTS,
+        "-Dhadoop.root.logger=DEBUG,console -Xmx1024m"));
+
+    // Add { job jar, MR app jar } to classpath.
+    appContext.environment = new HashMap();
+    MRApps.setInitialClasspath(appContext.environment);
+    MRApps.addToClassPath(appContext.environment,
+        YARNApplicationConstants.JOB_JAR);
+    MRApps.addToClassPath(appContext.environment,
+        YARNApplicationConstants.YARN_MAPREDUCE_APP_JAR_PATH);
+    vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
+    vargs.add(String.valueOf(applicationId.clusterTimeStamp));
+    vargs.add(String.valueOf(applicationId.id));
+    // Redirect the AM's stdout (fd 1) and stderr (fd 2) into the logs dir.
+    vargs.add("1>logs/stdout");
+    vargs.add("2>logs/stderr");
+
+    Vector<CharSequence> vargsFinal = new Vector<CharSequence>(8);
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    vargsFinal.add("mkdir logs;" + mergedCommand.toString());
+
+    LOG.info("Command to launch container for ApplicationMaster is : "
+        + mergedCommand);
+
+    appContext.command = vargsFinal;
+    // TODO: RM should get this from RPC.
+    appContext.user = UserGroupInformation.getCurrentUser().getShortUserName();
+    return appContext;
+  }
+
+  /**
+   * TODO: Copied for now from TaskAttemptImpl.java ... fixme
+   */
+  private void setupDistributedCache(Configuration conf,
+      ApplicationSubmissionContext container) throws IOException {
+
+    // Cache archives
+    parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE,
+        DistributedCache.getCacheArchives(conf),
+        DistributedCache.getArchiveTimestamps(conf),
+        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
+        DistributedCache.getArchiveVisibilities(conf),
+        DistributedCache.getArchiveClassPaths(conf));
+
+    // Cache files
+    parseDistributedCacheArtifacts(container, LocalResourceType.FILE,
+        DistributedCache.getCacheFiles(conf),
+        DistributedCache.getFileTimestamps(conf),
+        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+        DistributedCache.getFileVisibilities(conf),
+        DistributedCache.getFileClassPaths(conf));
+  }
+
+  // TODO - Move this to MR!
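+  // In short: each cache URI below becomes a LocalResource keyed by its
+  // URI fragment (or, failing that, its filename); visibilities[i] picks
+  // PUBLIC vs PRIVATE, and any entry whose path also appears on the
+  // classpath list is appended to the container's CLASSPATH environment.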
+ // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType) + private static void parseDistributedCacheArtifacts( + ApplicationSubmissionContext container, LocalResourceType type, + URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], + Path[] classpaths) throws IOException { + + if (uris != null) { + // Sanity check + if ((uris.length != timestamps.length) || (uris.length != sizes.length) || + (uris.length != visibilities.length)) { + throw new IllegalArgumentException("Invalid specification for " + + "distributed-cache artifacts of type " + type + " :" + + " #uris=" + uris.length + + " #timestamps=" + timestamps.length + + " #visibilities=" + visibilities.length + ); + } + + Map classPaths = new HashMap(); + if (classpaths != null) { + for (Path p : classpaths) { + classPaths.put(p.toUri().getPath().toString(), p); + } + } + for (int i = 0; i < uris.length; ++i) { + URI u = uris[i]; + Path p = new Path(u.toString()); + // Add URI fragment or just the filename + Path name = new Path((null == u.getFragment()) + ? p.getName() + : u.getFragment()); + if (name.isAbsolute()) { + throw new IllegalArgumentException("Resource name must be relative"); + } + container.resources_todo.put( + name.toUri().getPath(), + getLocalResource( + uris[i], type, + visibilities[i] + ? LocalResourceVisibility.PUBLIC + : LocalResourceVisibility.PRIVATE, + sizes[i], timestamps[i]) + ); + if (classPaths.containsKey(u.getPath())) { + MRApps.addToClassPath(container.environment, name.toUri().getPath()); + } + } + } + } + + // TODO - Move this to MR! + private static long[] getFileSizes(Configuration conf, String key) { + String[] strs = conf.getStrings(key); + if (strs == null) { + return null; + } + long[] result = new long[strs.length]; + for(int i=0; i < strs.length; ++i) { + result[i] = Long.parseLong(strs[i]); + } + return result; + } + + private static LocalResource getLocalResource(URI uri, + LocalResourceType type, LocalResourceVisibility visibility, + long size, long timestamp) { + LocalResource resource = new LocalResource(); + resource.resource = AvroUtil.getYarnUrlFromURI(uri); + resource.type = type; + resource.state = visibility; + resource.size = size; + resource.timestamp = timestamp; + return resource; + } + + @Override + public void setJobPriority(JobID arg0, String arg1) throws IOException, + InterruptedException { + resMgrDelegate.setJobPriority(arg0, arg1); + } + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return resMgrDelegate.getProtocolVersion(arg0, arg1); + } + + @Override + public long renewDelegationToken(Token arg0) + throws IOException, InterruptedException { + return resMgrDelegate.renewDelegationToken(arg0); + } + + + @Override + public Counters getJobCounters(JobID arg0) throws IOException, + InterruptedException { + return clientServiceDelegate.getJobCounters(arg0); + } + + @Override + public String getJobHistoryDir() throws IOException, InterruptedException { + return clientServiceDelegate.getJobHistoryDir(); + } + + @Override + public JobStatus getJobStatus(JobID jobID) throws IOException, + InterruptedException { + JobStatus status = clientServiceDelegate.getJobStatus(jobID); + if (status.isJobComplete()) { + // Clean up the Container running the ApplicationMaster. 
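+      // (Nothing happens here yet; one option, an assumption rather than
+      // wired-up behavior, would be to call finishApplication on the
+      // ClientRMProtocol so the RM can release the AM's container.)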
+ } + return status; + } + + @Override + public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, + int arg2) throws IOException, InterruptedException { + return clientServiceDelegate.getTaskCompletionEvents(arg0, arg1, arg2); + } + + @Override + public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException, + InterruptedException { + return clientServiceDelegate.getTaskDiagnostics(arg0); + } + + @Override + public TaskReport[] getTaskReports(JobID jobID, TaskType taskType) + throws IOException, InterruptedException { + return clientServiceDelegate + .getTaskReports(jobID, taskType); + } + + @Override + public void killJob(JobID arg0) throws IOException, InterruptedException { + clientServiceDelegate.killJob(arg0); + } + + @Override + public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException, + InterruptedException { + return clientServiceDelegate.killTask(arg0, arg1); + } + + @Override + public AccessControlList getQueueAdmins(String arg0) throws IOException { + return new AccessControlList("*"); + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,34 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.ClientFactory; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; + +public class YarnClientFactory extends ClientFactory { + + @Override + protected ClientProtocol createClient(Configuration conf) + throws IOException { + return new YARNRunner(conf); + } + } Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,283 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.avro.ipc.AvroRemoteException; +import org.apache.avro.ipc.Server; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager; +import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.ApplicationID; +import org.apache.hadoop.yarn.ApplicationMaster; +import org.apache.hadoop.yarn.ApplicationState; +import org.apache.hadoop.yarn.ApplicationStatus; +import org.apache.hadoop.yarn.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.ClientRMProtocol; +import org.apache.hadoop.yarn.YarnClusterMetrics; +import org.apache.hadoop.yarn.YarnRemoteException; +import org.apache.hadoop.mapreduce.v2.api.CounterGroup; +import org.apache.hadoop.mapreduce.v2.api.Counters; +import org.apache.hadoop.mapreduce.v2.api.JobID; +import org.apache.hadoop.mapreduce.v2.api.JobReport; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent; +import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport; +import org.apache.hadoop.mapreduce.v2.api.TaskReport; +import org.junit.Test; + +public class TestClientRedirect { + + private static final Log LOG = LogFactory.getLog(TestClientRedirect.class); + private static final String RMADDRESS = "0.0.0.0:8054"; + private static final String AMHOSTNAME = "0.0.0.0"; + private static final int AMPORT = 10020; + private boolean firstRedirect = false; + private boolean secondRedirect = false; + + @Test + public void testRedirect() throws Exception { + + Configuration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS); + conf.set("jobhistory.server.hostname", AMHOSTNAME); + conf.setInt("jobhistory.server.port", AMPORT); + RMService rmService = new RMService("test"); + rmService.init(conf); + rmService.start(); + + MRClientProtocolService clientService = + new MRClientProtocolService(); + clientService.init(conf); + clientService.start(conf); + + LOG.info("services started"); + YARNRunner yarnRunner = new YARNRunner(conf); + Throwable t = null; + org.apache.hadoop.mapreduce.JobID jobID = + new org.apache.hadoop.mapred.JobID("201103121733", 1); + yarnRunner.getJobCounters(jobID); + Assert.assertTrue(firstRedirect); + Assert.assertTrue(secondRedirect); + + rmService.stop(); + clientService.stop(); + } + + class RMService extends AbstractService implements ClientRMProtocol { + private ApplicationsManager applicationsManager; + private String clientServiceBindAddress; + InetSocketAddress clientBindAddress; + private Server server; + + public RMService(String name) { + super(name); + } + + @Override + public void init(Configuration conf) { + clientServiceBindAddress = RMADDRESS; + /* + clientServiceBindAddress = conf.get( + YarnConfiguration.APPSMANAGER_ADDRESS, + YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS); + */ + 
clientBindAddress = NetUtils.createSocketAddr(clientServiceBindAddress); + super.init(conf); + } + + @Override + public void start() { + // All the clients to appsManager are supposed to be authenticated via + // Kerberos if security is enabled, so no secretManager. + YarnRPC rpc = YarnRPC.create(getConfig()); + Configuration clientServerConf = new Configuration(getConfig()); + this.server = rpc.getServer(ClientRMProtocol.class, this, + clientBindAddress, clientServerConf, null); + this.server.start(); + super.start(); + } + + @Override + public ApplicationID getNewApplicationId() throws AvroRemoteException { + return null; + } + + @Override + public ApplicationMaster getApplicationMaster(ApplicationID applicationId) + throws AvroRemoteException { + ApplicationMaster master = new ApplicationMaster(); + master.applicationId = applicationId; + master.status = new ApplicationStatus(); + master.status.applicationId = applicationId; + if (firstRedirect == false) { + master.state = ApplicationState.RUNNING; + } else { + master.state = ApplicationState.COMPLETED; + } + master.host = AMHOSTNAME; + master.rpcPort = AMPORT; + return master; + } + + @Override + public Void submitApplication(ApplicationSubmissionContext context) + throws AvroRemoteException { + throw new AvroRemoteException("Test"); + } + + @Override + public Void finishApplication(ApplicationID applicationId) + throws AvroRemoteException { + return null; + } + + @Override + public YarnClusterMetrics getClusterMetrics() throws AvroRemoteException { + return null; + } + } + + class MRClientProtocolService extends AbstractService + implements MRClientProtocol { + private InetSocketAddress bindAddress; + private Server server; + + public MRClientProtocolService() { + super("TestClientService"); + } + + public void start(Configuration conf) { + YarnRPC rpc = YarnRPC.create(conf); + //TODO : use fixed port ?? 
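+      // The stub binds to the fixed AMHOSTNAME:AMPORT on purpose: the RM
+      // stub's getApplicationMaster() answer and the jobhistory.server.*
+      // settings in testRedirect() both point clients here, so the first
+      // (deliberately failing) getCounters() call forces the client to
+      // re-resolve and land on this same server, exercising the redirect.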
+ InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTNAME + ":" + AMPORT); + InetAddress hostNameResolved = null; + try { + address.getAddress(); + hostNameResolved = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + throw new YarnException(e); + } + + server = + rpc.getServer(MRClientProtocol.class, this, address, + conf, null); + server.start(); + this.bindAddress = + NetUtils.createSocketAddr(hostNameResolved.getHostAddress() + + ":" + server.getPort()); + super.start(); + } + + public void stop() { + server.close(); + super.stop(); + } + + @Override + public Counters getCounters(JobID jobID) throws AvroRemoteException, + YarnRemoteException { + if (firstRedirect == false) { + firstRedirect = true; + throw RPCUtil.getRemoteException(new IOException("Fail")); + } + else { + secondRedirect = true; + Counters counters = new Counters(); + counters.groups = new HashMap(); + return counters; + } + } + + @Override + public List getDiagnostics( + org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID) + throws AvroRemoteException, YarnRemoteException { + return null; + } + + @Override + public JobReport getJobReport(JobID jobID) throws AvroRemoteException, + YarnRemoteException { + return null; + } + + @Override + public List getTaskAttemptCompletionEvents( + JobID jobID, int fromEventId, int maxEvents) + throws AvroRemoteException, YarnRemoteException { + return null; + } + + @Override + public TaskAttemptReport getTaskAttemptReport( + org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID) + throws AvroRemoteException, YarnRemoteException { + return null; + } + + @Override + public TaskReport getTaskReport(org.apache.hadoop.mapreduce.v2.api.TaskID taskID) + throws AvroRemoteException, YarnRemoteException { + return null; + } + + @Override + public List getTaskReports(JobID jobID, + org.apache.hadoop.mapreduce.v2.api.TaskType taskType) + throws AvroRemoteException, YarnRemoteException { + return null; + } + + @Override + public Void killJob(JobID jobID) throws AvroRemoteException, + YarnRemoteException { + return null; + } + + @Override + public Void killTask(org.apache.hadoop.mapreduce.v2.api.TaskID taskID) + throws AvroRemoteException, YarnRemoteException { + return null; + } + + @Override + public Void killTaskAttempt( + org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID) + throws AvroRemoteException, YarnRemoteException { + return null; + } + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,111 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.mapred.YarnClientFactory; +import org.apache.hadoop.mapreduce.ClientFactory; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.conf.YARNApplicationConstants; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.nodemanager.NMConfig; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices; +import org.apache.hadoop.yarn.service.AbstractService; +import org.apache.hadoop.yarn.service.Service; + +/** + * Configures and starts the MR specific components in the YARN cluster. + * + */ +public class MiniMRYarnCluster extends MiniYARNCluster { + + public static final String APPJAR = + "../hadoop-mapreduce-client-app/target/" + + YARNApplicationConstants.HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME; + + private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class); + private JobHistoryServer historyServer; + + public MiniMRYarnCluster(String testName) { + super(testName); + //TODO: add the history server + //historyServer = new JobHistoryServerWrapper(); + //addService(historyServer); + } + + @Override + public void init(Configuration conf) { + conf.setClass("mapreduce.clientfactory.class.name", + YarnClientFactory.class, ClientFactory.class); + conf.setStrings(YARNApplicationConstants.NM_HOSTS_CONF_KEY, + new String[] { NMConfig.DEFAULT_NM_BIND_ADDRESS }); + conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name")); + conf.set(YARNApplicationConstants.APPS_STAGING_DIR_KEY, new File( + getTestWorkDir(), + "apps_staging_dir/${user.name}/").getAbsolutePath()); + conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of + // which shuffle doesn't happen + + //configure the shuffle service in NM + conf.setStrings(AuxServices.AUX_SERVICES, + new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID }); + conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, + ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class, + Service.class); + super.init(conf); + } + + private class JobHistoryServerWrapper extends AbstractService { + public JobHistoryServerWrapper() { + super(JobHistoryServerWrapper.class.getName()); + } + + @Override + public synchronized void start() { + try { + historyServer = new JobHistoryServer(); + historyServer.init(getConfig()); + new Thread() { + public void run() { + historyServer.start(); + }; + }.start(); + while (historyServer.getServiceState() == STATE.INITED) { + LOG.info("Waiting for HistoryServer to start..."); + 
Thread.sleep(1500); + } + if (historyServer.getServiceState() != STATE.STARTED) { + throw new IOException("HistoryServer failed to start"); + } + super.start(); + } catch (Throwable t) { + throw new YarnException(t); + } + } + } +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1082677&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Mar 17 20:21:13 2011 @@ -0,0 +1,220 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,220 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.FailingMapper;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
+
+  private static MiniMRYarnCluster mrCluster;
+
+  @Before
+  public void setup() throws InterruptedException, IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster.init(new Configuration());
+      mrCluster.start();
+    }
+  }
+
+  @Test
+  public void testSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(mrCluster.getConfig());
+    // Job with 3 maps and 2 reduces
+    Job job = sleepJob.createJob(3, 2, 10000, 1, 5000, 1);
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+  }
+
+  @Test
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+    Job job = randomWriterJob.createJob(mrCluster.getConfig());
+    FileOutputFormat.setOutputPath(job, new Path(
+        mrCluster.getTestWorkDir().getAbsolutePath(), "random-output"));
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+  }
+
+  @Test
+  public void testFailingMapper() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+          + " not found. Not running test.");
+      return;
+    }
+
+    int numMaps = 1;
+    mrCluster.getConfig().setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+    // Reduce the task timeout and the number of map attempts so the
+    // job fails fast.
+    mrCluster.getConfig().setInt("mapreduce.task.timeout", 10 * 1000);
+    mrCluster.getConfig().setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+
+    Job job = new Job(mrCluster.getConfig());
+
+    job.setJarByClass(FailingMapper.class);
+    job.setJobName("failmapper");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(FailingMapper.class);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setNumReduceTasks(0);
+
+    FileOutputFormat.setOutputPath(job, new Path(
+        mrCluster.getTestWorkDir().getAbsolutePath(), "failmapper-output"));
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+
+    TaskID taskID = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID aId = new TaskAttemptID(taskID, 0);
+    System.out.println("Diagnostics for " + aId + " :");
+    for (String diag : job.getTaskDiagnostics(aId)) {
+      System.out.println(diag);
+    }
+    aId = new TaskAttemptID(taskID, 1);
+    System.out.println("Diagnostics for " + aId + " :");
+    for (String diag : job.getTaskDiagnostics(aId)) {
+      System.out.println(diag);
+    }
+
+    TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
+    Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+        events[0].getStatus());
+    Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+        events[1].getStatus());
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+  }
+
+  //@Test
+  public void testSleepJobWithSecurityOn() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      return;
+    }
+
+    mrCluster.getConfig().set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    mrCluster.getConfig().set(RMConfig.RM_KEYTAB, "/etc/krb5.keytab");
+    mrCluster.getConfig().set(NMConfig.NM_KEYTAB, "/etc/krb5.keytab");
+    mrCluster.getConfig().set(YarnConfiguration.RM_SERVER_PRINCIPAL_KEY,
+        "rm/sightbusy-lx@LOCALHOST");
+    mrCluster.getConfig().set(YarnServerConfig.NM_SERVER_PRINCIPAL_KEY,
+        "nm/sightbusy-lx@LOCALHOST");
+    UserGroupInformation.setConfiguration(mrCluster.getConfig());
+
+    // Keep it in here instead of after RM/NM as multiple user logins happen in
+    // the same JVM.
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    LOG.info("User name is " + user.getUserName());
+    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+      LOG.info("Token is " + str.encodeToUrlString());
+    }
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        SleepJob sleepJob = new SleepJob();
+        sleepJob.setConf(mrCluster.getConfig());
+        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+        // // Job with reduces
+        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+        // TODO: We should not be setting MRAppJar as job.jar. It should be
+        // uploaded separately by YarnRunner.
+        job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+        job.waitForCompletion(true);
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        return null;
+      }
+    });
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,39 @@
+<?xml version="1.0"?>
+<project>
+  <parent>
+    <artifactId>hadoop-mapreduce-client</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+  <name>hadoop-mapreduce-client-shuffle</name>
+  <version>${yarn.version}</version>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server-nodemanager</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+  </dependencies>
+</project>
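
This module hosts the shuffle as a NodeManager auxiliary service. A minimal
sketch of how a NodeManager configuration would register it, for illustration
only; it is the same wiring MiniMRYarnCluster.init() above performs, using the
AuxServices constants introduced in this patch:

    Configuration conf = new Configuration();
    // Advertise the shuffle service under its well-known ID...
    conf.setStrings(AuxServices.AUX_SERVICES,
        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
    // ...and bind that ID to the implementing Service class.
    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT,
        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID),
        ShuffleHandler.class, Service.class);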
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,427 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.IndexCache;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelFutureProgressListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultFileRegion;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.FileRegion;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
+import static org.jboss.netty.handler.codec.http.HttpMethod.*;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.DataOutputByteBuffer;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+
+import org.apache.hadoop.yarn.ApplicationID;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+
+// DEBUG
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
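+
+// initApp() below deserializes a job token from the opaque secret handed over
+// by the NodeManager. For illustration, a sketch of the producing side --
+// assumed, not part of this patch -- using the Writable serialization that
+// readFields() reverses:
+//
+//   Token<JobTokenIdentifier> jobToken = ...; // issued per job
+//   DataOutputBuffer dob = new DataOutputBuffer();
+//   jobToken.write(dob);
+//   ByteBuffer secret = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());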
+
+// TODO packaging
+public class ShuffleHandler extends AbstractService
+    implements AuxServices.AuxiliaryService {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  static {
+    // DEBUG
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.DEBUG);
+  }
+
+  private int port;
+  private ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+
+  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
+      "mapreduce.shuffle";
+
+  private static final Map<String, String> userRsrc =
+      new ConcurrentHashMap<String, String>();
+  private static final JobTokenSecretManager secretManager =
+      new JobTokenSecretManager();
+
+  public static final String SHUFFLE_PORT = "mapreduce.shuffle.port";
+
+  public ShuffleHandler() {
+    super("httpshuffle");
+  }
+
+  @Override
+  public void initApp(String user, ApplicationID appId, ByteBuffer secret) {
+    // TODO these bytes should be versioned
+    try {
+      DataInputByteBuffer in = new DataInputByteBuffer();
+      in.reset(secret);
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+      jt.readFields(in);
+      // TODO: Once Shuffle is out of NM, this can use MR APIs
+      JobID jobId = new JobID(Long.toString(appId.clusterTimeStamp), appId.id);
+      userRsrc.put(jobId.toString(), user);
+      LOG.info("Added token for " + jobId.toString());
+      secretManager.addTokenForJob(jobId.toString(), jt);
+    } catch (IOException e) {
+      LOG.error("Error during initApp", e);
+      // TODO add API to AuxiliaryServices to report failures
+    }
+  }
+
+  @Override
+  public void stopApp(ApplicationID appId) {
+    JobID jobId = new JobID(Long.toString(appId.clusterTimeStamp), appId.id);
+    secretManager.removeTokenForJob(jobId.toString());
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    selector = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    super.init(new Configuration(conf));
+  }
+
+  // TODO change AbstractService to throw InterruptedException
+  @Override
+  public synchronized void start() {
+    Configuration conf = getConfig();
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+    port = conf.getInt(SHUFFLE_PORT, 8080);
+    accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+    LOG.info(getName() + " listening on port " + port);
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    selector.releaseExternalResources();
+    super.stop();
+  }
+
+  public static class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    private final Shuffle SHUFFLE;
+
+    public HttpPipelineFactory(Configuration conf) {
+      SHUFFLE = new Shuffle(conf);
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
+      return Channels.pipeline(
+          new HttpRequestDecoder(),
+          new HttpChunkAggregator(1 << 16),
+          new HttpResponseEncoder(),
+          new ChunkedWriteHandler(),
+          SHUFFLE);
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+
+  }
+
+  static class Shuffle extends SimpleChannelUpstreamHandler {
+
+    private final Configuration conf;
+    private final IndexCache indexCache;
+    private final LocalDirAllocator lDirAlloc =
+        new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
+
+    public Shuffle(Configuration conf) {
+      this.conf = conf;
+      indexCache = new IndexCache(new JobConf(conf));
+    }
+
+    private static List<String> splitMaps(List<String> mapq) {
+      if (null == mapq) {
+        return null;
+      }
+      final List<String> ret = new ArrayList<String>();
+      for (String s : mapq) {
+        Collections.addAll(ret, s.split(","));
+      }
+      return ret;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+        throws Exception {
+      HttpRequest request = (HttpRequest) evt.getMessage();
+      if (request.getMethod() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
+      }
+      final Map<String, List<String>> q =
+          new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> mapIds = splitMaps(q.get("map"));
+      final List<String> reduceQ = q.get("reduce");
+      final List<String> jobQ = q.get("job");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RECV: " + request.getUri() +
+            "\n  mapId: " + mapIds +
+            "\n  reduceId: " + reduceQ +
+            "\n  jobId: " + jobQ);
+      }
+
+      if (mapIds == null || reduceQ == null || jobQ == null) {
+        sendError(ctx, "Required params missing: job, map and reduce",
+            BAD_REQUEST);
+        return;
+      }
+      if (reduceQ.size() != 1 || jobQ.size() != 1) {
+        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+        return;
+      }
+      int reduceId;
+      String jobId;
+      try {
+        reduceId = Integer.parseInt(reduceQ.get(0));
+        jobId = jobQ.get(0);
+      } catch (NumberFormatException e) {
+        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+        return;
+      } catch (IllegalArgumentException e) {
+        sendError(ctx, "Bad job parameter", BAD_REQUEST);
+        return;
+      }
+      final String reqUri = request.getUri();
+      if (null == reqUri) {
+        // TODO? add upstream?
+        sendError(ctx, FORBIDDEN);
+        return;
+      }
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      try {
+        verifyRequest(jobId, ctx, request, response,
+            new URL("http", "", 8080, reqUri));
+      } catch (IOException e) {
+        LOG.warn("Shuffle failure ", e);
+        sendError(ctx, e.getMessage(), UNAUTHORIZED);
+        return;
+      }
+
+      Channel ch = evt.getChannel();
+      ch.write(response);
+      // TODO refactor the following into the pipeline
+      ChannelFuture lastMap = null;
+      for (String mapId : mapIds) {
+        try {
+          lastMap = sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId,
+              mapId, reduceId);
+          if (null == lastMap) {
+            sendError(ctx, NOT_FOUND);
+            return;
+          }
+        } catch (IOException e) {
+          LOG.error("Shuffle error ", e);
+          sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+          return;
+        }
+      }
+      lastMap.addListener(ChannelFutureListener.CLOSE);
+    }
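+
+    // The handshake verifyRequest() implements, sketched for illustration
+    // (the fetcher side is assumed here, not part of this patch):
+    //   1. The fetcher canonicalizes its request URL to the same string
+    //      SecureShuffleUtils.buildMsgFrom() produces below, hashes it with
+    //      the shared job-token secret, and sends the hash in the
+    //      HTTP_HEADER_URL_HASH request header.
+    //   2. The server recomputes and checks that hash (verifyReply).
+    //   3. The server replies with a hash of the fetcher's hash in
+    //      HTTP_HEADER_REPLY_URL_HASH, letting the fetcher authenticate the
+    //      server in turn.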
+    private void verifyRequest(String appid, ChannelHandlerContext ctx,
+        HttpRequest request, HttpResponse response, URL requestUri)
+        throws IOException {
+      SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+      if (null == tokenSecret) {
+        LOG.info("Request for unknown token " + appid);
+        throw new IOException("could not find jobid");
+      }
+      // string to encrypt
+      String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
+      // hash from the fetcher
+      String urlHashStr =
+          request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if (urlHashStr == null) {
+        LOG.info("Missing header hash for " + appid);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      if (LOG.isDebugEnabled()) {
+        int len = urlHashStr.length();
+        LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
+            urlHashStr.substring(len - len / 2, len - 1));
+      }
+      // verify - throws exception
+      SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
+      // verification passed - encode the reply
+      String reply =
+          SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
+      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      if (LOG.isDebugEnabled()) {
+        int len = reply.length();
+        LOG.debug("Fetcher request verified. enc_str=" + enc_str + ";reply=" +
+            reply.substring(len - len / 2, len - 1));
+      }
+    }
+
+    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
+        String user, String jobId, String mapId, int reduce)
+        throws IOException {
+      // TODO replace w/ rsrc alloc
+      // $x/$user/appcache/$appId/output/$mapId
+      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
+      // between App and Job
+      JobID jobID = JobID.forName(jobId);
+      ApplicationID appID = new ApplicationID();
+      appID.clusterTimeStamp = Long.parseLong(jobID.getJtIdentifier());
+      appID.id = jobID.getId();
+      final String base =
+          ApplicationLocalizer.USERCACHE + "/" + user + "/"
+              + ApplicationLocalizer.APPCACHE + "/"
+              + AvroUtil.toString(appID) + "/output" + "/" + mapId;
+      LOG.debug("DEBUG0 " + base);
+      // Index file
+      Path indexFileName = lDirAlloc.getLocalPathToRead(
+          base + "/file.out.index", conf);
+      // Map-output file
+      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+          base + "/file.out", conf);
+      LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
+          indexFileName);
+      IndexRecord info =
+          indexCache.getIndexInformation(mapId, reduce, indexFileName);
+      final ShuffleHeader header =
+          new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+      final DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      File spillfile = new File(mapOutputFileName.toString());
+      RandomAccessFile spill;
+      try {
+        spill = new RandomAccessFile(spillfile, "r");
+      } catch (FileNotFoundException e) {
+        LOG.info(spillfile + " not found");
+        return null;
+      }
+      final FileRegion partition = new DefaultFileRegion(
+          spill.getChannel(), info.startOffset, info.partLength);
+      ChannelFuture writeFuture = ch.write(partition);
+      writeFuture.addListener(new ChannelFutureListener() {
+        // TODO error handling; distinguish IO/connection failures,
+        // attribute to appropriate spill output
+        @Override
+        public void operationComplete(ChannelFuture future) {
+          partition.releaseExternalResources();
+        }
+      });
+      return writeFuture;
+    }
+
+    private void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    private void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.setContent(
+          ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+      if (cause instanceof TooLongFrameException) {
+        sendError(ctx, BAD_REQUEST);
+        return;
+      }
+
+      LOG.error("Shuffle error: ", cause);
+      if (ch.isConnected()) {
+        sendError(ctx, INTERNAL_SERVER_ERROR);
+      }
+    }
+
+  }
+}