From: todd@apache.org
To: yarn-commits@hadoop.apache.org
Subject: svn commit: r1463203 [3/8] - in /hadoop/common/branches/HDFS-347/hadoop-yarn-project: ./ hadoop-yarn/ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org...
Date: Mon, 01 Apr 2013 16:47:34 -0000
Message-Id: <20130401164742.DAF522388BF1@eris.apache.org>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java Mon Apr 1 16:47:16 2013
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Shell;
 
 /**
  * Plugin to calculate resource information on the system.
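The hunk above pulls in org.apache.hadoop.util.Shell, which exposes static platform flags that later hunks in this patch branch on. A minimal sketch of flags equivalent to Shell.WINDOWS and Shell.LINUX follows; this is an illustration, not the actual Hadoop class:

    // Sketch only: platform flags comparable to org.apache.hadoop.util.Shell's.
    // Hadoop computes these once from the os.name system property.
    public final class PlatformFlags {
      private static final String OS_NAME = System.getProperty("os.name");
      public static final boolean WINDOWS = OS_NAME.startsWith("Windows");
      public static final boolean LINUX = OS_NAME.startsWith("Linux");
    }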
@@ -31,6 +32,18 @@ import org.apache.hadoop.util.Reflection
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public abstract class ResourceCalculatorPlugin extends Configured {
+
+  protected String processPid = null;
+
+  /**
+   * set the pid of the process for which getProcResourceValues
+   * will be invoked
+   *
+   * @param pid
+   */
+  public void setProcessPid(String pid) {
+    processPid = pid;
+  }
 
   /**
    * Obtain the total size of the virtual memory present in the system.
@@ -109,10 +122,12 @@ public abstract class ResourceCalculator
 
     // No class given, try a os specific class
     try {
-      String osName = System.getProperty("os.name");
-      if (osName.startsWith("Linux")) {
+      if (Shell.LINUX) {
         return new LinuxResourceCalculatorPlugin();
       }
+      if (Shell.WINDOWS) {
+        return new WindowsResourceCalculatorPlugin();
+      }
     } catch (SecurityException se) {
       // Failed to get Operating System name.
       return null;

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorProcessTree.java Mon Apr 1 16:47:16 2013
@@ -145,14 +145,11 @@ public abstract class ResourceCalculator
     }
 
     // No class given, try a os specific class
-    try {
-      String osName = System.getProperty("os.name");
-      if (osName.startsWith("Linux")) {
-        return new ProcfsBasedProcessTree(pid);
-      }
-    } catch (SecurityException se) {
-      // Failed to get Operating System name.
-      return null;
+    if (ProcfsBasedProcessTree.isAvailable()) {
+      return new ProcfsBasedProcessTree(pid);
+    }
+    if (WindowsBasedProcessTree.isAvailable()) {
+      return new WindowsBasedProcessTree(pid);
     }
 
     // Not supported on this system.
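Both factory methods above now choose an implementation by probing for platform support instead of string-matching os.name. A minimal, self-contained sketch of that availability-probe dispatch; the class names and the probe are hypothetical stand-ins, not the actual YARN implementations:

    import java.io.File;

    // Hypothetical sketch of the isAvailable()-style dispatch used above.
    abstract class AnyProcessTree {
      // One plausible probe: a Linux-style /proc filesystem must exist.
      static boolean procfsAvailable() {
        return new File("/proc").isDirectory();
      }

      static AnyProcessTree create(String pid) {
        if (procfsAvailable()) {
          return new ProcfsSketchTree(pid);
        }
        return null; // not supported on this platform, as in the code above
      }
    }

    class ProcfsSketchTree extends AnyProcessTree {
      final String rootPid;
      ProcfsSketchTree(String pid) { this.rootPid = pid; }
    }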
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/InfoBlock.java Mon Apr 1 16:47:16 2013
@@ -20,7 +20,11 @@ package org.apache.hadoop.yarn.webapp.vi
 
 import org.apache.hadoop.yarn.webapp.ResponseInfo;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
+
 import com.google.inject.Inject;
 
@@ -47,7 +51,19 @@ public class InfoBlock extends HtmlBlock
       String value = String.valueOf(item.value);
       if (item.url == null) {
         if (!item.isRaw) {
-          tr.td(value);
+          TD<TR<TABLE<DIV<Hamlet>>>> td = tr.td();
+          if ( value.lastIndexOf('\n') > 0) {
+            String []lines = value.split("\n");
+            DIV<TD<TR<TABLE<DIV<Hamlet>>>>> singleLineDiv;
+            for ( String line :lines) {
+              singleLineDiv = td.div();
+              singleLineDiv._r(line);
+              singleLineDiv._();
+            }
+          } else {
+            td._r(value);
+          }
+          td._();
         } else {
           tr.td()._r(value)._();
         }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/JQueryUI.java Mon Apr 1 16:47:16 2013
@@ -107,12 +107,21 @@ public class JQueryUI extends HtmlBlock
 
   protected void initDataTables(List<String> list) {
     String defaultInit = "{bJQueryUI: true, sPaginationType: 'full_numbers'}";
+    String stateSaveInit = "bStateSave : true, "
+        + "\"fnStateSave\": function (oSettings, oData) { "
+        + "sessionStorage.setItem( oSettings.sTableId, JSON.stringify(oData) ); }, "
+        + "\"fnStateLoad\": function (oSettings) { "
+        + "return JSON.parse( sessionStorage.getItem(oSettings.sTableId) );}, ";
+
     for (String id : split($(DATATABLES_ID))) {
       if (Html.isValidId(id)) {
         String init = $(initID(DATATABLES, id));
         if (init.isEmpty()) {
           init = defaultInit;
         }
+        // for inserting stateSaveInit
+        int pos = init.indexOf('{') + 1;
+        init = new StringBuffer(init).insert(pos, stateSaveInit).toString();
         list.add(join(id,"DataTable =  $('#", id, "').dataTable(", init,
                       ").fnSetFilteringDelay(188);"));
         String postInit = $(postInitID(DATATABLES, id));
@@ -126,9 +135,12 @@ public class JQueryUI extends HtmlBlock
       String init =
          $(initSelector(DATATABLES));
       if (init.isEmpty()) {
         init = defaultInit;
-      }
+      }
+      int pos = init.indexOf('{') + 1;
+      init = new StringBuffer(init).insert(pos, stateSaveInit).toString();
       list.add(join("  $('", escapeJavaScript(selector), "').dataTable(", init,
-                    ").fnSetFilteringDelay(288);"));
+                    ").fnSetFilteringDelay(288);"));
+    }
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Mon Apr 1 16:47:16 2013
@@ -135,8 +135,12 @@
   <property>
-    <description>The maximum number of application master retries.</description>
-    <name>yarn.resourcemanager.am.max-retries</name>
+    <description>The maximum number of application attempts. It's a global
+    setting for all application masters. Each application master can specify
+    its individual maximum number of application attempts via the API, but the
+    individual number cannot be more than the global upper bound. If it is,
+    the resourcemanager will override it.</description>
+    <name>yarn.resourcemanager.am.max-attempts</name>
     <value>1</value>
   </property>

@@ -448,6 +452,20 @@
+  <property>
+    <description>Whether physical memory limits will be enforced for
+    containers.</description>
+    <name>yarn.nodemanager.pmem-check-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>Whether virtual memory limits will be enforced for
+    containers.</description>
+    <name>yarn.nodemanager.vmem-check-enabled</name>
+    <value>true</value>
+  </property>
+
   <property>
     <description>Ratio between virtual memory to physical memory when
     setting memory limits for containers. Container allocations are
    expressed in terms of physical memory, and virtual memory usage

@@ -597,6 +615,20 @@
     <value>2000</value>
   </property>

+  <property>
+    <description>Max time, in seconds, to wait to establish a connection to RM when NM starts.
+    The NM will shutdown if it cannot connect to RM within the specified max time period.
+    If the value is set as -1, then NM will retry forever.</description>
+    <name>yarn.nodemanager.resourcemanager.connect.wait.secs</name>
+    <value>900</value>
+  </property>
+
+  <property>
+    <description>Time interval, in seconds, between each NM attempt to connect to RM.</description>
+    <name>yarn.nodemanager.resourcemanager.connect.retry_interval.secs</name>
+    <value>30</value>
+  </property>
+
   <property>
     <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Mon Apr 1 16:47:16 2013
@@ -23,9 +23,9 @@ import junit.framework.Assert;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.impl.pb.AMResponsePBImpl;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
 import org.junit.Test;
 
 public class TestRecordFactory {
@@ -35,15 +35,17 @@ public class TestRecordFactory {
     RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
 
     try {
-      AMResponse response = pbRecordFactory.newRecordInstance(AMResponse.class);
-      Assert.assertEquals(AMResponsePBImpl.class, response.getClass());
+      AllocateResponse response =
+          pbRecordFactory.newRecordInstance(AllocateResponse.class);
+      Assert.assertEquals(AllocateResponsePBImpl.class, response.getClass());
     } catch (YarnException e) {
       e.printStackTrace();
       Assert.fail("Failed to crete record");
     }
 
     try {
-      AllocateRequest response = pbRecordFactory.newRecordInstance(AllocateRequest.class);
+      AllocateRequest response =
+          pbRecordFactory.newRecordInstance(AllocateRequest.class);
       Assert.assertEquals(AllocateRequestPBImpl.class, response.getClass());
     } catch (YarnException e) {
       e.printStackTrace();

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Mon Apr 1 16:47:16 2013
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEqu
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -47,10 +48,12 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileContext;
 import
org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -113,7 +116,127 @@ public class TestFSDownload { return ret; } - @Test + static LocalResource createTarFile(FileContext files, Path p, int len, + Random r, LocalResourceVisibility vis) throws IOException, + URISyntaxException { + + FSDataOutputStream outFile = null; + try { + byte[] bytes = new byte[len]; + Path tarPath = new Path(p.toString()); + outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE)); + r.nextBytes(bytes); + outFile.write(bytes); + } finally { + if (outFile != null) + outFile.close(); + } + StringBuffer tarCommand = new StringBuffer(); + URI u = new URI(p.getParent().toString()); + tarCommand.append("cd '"); + tarCommand.append(FileUtil.makeShellPath(u.getPath().toString())); + tarCommand.append("' ; "); + tarCommand.append("tar -czf " + p.getName() + ".tar " + p.getName()); + String[] shellCmd = { "bash", "-c", tarCommand.toString() }; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + p + + ". Tar process exited with exit code " + exitcode); + } + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString() + + ".tar"))); + ret.setSize(len); + ret.setType(LocalResourceType.ARCHIVE); + ret.setVisibility(vis); + ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar")) + .getModificationTime()); + return ret; + } + + static LocalResource createJarFile(FileContext files, Path p, int len, + Random r, LocalResourceVisibility vis) throws IOException, + URISyntaxException { + + FSDataOutputStream outFile = null; + try { + byte[] bytes = new byte[len]; + Path tarPath = new Path(p.toString()); + outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE)); + r.nextBytes(bytes); + outFile.write(bytes); + } finally { + if (outFile != null) + outFile.close(); + } + StringBuffer tarCommand = new StringBuffer(); + URI u = new URI(p.getParent().toString()); + tarCommand.append("cd '"); + tarCommand.append(FileUtil.makeShellPath(u.getPath().toString())); + tarCommand.append("' ; "); + tarCommand.append("jar cf " + p.getName() + ".jar " + p.getName()); + String[] shellCmd = { "bash", "-c", tarCommand.toString() }; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + p + + ". 
Tar process exited with exit code " + exitcode); + } + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString() + + ".jar"))); + ret.setSize(len); + ret.setType(LocalResourceType.ARCHIVE); + ret.setVisibility(vis); + ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".jar")) + .getModificationTime()); + return ret; + } + + static LocalResource createZipFile(FileContext files, Path p, int len, + Random r, LocalResourceVisibility vis) throws IOException, + URISyntaxException { + + FSDataOutputStream outFile = null; + try { + byte[] bytes = new byte[len]; + Path tarPath = new Path(p.toString()); + outFile = files.create(tarPath, EnumSet.of(CREATE, OVERWRITE)); + r.nextBytes(bytes); + outFile.write(bytes); + } finally { + if (outFile != null) + outFile.close(); + } + StringBuffer zipCommand = new StringBuffer(); + URI u = new URI(p.getParent().toString()); + zipCommand.append("cd '"); + zipCommand.append(FileUtil.makeShellPath(u.getPath().toString())); + zipCommand.append("' ; "); + zipCommand.append("gzip " + p.getName()); + String[] shellCmd = { "bash", "-c", zipCommand.toString() }; + ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); + shexec.execute(); + int exitcode = shexec.getExitCode(); + if (exitcode != 0) { + throw new IOException("Error untarring file " + p + + ". Tar process exited with exit code " + exitcode); + } + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString() + + ".zip"))); + ret.setSize(len); + ret.setType(LocalResourceType.ARCHIVE); + ret.setVisibility(vis); + ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".gz")) + .getModificationTime()); + return ret; + } + + @Test (timeout=10000) public void testDownloadBadPublic() throws IOException, URISyntaxException, InterruptedException { Configuration conf = new Configuration(); @@ -161,7 +284,7 @@ public class TestFSDownload { } } - @Test + @Test (timeout=10000) public void testDownload() throws IOException, URISyntaxException, InterruptedException { Configuration conf = new Configuration(); @@ -229,6 +352,175 @@ public class TestFSDownload { } } + @SuppressWarnings("deprecation") + @Test (timeout=10000) + public void testDownloadArchive() throws IOException, URISyntaxException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + System.out.println("SEED: " + sharedSeed); + + Map> pending = new HashMap>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + LocalDirAllocator dirs = new LocalDirAllocator( + TestFSDownload.class.getName()); + + int size = rand.nextInt(512) + 512; + LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + + Path p = new Path(basedir, "" + 1); + LocalResource rsrc = createTarFile(files, p, size, rand, vis); + Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + FSDownload fsd = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPath, rsrc, + new 
Random(sharedSeed)); + pending.put(rsrc, exec.submit(fsd)); + + try { + FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( + basedir); + for (FileStatus filestatus : filesstatus) { + if (filestatus.isDir()) { + FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( + filestatus.getPath()); + for (FileStatus childfile : childFiles) { + if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) { + Assert.fail("Tmp File should not have been there " + + childfile.getPath()); + } + } + } + } + }catch (Exception e) { + throw new IOException("Failed exec", e); + } + finally { + exec.shutdown(); + } + } + + @SuppressWarnings("deprecation") + @Test (timeout=10000) + public void testDownloadPatternJar() throws IOException, URISyntaxException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + System.out.println("SEED: " + sharedSeed); + + Map> pending = new HashMap>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + LocalDirAllocator dirs = new LocalDirAllocator( + TestFSDownload.class.getName()); + + int size = rand.nextInt(512) + 512; + LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + + Path p = new Path(basedir, "" + 1); + LocalResource rsrcjar = createJarFile(files, p, size, rand, vis); + rsrcjar.setType(LocalResourceType.PATTERN); + Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + FSDownload fsdjar = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar, + new Random(sharedSeed)); + pending.put(rsrcjar, exec.submit(fsdjar)); + + try { + FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( + basedir); + for (FileStatus filestatus : filesstatus) { + if (filestatus.isDir()) { + FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( + filestatus.getPath()); + for (FileStatus childfile : childFiles) { + if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) { + Assert.fail("Tmp File should not have been there " + + childfile.getPath()); + } + } + } + } + }catch (Exception e) { + throw new IOException("Failed exec", e); + } + finally { + exec.shutdown(); + } + } + + @SuppressWarnings("deprecation") + @Test (timeout=10000) + public void testDownloadArchiveZip() throws IOException, URISyntaxException, + InterruptedException { + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext files = FileContext.getLocalFSFileContext(conf); + final Path basedir = files.makeQualified(new Path("target", + TestFSDownload.class.getSimpleName())); + files.mkdir(basedir, null, true); + conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); + + Random rand = new Random(); + long sharedSeed = rand.nextLong(); + rand.setSeed(sharedSeed); + System.out.println("SEED: " + sharedSeed); + + Map> pending = new HashMap>(); + ExecutorService exec = Executors.newSingleThreadExecutor(); + LocalDirAllocator dirs = new LocalDirAllocator( + TestFSDownload.class.getName()); + + int size = rand.nextInt(512) + 512; + 
LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE; + + Path p = new Path(basedir, "" + 1); + LocalResource rsrczip = createZipFile(files, p, size, rand, vis); + Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf); + FSDownload fsdzip = new FSDownload(files, + UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip, + new Random(sharedSeed)); + pending.put(rsrczip, exec.submit(fsdzip)); + + try { + FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus( + basedir); + for (FileStatus filestatus : filesstatus) { + if (filestatus.isDir()) { + FileStatus[] childFiles = files.getDefaultFileSystem().listStatus( + filestatus.getPath()); + for (FileStatus childfile : childFiles) { + if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) { + Assert.fail("Tmp File should not have been there " + + childfile.getPath()); + } + } + } + } + }catch (Exception e) { + throw new IOException("Failed exec", e); + } + finally { + exec.shutdown(); + } + } + private void verifyPermsRecursively(FileSystem fs, FileContext files, Path p, LocalResourceVisibility vis) throws IOException { @@ -261,7 +553,7 @@ public class TestFSDownload { } } - @Test + @Test (timeout=10000) public void testDirDownload() throws IOException, InterruptedException { Configuration conf = new Configuration(); FileContext files = FileContext.getLocalFSFileContext(conf); Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java Mon Apr 1 16:47:16 2013 @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.util; +import static org.junit.Assert.fail; + import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -36,6 +38,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; @@ -104,17 +107,21 @@ public class TestProcfsBasedProcessTree new Path(TEST_ROOT_DIR.getAbsolutePath()), true); } - @Test + @Test (timeout = 30000) public void testProcessTree() throws Exception { + if (!Shell.LINUX) { + System.out + .println("ProcfsBasedProcessTree is not available on this system. Not testing"); + return; + + } try { - if (!ProcfsBasedProcessTree.isAvailable()) { - System.out - .println("ProcfsBasedProcessTree is not available on this system. 
Not testing"); - return; - } + Assert.assertTrue(ProcfsBasedProcessTree.isAvailable()); } catch (Exception e) { LOG.info(StringUtils.stringifyException(e)); + Assert.assertTrue("ProcfsBaseProcessTree should be available on Linux", + false); return; } // create shell script @@ -183,11 +190,20 @@ public class TestProcfsBasedProcessTree // destroy the process and all its subprocesses destroyProcessTree(pid); - if (isSetsidAvailable()) { // whole processtree should be gone - Assert.assertFalse("Proceesses in process group live", - isAnyProcessInTreeAlive(p)); - } else {// process should be gone - Assert.assertFalse("ProcessTree must have been gone", isAlive(pid)); + boolean isAlive = true; + for (int tries = 100; tries > 0; tries--) { + if (isSetsidAvailable()) {// whole processtree + isAlive = isAnyProcessInTreeAlive(p); + } else {// process + isAlive = isAlive(pid); + } + if (!isAlive) { + break; + } + Thread.sleep(100); + } + if (isAlive) { + fail("ProcessTree shouldn't be alive"); } LOG.info("Process-tree dump follows: \n" + processTreeDump); @@ -328,7 +344,7 @@ public class TestProcfsBasedProcessTree * @throws IOException if there was a problem setting up the * fake procfs directories or files. */ - @Test + @Test (timeout = 30000) public void testCpuAndMemoryForProcessTree() throws IOException { // test processes @@ -402,7 +418,7 @@ public class TestProcfsBasedProcessTree * @throws IOException if there was a problem setting up the * fake procfs directories or files. */ - @Test + @Test (timeout = 30000) public void testMemForOlderProcesses() throws IOException { // initial list of processes String[] pids = { "100", "200", "300", "400" }; @@ -509,7 +525,7 @@ public class TestProcfsBasedProcessTree * @throws IOException if there was a problem setting up the * fake procfs directories or files. 
*/ - @Test + @Test (timeout = 30000) public void testDestroyProcessTree() throws IOException { // test process String pid = "100"; @@ -535,7 +551,7 @@ public class TestProcfsBasedProcessTree * * @throws IOException */ - @Test + @Test (timeout = 30000) public void testProcessTreeDump() throws IOException { Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java Mon Apr 1 16:47:16 2013 @@ -18,9 +18,13 @@ package org.apache.hadoop.yarn.util; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -30,9 +34,12 @@ import org.junit.Test; public class TestRackResolver { + private static Log LOG = LogFactory.getLog(TestRackResolver.class); + public static final class MyResolver implements DNSToSwitchMapping { int numHost1 = 0; + public static String resolvedHost1 = "host1"; @Override public List resolve(List hostList) { @@ -43,7 +50,10 @@ public class TestRackResolver { if (hostList.isEmpty()) { return returnList; } - if (hostList.get(0).equals("host1")) { + LOG.info("Received resolve request for " + + hostList.get(0)); + if (hostList.get(0).equals("host1") + || hostList.get(0).equals(resolvedHost1)) { numHost1++; returnList.add("/rack1"); } @@ -53,6 +63,10 @@ public class TestRackResolver { return returnList; } + @Override + public void reloadCachedMappings() { + // nothing to do here, since RawScriptBasedMapping has no cache. 
+ } } @Test @@ -62,6 +76,12 @@ public class TestRackResolver { CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, MyResolver.class, DNSToSwitchMapping.class); RackResolver.init(conf); + try { + InetAddress iaddr = InetAddress.getByName("host1"); + MyResolver.resolvedHost1 = iaddr.getHostAddress(); + } catch (UnknownHostException e) { + // Ignore if not found + } Node node = RackResolver.resolve("host1"); Assert.assertEquals("/rack1", node.getNetworkLocation()); node = RackResolver.resolve("host1"); Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java Mon Apr 1 16:47:16 2013 @@ -18,10 +18,28 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; public interface NodeHeartbeatResponse { - public abstract HeartbeatResponse getHeartbeatResponse(); + int getResponseId(); + NodeAction getNodeAction(); + + List getContainersToCleanup(); + + List getApplicationsToCleanup(); + + void setResponseId(int responseId); + void setNodeAction(NodeAction action); + + MasterKey getMasterKey(); + void setMasterKey(MasterKey secretKey); + + void addAllContainersToCleanup(List containers); - public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse); + void addAllApplicationsToCleanup(List applications); } Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java Mon Apr 1 16:47:16 2013 @@ 
-18,11 +18,16 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; -import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; public interface RegisterNodeManagerResponse { - public abstract RegistrationResponse getRegistrationResponse(); - - public abstract void setRegistrationResponse(RegistrationResponse registrationResponse); + MasterKey getMasterKey(); + + void setMasterKey(MasterKey secretKey); + + NodeAction getNodeAction(); + + void setNodeAction(NodeAction nodeAction); } Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java Mon Apr 1 16:47:16 2013 @@ -18,14 +18,25 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.ArrayList; +import java.util.List; +import java.util.Iterator; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -34,8 +45,9 @@ public class NodeHeartbeatResponsePBImpl NodeHeartbeatResponseProto.Builder builder = null; boolean viaProto = false; - private HeartbeatResponse heartbeatResponse = null; - + private List containersToCleanup = null; + private List applicationsToCleanup = null; + private MasterKey masterKey = null; public NodeHeartbeatResponsePBImpl() { builder = 
NodeHeartbeatResponseProto.newBuilder(); @@ -54,8 +66,14 @@ public class NodeHeartbeatResponsePBImpl } private void mergeLocalToBuilder() { - if (this.heartbeatResponse != null) { - builder.setHeartbeatResponse(convertToProtoFormat(this.heartbeatResponse)); + if (this.containersToCleanup != null) { + addContainersToCleanupToProto(); + } + if (this.applicationsToCleanup != null) { + addApplicationsToCleanupToProto(); + } + if (this.masterKey != null) { + builder.setMasterKey(convertToProtoFormat(this.masterKey)); } } @@ -76,34 +94,213 @@ public class NodeHeartbeatResponsePBImpl @Override - public HeartbeatResponse getHeartbeatResponse() { + public int getResponseId() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + return (p.getResponseId()); + } + + @Override + public void setResponseId(int responseId) { + maybeInitBuilder(); + builder.setResponseId((responseId)); + } + + @Override + public MasterKey getMasterKey() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - if (this.heartbeatResponse != null) { - return this.heartbeatResponse; + if (this.masterKey != null) { + return this.masterKey; } - if (!p.hasHeartbeatResponse()) { + if (!p.hasMasterKey()) { return null; } - this.heartbeatResponse = convertFromProtoFormat(p.getHeartbeatResponse()); - return this.heartbeatResponse; + this.masterKey = convertFromProtoFormat(p.getMasterKey()); + return this.masterKey; + } + + @Override + public void setMasterKey(MasterKey masterKey) { + maybeInitBuilder(); + if (masterKey == null) + builder.clearMasterKey(); + this.masterKey = masterKey; + } + + @Override + public NodeAction getNodeAction() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeAction()) { + return null; + } + return (convertFromProtoFormat(p.getNodeAction())); + } + + @Override + public void setNodeAction(NodeAction nodeAction) { + maybeInitBuilder(); + if (nodeAction == null) { + builder.clearNodeAction(); + return; + } + builder.setNodeAction(convertToProtoFormat(nodeAction)); + } + + @Override + public List getContainersToCleanup() { + initContainersToCleanup(); + return this.containersToCleanup; + } + + private void initContainersToCleanup() { + if (this.containersToCleanup != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? 
proto : builder; + List list = p.getContainersToCleanupList(); + this.containersToCleanup = new ArrayList(); + + for (ContainerIdProto c : list) { + this.containersToCleanup.add(convertFromProtoFormat(c)); + } } @Override - public void setHeartbeatResponse(HeartbeatResponse heartbeatResponse) { + public void addAllContainersToCleanup( + final List containersToCleanup) { + if (containersToCleanup == null) + return; + initContainersToCleanup(); + this.containersToCleanup.addAll(containersToCleanup); + } + + private void addContainersToCleanupToProto() { maybeInitBuilder(); - if (heartbeatResponse == null) - builder.clearHeartbeatResponse(); - this.heartbeatResponse = heartbeatResponse; + builder.clearContainersToCleanup(); + if (containersToCleanup == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = containersToCleanup.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllContainersToCleanup(iterable); + } + + @Override + public List getApplicationsToCleanup() { + initApplicationsToCleanup(); + return this.applicationsToCleanup; } - private HeartbeatResponsePBImpl convertFromProtoFormat(HeartbeatResponseProto p) { - return new HeartbeatResponsePBImpl(p); + private void initApplicationsToCleanup() { + if (this.applicationsToCleanup != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getApplicationsToCleanupList(); + this.applicationsToCleanup = new ArrayList(); + + for (ApplicationIdProto c : list) { + this.applicationsToCleanup.add(convertFromProtoFormat(c)); + } } - private HeartbeatResponseProto convertToProtoFormat(HeartbeatResponse t) { - return ((HeartbeatResponsePBImpl)t).getProto(); + @Override + public void addAllApplicationsToCleanup( + final List applicationsToCleanup) { + if (applicationsToCleanup == null) + return; + initApplicationsToCleanup(); + this.applicationsToCleanup.addAll(applicationsToCleanup); } + private void addApplicationsToCleanupToProto() { + maybeInitBuilder(); + builder.clearApplicationsToCleanup(); + if (applicationsToCleanup == null) + return; + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = applicationsToCleanup.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ApplicationIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllApplicationsToCleanup(iterable); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + private NodeAction convertFromProtoFormat(NodeActionProto p) { + return NodeAction.valueOf(p.name()); + } + + private NodeActionProto convertToProtoFormat(NodeAction t) { + 
return NodeActionProto.valueOf(t.name()); + } + + private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { + return new MasterKeyPBImpl(p); + } + + private MasterKeyProto convertToProtoFormat(MasterKey t) { + return ((MasterKeyPBImpl) t).getProto(); + } +} -} Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java Mon Apr 1 16:47:16 2013 @@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.ap import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; -import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; -import org.apache.hadoop.yarn.server.api.records.impl.pb.RegistrationResponsePBImpl; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -34,7 +36,7 @@ public class RegisterNodeManagerResponse RegisterNodeManagerResponseProto.Builder builder = null; boolean viaProto = false; - private RegistrationResponse registartionResponse = null; + private MasterKey masterKey = null; private boolean rebuild = false; @@ -56,9 +58,8 @@ public class RegisterNodeManagerResponse } private void mergeLocalToBuilder() { - if (this.registartionResponse != null) { - builder.setRegistrationResponse(convertToProtoFormat(this.registartionResponse)); - this.registartionResponse = null; + if (this.masterKey != null) { + builder.setMasterKey(convertToProtoFormat(this.masterKey)); } } @@ -77,39 +78,60 @@ public class RegisterNodeManagerResponse } viaProto = false; } - - + @Override - public RegistrationResponse getRegistrationResponse() { + public MasterKey getMasterKey() { RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? 
proto : builder; - if (this.registartionResponse != null) { - return this.registartionResponse; + if (this.masterKey != null) { + return this.masterKey; } - if (!p.hasRegistrationResponse()) { + if (!p.hasMasterKey()) { return null; } - this.registartionResponse = convertFromProtoFormat(p.getRegistrationResponse()); - rebuild = true; - return this.registartionResponse; + this.masterKey = convertFromProtoFormat(p.getMasterKey()); + return this.masterKey; } @Override - public void setRegistrationResponse(RegistrationResponse registrationResponse) { + public void setMasterKey(MasterKey masterKey) { maybeInitBuilder(); - if (registrationResponse == null) - builder.clearRegistrationResponse(); - this.registartionResponse = registrationResponse; - rebuild = true; + if (masterKey == null) + builder.clearMasterKey(); + this.masterKey = masterKey; } - private RegistrationResponsePBImpl convertFromProtoFormat(RegistrationResponseProto p) { - return new RegistrationResponsePBImpl(p); + @Override + public NodeAction getNodeAction() { + RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder; + if(!p.hasNodeAction()) { + return null; + } + return convertFromProtoFormat(p.getNodeAction()); } - private RegistrationResponseProto convertToProtoFormat(RegistrationResponse t) { - return ((RegistrationResponsePBImpl)t).getProto(); + @Override + public void setNodeAction(NodeAction nodeAction) { + maybeInitBuilder(); + if (nodeAction == null) { + builder.clearNodeAction(); + return; + } + builder.setNodeAction(convertToProtoFormat(nodeAction)); } + private NodeAction convertFromProtoFormat(NodeActionProto p) { + return NodeAction.valueOf(p.name()); + } + private NodeActionProto convertToProtoFormat(NodeAction t) { + return NodeActionProto.valueOf(t.name()); + } + private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) { + return new MasterKeyPBImpl(p); + } + + private MasterKeyProto convertToProtoFormat(MasterKey t) { + return ((MasterKeyPBImpl)t).getProto(); + } } Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Mon Apr 1 16:47:16 2013 @@ -42,16 +42,3 @@ message MasterKeyProto { optional bytes bytes = 2; } -message RegistrationResponseProto { - optional MasterKeyProto master_key = 1; - optional NodeActionProto nodeAction = 2; -} - -message HeartbeatResponseProto { - optional int32 response_id = 1; - optional MasterKeyProto master_key = 2; - optional NodeActionProto nodeAction = 3; - repeated ContainerIdProto containers_to_cleanup = 4; - repeated ApplicationIdProto applications_to_cleanup = 5; -} - Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Mon Apr 1 16:47:16 2013 @@ -29,8 +29,10 @@ message RegisterNodeManagerRequestProto optional int32 http_port = 3; optional ResourceProto resource = 4; } + message RegisterNodeManagerResponseProto { - optional RegistrationResponseProto registration_response = 1; + optional MasterKeyProto master_key = 1; + optional NodeActionProto nodeAction = 2; } message NodeHeartbeatRequestProto { @@ -38,6 +40,11 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_master_key = 2; } + message NodeHeartbeatResponseProto { - optional HeartbeatResponseProto heartbeat_response = 1; + optional int32 response_id = 1; + optional MasterKeyProto master_key = 2; + optional NodeActionProto nodeAction = 3; + repeated ContainerIdProto containers_to_cleanup = 4; + repeated ApplicationIdProto applications_to_cleanup = 5; } Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRecordFactory.java Mon Apr 1 16:47:16 2013 @@ -25,8 +25,6 @@ import org.apache.hadoop.yarn.factories. 
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; -import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl; import org.junit.Test; public class TestRecordFactory { @@ -34,15 +32,6 @@ public class TestRecordFactory { @Test public void testPbRecordFactory() { RecordFactory pbRecordFactory = RecordFactoryPBImpl.get(); - - try { - HeartbeatResponse response = pbRecordFactory.newRecordInstance(HeartbeatResponse.class); - Assert.assertEquals(HeartbeatResponsePBImpl.class, response.getClass()); - } catch (YarnException e) { - e.printStackTrace(); - Assert.fail("Failed to crete record"); - } - try { NodeHeartbeatRequest request = pbRecordFactory.newRecordInstance(NodeHeartbeatRequest.class); Assert.assertEquals(NodeHeartbeatRequestPBImpl.class, request.getClass()); Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff ============================================================================== --- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original) +++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Mon Apr 1 16:47:16 2013 @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -35,8 +37,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; +import org.apache.hadoop.util.Shell; public abstract class ContainerExecutor implements Configurable { @@ -182,6 +186,54 @@ public abstract class ContainerExecutor readLock.unlock(); } } + + /** + * Return a command to execute the given command in OS shell. + * On Windows, the passed in groupId can be used to launch + * and associate the given groupId in a process group. On + * non-Windows, groupId is ignored. 
+   */
+  protected static String[] getRunCommand(String command, String groupId,
+      Configuration conf) {
+    boolean containerSchedPriorityIsSet = false;
+    int containerSchedPriorityAdjustment =
+        YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
+
+    if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) !=
+        null) {
+      containerSchedPriorityIsSet = true;
+      containerSchedPriorityAdjustment = conf
+          .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
+              YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
+    }
+
+    if (Shell.WINDOWS) {
+      return new String[] { Shell.WINUTILS, "task", "create", groupId,
+          "cmd /c " + command };
+    } else {
+      List<String> retCommand = new ArrayList<String>();
+      if (containerSchedPriorityIsSet) {
+        retCommand.addAll(Arrays.asList("nice", "-n",
+            Integer.toString(containerSchedPriorityAdjustment)));
+      }
+      retCommand.addAll(Arrays.asList("bash", "-c", command));
+      return retCommand.toArray(new String[retCommand.size()]);
+    }
+
+  }
+
+  /** Return a command for determining if a process with the specified pid is alive. */
+  protected static String[] getCheckProcessIsAliveCommand(String pid) {
+    return Shell.WINDOWS ?
+        new String[] { Shell.WINUTILS, "task", "isAlive", pid } :
+        new String[] { "kill", "-0", pid };
+  }
+
+  /** Return a command to send a signal to a given pid. */
+  protected static String[] getSignalKillCommand(int code, String pid) {
+    return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } :
+        new String[] { "kill", "-" + code, pid };
+  }
 
   /**
    * Is the container still active?
@@ -253,6 +305,9 @@ public abstract class ContainerExecutor
 
   public static final boolean isSetsidAvailable = isSetsidSupported();
   private static boolean isSetsidSupported() {
+    if (Shell.WINDOWS) {
+      return true;
+    }
     ShellCommandExecutor shexec = null;
     boolean setsidSupported = true;
    try {
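Taken together, getRunCommand, getCheckProcessIsAliveCommand, and getSignalKillCommand concentrate the Linux/Windows split in one place. Below is a self-contained sketch of the same run-command pattern; IS_WINDOWS, WINUTILS, and the niceness parameter are illustrative stand-ins for Shell.WINDOWS, Shell.WINUTILS, and the NM_CONTAINER_EXECUTOR_SCHED_PRIORITY setting, not the committed code:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RunCommandSketch {
      static final boolean IS_WINDOWS =
          System.getProperty("os.name").startsWith("Windows");
      static final String WINUTILS = "winutils.exe"; // assumed location

      static String[] getRunCommand(String command, String groupId,
          Integer nicenessOrNull) {
        if (IS_WINDOWS) {
          // winutils creates a named job object so the whole process tree
          // can later be probed and killed by that name.
          return new String[] { WINUTILS, "task", "create", groupId,
              "cmd /c " + command };
        }
        List<String> cmd = new ArrayList<String>();
        if (nicenessOrNull != null) {
          // Optional scheduling-priority adjustment, mirroring the
          // NM_CONTAINER_EXECUTOR_SCHED_PRIORITY handling above.
          cmd.addAll(Arrays.asList("nice", "-n", nicenessOrNull.toString()));
        }
        cmd.addAll(Arrays.asList("bash", "-c", command));
        return cmd.toArray(new String[cmd.size()]);
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(
            getRunCommand("echo hello", "container_01", 5)));
        // On Linux this prints: [nice, -n, 5, bash, -c, echo hello]
      }
    }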
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Mon Apr 1 16:47:16 2013
@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,10 +55,9 @@ public class DefaultContainerExecutor ex
   private static final Log LOG = LogFactory
       .getLog(DefaultContainerExecutor.class);
 
-  private final FileContext lfs;
+  private static final int WIN_MAX_PATH = 260;
 
-  private static final String WRAPPER_LAUNCH_SCRIPT =
-      "default_container_executor.sh";
+  private final FileContext lfs;
 
   public DefaultContainerExecutor() {
     try {
@@ -145,15 +146,24 @@ public class DefaultContainerExecutor ex
     lfs.util().copy(nmPrivateTokensPath, tokenDst);
 
     // Create new local launch wrapper script
-    Path wrapperScriptDst = new Path(containerWorkDir, WRAPPER_LAUNCH_SCRIPT);
-    DataOutputStream wrapperScriptOutStream =
-        lfs.create(wrapperScriptDst,
-            EnumSet.of(CREATE, OVERWRITE));
+    LocalWrapperScriptBuilder sb = Shell.WINDOWS ?
+        new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :
+        new UnixLocalWrapperScriptBuilder(containerWorkDir);
+
+    // Fail fast if attempting to launch the wrapper script would fail due to
+    // the Windows path length limitation.
+    if (Shell.WINDOWS &&
+        sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {
+      throw new IOException(String.format(
+          "Cannot launch container using script at path %s, because it exceeds " +
+          "the maximum supported path length of %d characters. Consider " +
+          "configuring shorter directories in %s.", sb.getWrapperScriptPath(),
+          WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));
+    }
 
     Path pidFile = getPidFilePath(containerId);
     if (pidFile != null) {
-      writeLocalWrapperScript(wrapperScriptOutStream, launchDst.toUri()
-          .getPath().toString(), pidFile.toString());
+      sb.writeLocalWrapperScript(launchDst, pidFile);
     } else {
       LOG.info("Container " + containerIdStr
           + " was marked as inactive. Returning terminated error");
@@ -166,12 +176,13 @@ public class DefaultContainerExecutor ex
     try {
       lfs.setPermission(launchDst,
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
-      lfs.setPermission(wrapperScriptDst,
+      lfs.setPermission(sb.getWrapperScriptPath(),
           ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
 
       // Setup command to run
-      String[] command = {"bash",
-          wrapperScriptDst.toUri().getPath().toString()};
+      String[] command = getRunCommand(sb.getWrapperScriptPath().toString(),
+          containerIdStr, this.getConf());
+
       LOG.info("launchContainer: " + Arrays.toString(command));
       shExec = new ShellCommandExecutor(
           command,
@@ -202,28 +213,85 @@ public class DefaultContainerExecutor ex
     return 0;
   }
 
-  private void writeLocalWrapperScript(DataOutputStream out,
-      String launchScriptDst, String pidFilePath) throws IOException {
-    // We need to do a move as writing to a file is not atomic
-    // Process reading a file being written to may get garbled data
-    // hence write pid to tmp file first followed by a mv
-    StringBuilder sb = new StringBuilder("#!/bin/bash\n\n");
-    sb.append("echo $$ > " + pidFilePath + ".tmp\n");
-    sb.append("/bin/mv -f " + pidFilePath + ".tmp " + pidFilePath + "\n");
-    sb.append(ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec");
"exec setsid" : "exec"); - sb.append(" /bin/bash "); - sb.append("\""); - sb.append(launchScriptDst); - sb.append("\"\n"); - PrintStream pout = null; - try { - pout = new PrintStream(out); - pout.append(sb); - } finally { - if (out != null) { - out.close(); + private abstract class LocalWrapperScriptBuilder { + + private final Path wrapperScriptPath; + + public Path getWrapperScriptPath() { + return wrapperScriptPath; + } + + public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException { + DataOutputStream out = null; + PrintStream pout = null; + + try { + out = lfs.create(wrapperScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out); + writeLocalWrapperScript(launchDst, pidFile, pout); + } finally { + IOUtils.cleanup(LOG, pout, out); } } + + protected abstract void writeLocalWrapperScript(Path launchDst, Path pidFile, + PrintStream pout); + + protected LocalWrapperScriptBuilder(Path wrapperScriptPath) { + this.wrapperScriptPath = wrapperScriptPath; + } + } + + private final class UnixLocalWrapperScriptBuilder + extends LocalWrapperScriptBuilder { + + public UnixLocalWrapperScriptBuilder(Path containerWorkDir) { + super(new Path(containerWorkDir, "default_container_executor.sh")); + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile, + PrintStream pout) { + + // We need to do a move as writing to a file is not atomic + // Process reading a file being written to may get garbled data + // hence write pid to tmp file first followed by a mv + pout.println("#!/bin/bash"); + pout.println(); + pout.println("echo $$ > " + pidFile.toString() + ".tmp"); + pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); + String exec = ContainerExecutor.isSetsidAvailable? "exec setsid" : "exec"; + pout.println(exec + " /bin/bash -c \"" + + launchDst.toUri().getPath().toString() + "\""); + } + } + + private final class WindowsLocalWrapperScriptBuilder + extends LocalWrapperScriptBuilder { + + private final String containerIdStr; + + public WindowsLocalWrapperScriptBuilder(String containerIdStr, + Path containerWorkDir) { + + super(new Path(containerWorkDir, "default_container_executor.cmd")); + this.containerIdStr = containerIdStr; + } + + @Override + public void writeLocalWrapperScript(Path launchDst, Path pidFile, + PrintStream pout) { + + // On Windows, the pid is the container ID, so that it can also serve as + // the name of the job object created by winutils for task management. + // Write to temp file followed by atomic move. + String normalizedPidFile = new File(pidFile.toString()).getPath(); + pout.println("@echo " + containerIdStr + " > " + normalizedPidFile + + ".tmp"); + pout.println("@move /Y " + normalizedPidFile + ".tmp " + + normalizedPidFile); + pout.println("@call " + launchDst.toString()); + } } @Override @@ -234,17 +302,13 @@ public class DefaultContainerExecutor ex : pid; LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid + " as user " + user); - try { - sendSignal(sigpid, Signal.NULL); - } catch (ExitCodeException e) { + if (!containerIsAlive(sigpid)) { return false; } try { - sendSignal(sigpid, signal); + killContainer(sigpid, signal); } catch (IOException e) { - try { - sendSignal(sigpid, Signal.NULL); - } catch (IOException ignore) { + if (!containerIsAlive(sigpid)) { return false; } throw e; @@ -253,17 +317,33 @@ public class DefaultContainerExecutor ex } /** + * Returns true if the process with the specified pid is alive. 
@@ -234,17 +302,13 @@ public class DefaultContainerExecutor ex
         : pid;
     LOG.debug("Sending signal " + signal.getValue() + " to pid " + sigpid
         + " as user " + user);
-    try {
-      sendSignal(sigpid, Signal.NULL);
-    } catch (ExitCodeException e) {
+    if (!containerIsAlive(sigpid)) {
       return false;
     }
     try {
-      sendSignal(sigpid, signal);
+      killContainer(sigpid, signal);
     } catch (IOException e) {
-      try {
-        sendSignal(sigpid, Signal.NULL);
-      } catch (IOException ignore) {
+      if (!containerIsAlive(sigpid)) {
        return false;
      }
      throw e;
@@ -253,17 +317,33 @@ public class DefaultContainerExecutor ex
   }
 
   /**
+   * Returns true if the process with the specified pid is alive.
+   *
+   * @param pid String pid
+   * @return boolean true if the process is alive
+   */
+  private boolean containerIsAlive(String pid) throws IOException {
+    try {
+      new ShellCommandExecutor(getCheckProcessIsAliveCommand(pid)).execute();
+      // successful execution means the process is alive
+      return true;
+    }
+    catch (ExitCodeException e) {
+      // failure (non-zero exit code) means the process is not alive
+      return false;
+    }
+  }
+
+  /**
    * Send a specified signal to the specified pid (for logging).
    *
    * @param pid the pid of the process [group] to signal.
   * @param signal signal to send
   */
-  protected void sendSignal(String pid, Signal signal) throws IOException {
-    ShellCommandExecutor shexec = null;
-    String[] arg = { "kill", "-" + signal.getValue(), pid };
-    shexec = new ShellCommandExecutor(arg);
-    shexec.execute();
+  private void killContainer(String pid, Signal signal) throws IOException {
+    new ShellCommandExecutor(getSignalKillCommand(signal.getValue(), pid))
+        .execute();
   }
 
   @Override
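containerIsAlive turns a shell exit code into a boolean. The same probe, sketched with plain ProcessBuilder rather than Hadoop's ShellCommandExecutor (Unix-only; the pid used in main is illustrative):

    import java.io.IOException;

    public class LivenessProbe {
      static boolean isAlive(String pid) throws IOException, InterruptedException {
        // "kill -0" sends no signal; it only checks that the pid exists and
        // that we may signal it, reporting the result via the exit code.
        Process p = new ProcessBuilder("kill", "-0", pid).start();
        return p.waitFor() == 0;
      }

      public static void main(String[] args) throws Exception {
        System.out.println(isAlive("1")); // pid 1 (init) is normally alive
      }
    }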
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Mon Apr 1 16:47:16 2013
@@ -50,6 +50,8 @@ public class LinuxContainerExecutor exte
 
   private String containerExecutorExe;
   private LCEResourcesHandler resourcesHandler;
+  private boolean containerSchedPriorityIsSet = false;
+  private int containerSchedPriorityAdjustment = 0;
 
   @Override
@@ -61,6 +63,13 @@ public class LinuxContainerExecutor exte
         conf.getClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
             DefaultLCEResourcesHandler.class, LCEResourcesHandler.class), conf);
     resourcesHandler.setConf(conf);
+
+    if (conf.get(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY) != null) {
+      containerSchedPriorityIsSet = true;
+      containerSchedPriorityAdjustment = conf
+          .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY,
+              YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
+    }
   }
 
   /**
@@ -114,6 +123,13 @@ public class LinuxContainerExecutor exte
         : conf.get(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, defaultPath);
   }
 
+  protected void addSchedPriorityCommand(List<String> command) {
+    if (containerSchedPriorityIsSet) {
+      command.addAll(Arrays.asList("nice", "-n",
+          Integer.toString(containerSchedPriorityAdjustment)));
+    }
+  }
+
   @Override
   public void init() throws IOException {
     // Send command to executor which will just start up,
@@ -145,14 +161,15 @@ public class LinuxContainerExecutor exte
       List<String> localDirs, List<String> logDirs)
       throws IOException, InterruptedException {
 
-    List<String> command = new ArrayList<String>(
-        Arrays.asList(containerExecutorExe,
-            user,
-            Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
-            appId,
-            nmPrivateContainerTokensPath.toUri().getPath().toString(),
-            StringUtils.join(",", localDirs),
-            StringUtils.join(",", logDirs)));
+    List<String> command = new ArrayList<String>();
+    addSchedPriorityCommand(command);
+    command.addAll(Arrays.asList(containerExecutorExe,
+        user,
+        Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
+        appId,
+        nmPrivateContainerTokensPath.toUri().getPath().toString(),
+        StringUtils.join(",", localDirs),
+        StringUtils.join(",", logDirs)));
 
     File jvm =                                  // use same jvm as parent
         new File(new File(System.getProperty("java.home"), "bin"), "java");
@@ -212,7 +229,9 @@ public class LinuxContainerExecutor exte
     try {
       Path pidFilePath = getPidFilePath(containerId);
       if (pidFilePath != null) {
-        List<String> command = new ArrayList<String>(Arrays.asList(
+        List<String> command = new ArrayList<String>();
+        addSchedPriorityCommand(command);
+        command.addAll(Arrays.asList(
            containerExecutorExe, user, Integer
                .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
            containerIdStr, containerWorkDir.toString(),

Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Mon Apr 1 16:47:16 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Timer;
@@ -305,7 +304,7 @@ public class LocalDirsHandlerService ext
     ArrayList<String> validPaths = new ArrayList<String>();
     for (int i = 0; i < paths.length; ++i) {
       try {
-        URI uriPath = new URI(paths[i]);
+        URI uriPath = (new Path(paths[i])).toUri();
         if (uriPath.getScheme() == null
             || uriPath.getScheme().equals(FILE_SCHEME)) {
           validPaths.add(uriPath.getPath());
@@ -316,7 +315,7 @@ public class LocalDirsHandlerService ext
               + " is not a valid path. Path should be with " + FILE_SCHEME
               + " scheme or without scheme");
         }
-      } catch (URISyntaxException e) {
+      } catch (IllegalArgumentException e) {
         LOG.warn(e.getMessage());
         throw new YarnException(paths[i]
            + " is not a valid path. Path should be with " + FILE_SCHEME
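The switch from new URI(paths[i]) to new Path(paths[i]).toUri() is what lets Windows-style local dirs pass validation: a bare path such as C:\hadoop\nm-local-dir is not a legal URI, while org.apache.hadoop.fs.Path normalizes it first and reports genuinely bad input with IllegalArgumentException, matching the new catch clause. A small demonstration of the old failure mode (the Path behavior described in the final comment is an inference from this diff, not shown in it):

    import java.net.URI;
    import java.net.URISyntaxException;

    public class UriVsPathDemo {
      public static void main(String[] args) {
        try {
          // The backslashes make this an invalid URI, so parsing fails.
          new URI("C:\\hadoop\\nm-local-dir");
          System.out.println("parsed");
        } catch (URISyntaxException e) {
          // This is the branch the old code hit on Windows.
          System.out.println("rejected: " + e.getReason());
        }
        // org.apache.hadoop.fs.Path normalizes the separators first and
        // signals a truly malformed path with IllegalArgumentException,
        // which is why the catch clause changed in this commit.
      }
    }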
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Apr 1 16:47:16 2013
@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.Co
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.Records;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NodeManager extends CompositeService implements
     EventHandler<NodeManagerEvent> {
 
@@ -113,6 +115,10 @@ public class NodeManager extends Composi
     return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
   }
 
+  protected DeletionService createDeletionService(ContainerExecutor exec) {
+    return new DeletionService(exec);
+  }
+
   protected void doSecureLogin() throws IOException {
     SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
         YarnConfiguration.NM_PRINCIPAL);
@@ -143,7 +149,7 @@ public class NodeManager extends Composi
     } catch (IOException e) {
       throw new YarnException("Failed to initialize container executor", e);
     }
-    DeletionService del = new DeletionService(exec);
+    DeletionService del = createDeletionService(exec);
     addService(del);
 
     // NodeManager level dispatcher
@@ -350,6 +356,11 @@ public class NodeManager extends Composi
   ContainerManagerImpl getContainerManager() {
     return containerManager;
   }
+
+  @VisibleForTesting
+  Context getNMContext() {
+    return this.context;
+  }
 
   public static void main(String[] args) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
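createDeletionService is a small test seam: a subclass can substitute its own DeletionService without re-wiring init(). A hypothetical test-side use of the new factory method (TestableNodeManager is illustrative and not part of this commit):

    import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
    import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
    import org.apache.hadoop.yarn.server.nodemanager.NodeManager;

    // Hypothetical subclass that a test could use to swap in a stubbed
    // DeletionService, e.g. one that records deletion requests instead of
    // touching the filesystem.
    public class TestableNodeManager extends NodeManager {
      @Override
      protected DeletionService createDeletionService(ContainerExecutor exec) {
        // In a real test this would return a recording stub.
        return new DeletionService(exec);
      }
    }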
Modified: hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Apr 1 16:47:16 2013
@@ -50,12 +50,12 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -151,7 +151,6 @@ public class NodeStatusUpdaterImpl exten
         YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS,
         YarnConfiguration.DEFAULT_NM_WEBAPP_PORT);
     try {
-      // this.hostName = InetAddress.getLocalHost().getCanonicalHostName();
       this.httpPort = httpBindAddress.getPort();
       // Registration has to be in start so that ContainerManager can get the
       // perNM tokens needed to authenticate ContainerTokens.
@@ -189,23 +188,91 @@ public class NodeStatusUpdaterImpl exten
   }
 
   private void registerWithRM() throws YarnRemoteException {
-    this.resourceTracker = getRMClient();
-    LOG.info("Connecting to ResourceManager at " + this.rmAddress);
-
-    RegisterNodeManagerRequest request = recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    Configuration conf = getConfig();
+    long rmConnectWaitMS =
+        conf.getInt(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+            YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
+        * 1000;
+    long rmConnectionRetryIntervalMS =
+        conf.getLong(
+            YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+            YarnConfiguration
+                .DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS)
+        * 1000;
+
+    if (rmConnectionRetryIntervalMS < 0) {
+      throw new YarnException("Invalid Configuration. "
+          + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+          + " should not be negative.");
+    }
+
+    boolean waitForEver = (rmConnectWaitMS == -1000);
+
+    if (!waitForEver) {
+      if (rmConnectWaitMS < 0) {
+        throw new YarnException("Invalid Configuration. "
+            + YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+            + " can be -1, but cannot be any other negative number");
+      }
+
+      // try to connect once
+      if (rmConnectWaitMS < rmConnectionRetryIntervalMS) {
+        LOG.warn(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS
+            + " is smaller than "
+            + YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS
+            + ". Only trying to connect once.");
+        rmConnectWaitMS = 0;
+      }
+    }
+
+    int rmRetryCount = 0;
+    long waitStartTime = System.currentTimeMillis();
+
+    RegisterNodeManagerRequest request =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
     request.setHttpPort(this.httpPort);
     request.setResource(this.totalResource);
     request.setNodeId(this.nodeId);
-    RegistrationResponse regResponse =
-        this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
+    RegisterNodeManagerResponse regNMResponse;
+
+    while (true) {
+      try {
+        rmRetryCount++;
+        LOG.info("Connecting to ResourceManager at " + this.rmAddress
+            + ". Current no. of attempts is " + rmRetryCount);
of attempts is " + rmRetryCount); + this.resourceTracker = getRMClient(); + regNMResponse = + this.resourceTracker.registerNodeManager(request); + break; + } catch(Throwable e) { + LOG.warn("Trying to connect to ResourceManager, " + + "current no. of failed attempts is "+rmRetryCount); + if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS + || waitForEver) { + try { + LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 + + " seconds before next connection retry to RM"); + Thread.sleep(rmConnectionRetryIntervalMS); + } catch(InterruptedException ex) { + //done nothing + } + } else { + String errorMessage = "Failed to Connect to RM, " + + "no. of failed attempts is "+rmRetryCount; + LOG.error(errorMessage,e); + throw new YarnException(errorMessage,e); + } + } + } // if the Resourcemanager instructs NM to shutdown. - if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) { + if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { throw new YarnException( "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed"); } if (UserGroupInformation.isSecurityEnabled()) { - MasterKey masterKey = regResponse.getMasterKey(); + MasterKey masterKey = regNMResponse.getMasterKey(); // do this now so that its set before we start heartbeating to RM LOG.info("Security enabled - updating secret keys now"); // It is expected that status updater is started by this point and @@ -340,8 +407,8 @@ public class NodeStatusUpdaterImpl exten request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey()); } - HeartbeatResponse response = - resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); + NodeHeartbeatResponse response = + resourceTracker.nodeHeartbeat(request); // See if the master-key has rolled over if (isSecurityEnabled()) { @@ -371,14 +438,14 @@ public class NodeStatusUpdaterImpl exten lastHeartBeatID = response.getResponseId(); List containersToCleanup = response - .getContainersToCleanupList(); + .getContainersToCleanup(); if (containersToCleanup.size() != 0) { dispatcher.getEventHandler().handle( new CMgrCompletedContainersEvent(containersToCleanup, CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); } List appsToCleanup = - response.getApplicationsToCleanupList(); + response.getApplicationsToCleanup(); //Only start tracking for keepAlive on FINISH_APP trackAppsForKeepAlive(appsToCleanup); if (appsToCleanup.size() != 0) {