Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 370A527FB for ; Fri, 29 Apr 2011 08:36:31 +0000 (UTC) Received: (qmail 2197 invoked by uid 500); 29 Apr 2011 08:36:31 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 2176 invoked by uid 500); 29 Apr 2011 08:36:31 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 2168 invoked by uid 99); 29 Apr 2011 08:36:31 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2011 08:36:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Apr 2011 08:36:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9DFF92388A39; Fri, 29 Apr 2011 08:35:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1097727 [3/5] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ yarn/yarn-common/src/main/ja... 
Date: Fri, 29 Apr 2011 08:35:56 -0000 To: mapreduce-commits@hadoop.apache.org From: cdouglas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110429083557.9DFF92388A39@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,468 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.io.DataInputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; + +import java.net.InetSocketAddress; + +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityInfo; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YARNApplicationConstants; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; + +import 
org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; +import org.apache.hadoop.yarn.util.ConverterUtils; + +public class ContainerLocalizer { + + static final Log LOG = LogFactory.getLog(ContainerLocalizer.class); + + public static final String FILECACHE = "filecache"; + public static final String APPCACHE = "appcache"; + public static final String USERCACHE = "usercache"; + public static final String OUTPUTDIR = "output"; + public static final String TOKEN_FILE_FMT = "%s.tokens"; + public static final String WORKDIR = "work"; + private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs"; + private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs"; + + private final String user; + private final String appId; + private final Path logDir; + private final List localDirs; + private final String localizerId; + private final FileContext lfs; + private final Configuration conf; + private final LocalDirAllocator appDirs; + private final LocalDirAllocator userDirs; + private final RecordFactory recordFactory; + private final Map> pendingResources; + + public ContainerLocalizer(String user, String appId, String localizerId, + Path logDir, List localDirs) throws IOException { + this(FileContext.getLocalFSFileContext(), user, appId, localizerId, logDir, 
+ localDirs, new HashMap>(), + RecordFactoryProvider.getRecordFactory(null)); + } + + ContainerLocalizer(FileContext lfs, String user, String appId, + String localizerId, Path logDir, List localDirs, + Map> pendingResources, + RecordFactory recordFactory) throws IOException { + if (null == user) { + throw new IOException("Cannot initialize for null user"); + } + if (null == localizerId) { + throw new IOException("Cannot initialize for null containerId"); + } + this.lfs = lfs; + this.user = user; + this.appId = appId; + this.logDir = logDir; + this.localDirs = localDirs; + this.localizerId = localizerId; + this.recordFactory = recordFactory; + this.conf = new Configuration(); + this.appDirs = + new LocalDirAllocator(String.format(APPCACHE_CTXT_FMT, appId)); + this.userDirs = + new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, appId)); + this.pendingResources = pendingResources; + } + + LocalizationProtocol getProxy(final InetSocketAddress nmAddr) { + Configuration localizerConf = new Configuration(); + YarnRPC rpc = YarnRPC.create(localizerConf); + if (UserGroupInformation.isSecurityEnabled()) { + localizerConf.setClass( + CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME, + LocalizerSecurityInfo.class, SecurityInfo.class); + } + return (LocalizationProtocol) + rpc.getProxy(LocalizationProtocol.class, nmAddr, localizerConf); + } + + public int runLocalization(final InetSocketAddress nmAddr) + throws IOException, InterruptedException { + // load credentials + initDirs(conf, user, appId, lfs, logDir, localDirs); + final Credentials creds = new Credentials(); + DataInputStream credFile = null; + try { + // assume credentials in cwd + credFile = lfs.open( + new Path(String.format(TOKEN_FILE_FMT, localizerId))); + creds.readTokenStorageStream(credFile); + } finally { + if (credFile != null) { + credFile.close(); + } + } + // create localizer context + UserGroupInformation remoteUser = + UserGroupInformation.createRemoteUser(user); + 
LocalizerTokenSecretManager secretManager = + new LocalizerTokenSecretManager(); + LocalizerTokenIdentifier id = secretManager.createIdentifier(); + Token localizerToken = + new Token(id, secretManager); + remoteUser.addToken(localizerToken); + final LocalizationProtocol nodeManager = + remoteUser.doAs(new PrivilegedAction() { + @Override + public LocalizationProtocol run() { + return getProxy(nmAddr); + } + }); + + // create user context + UserGroupInformation ugi = + UserGroupInformation.createRemoteUser(user); + for (Token token : creds.getAllTokens()) { + ugi.addToken(token); + } + + return ugi.doAs(new PrivilegedExceptionAction() { + public Integer run() { + ExecutorService exec = null; + try { + exec = createDownloadThreadPool(); + localizeFiles(nodeManager, exec); + return 0; + } catch (Throwable e) { + e.printStackTrace(System.out); + return -1; + } finally { + if (exec != null) { + exec.shutdownNow(); + } + } + } + }); + } + + ExecutorService createDownloadThreadPool() { + return Executors.newSingleThreadExecutor(); + } + + Callable download(LocalDirAllocator lda, LocalResource rsrc) { + return new FSDownload(lfs, conf, lda, rsrc, new Random()); + } + + void sleep(int duration) throws InterruptedException { + TimeUnit.SECONDS.sleep(duration); + } + + void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec) { + while (true) { + try { + LocalizerStatus status = createStatus(); + LocalizerHeartbeatResponse response = nodemanager.heartbeat(status); + switch (response.getLocalizerAction()) { + case LIVE: + List newResources = response.getAllResources(); + for (LocalResource r : newResources) { + if (!pendingResources.containsKey(r)) { + final LocalDirAllocator lda; + switch (r.getVisibility()) { + default: + LOG.warn("Unknown visibility: " + r.getVisibility()); + case PUBLIC: + case PRIVATE: + lda = userDirs; + break; + case APPLICATION: + lda = appDirs; + break; + } + pendingResources.put(r, exec.submit(download(lda, r))); + } + } + break; + 
case DIE: + // killall running localizations + for (Future pending : pendingResources.values()) { + pending.cancel(true); + } + status = createStatus(); + // ignore response + try { + nodemanager.heartbeat(status); + } catch (YarnRemoteException e) { } + return; + } + sleep(1); + } catch (InterruptedException e) { + return; + } catch (YarnRemoteException e) { + // TODO cleanup + return; + } + } + } + + LocalizerStatus createStatus() throws InterruptedException { + final List currentResources = + new ArrayList(); + for (Iterator i = pendingResources.keySet().iterator(); + i.hasNext();) { + LocalResource rsrc = i.next(); + LocalResourceStatus stat = + recordFactory.newRecordInstance(LocalResourceStatus.class); + stat.setResource(rsrc); + Future fPath = pendingResources.get(rsrc); + if (fPath.isDone()) { + try { + Path localPath = fPath.get(); + stat.setLocalPath( + ConverterUtils.getYarnUrlFromPath(localPath)); + stat.setLocalSize( + FileUtil.getDU(new File(localPath.getParent().toString()))); + stat.setStatus(ResourceStatusType.FETCH_SUCCESS); + } catch (ExecutionException e) { + stat.setStatus(ResourceStatusType.FETCH_FAILURE); + stat.setException(RPCUtil.getRemoteException(e.getCause())); + } catch (CancellationException e) { + stat.setStatus(ResourceStatusType.FETCH_FAILURE); + stat.setException(RPCUtil.getRemoteException(e)); + } + // TODO shouldn't remove until ACK + i.remove(); + } else { + stat.setStatus(ResourceStatusType.FETCH_PENDING); + } + currentResources.add(stat); + } + LocalizerStatus status = + recordFactory.newRecordInstance(LocalizerStatus.class); + status.setLocalizerId(localizerId); + status.addAllResources(currentResources); + return status; + } + + public static void main(String[] argv) throws Throwable { + // usage: $0 user appId locId host port app_log_dir user_dir [user_dir]* + // let $x = $x/usercache for $local.dir + // MKDIR $x/$user/appcache/$appid + // MKDIR $x/$user/appcache/$appid/output + // MKDIR $x/$user/appcache/$appid/filecache 
+ // LOAD $x/$user/appcache/$appid/appTokens + try { + String user = argv[0]; + String appId = argv[1]; + String locId = argv[2]; + InetSocketAddress nmAddr = + new InetSocketAddress(argv[3], Integer.parseInt(argv[4])); + Path logDir = new Path(argv[5]); + String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length); + ArrayList localDirs = new ArrayList(sLocaldirs.length); + for (String sLocaldir : sLocaldirs) { + localDirs.add(new Path(sLocaldir)); + } + + final String uid = + UserGroupInformation.getCurrentUser().getShortUserName(); + if (!user.equals(uid)) { + // TODO: fail localization + LOG.warn("Localization running as " + uid + " not " + user); + } + + ContainerLocalizer localizer = + new ContainerLocalizer(user, appId, locId, logDir, localDirs); + System.exit(localizer.runLocalization(nmAddr)); + } catch (Throwable e) { + // Print error to stdout so that LCE can use it. + e.printStackTrace(System.out); + throw e; + } + } + + private static void initDirs(Configuration conf, String user, String appId, + FileContext lfs, Path logDir, List localDirs) throws IOException { + if (null == localDirs || 0 == localDirs.size()) { + throw new IOException("Cannot initialize without local dirs"); + } + String[] appCacheDirs = new String[localDirs.size()]; + String[] userCacheDirs = new String[localDirs.size()]; + for (int i = 0, n = localDirs.size(); i < n; ++i) { + // $x/usercache/$user + Path base = lfs.makeQualified( + new Path(new Path(localDirs.get(i), USERCACHE), user)); + // $x/usercache/$user/filecache + Path uCacheDir = new Path(base, FILECACHE); + userCacheDirs[i] = uCacheDir.toString(); + lfs.mkdir(uCacheDir, null, true); + // $x/usercache/$user/appcache/$appId + Path appBase = new Path(base, new Path(APPCACHE, appId)); + lfs.mkdir(appBase, null, true); + // $x/usercache/$user/appcache/$appId/filecache + Path aCacheDir = new Path(appBase, FILECACHE); + appCacheDirs[i] = aCacheDir.toString(); + lfs.mkdir(aCacheDir, null, true); + // 
$x/usercache/$user/appcache/$appId/output + lfs.mkdir(new Path(appBase, OUTPUTDIR), null, true); + } + conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appCacheDirs); + conf.setStrings(String.format(USERCACHE_CTXT_FMT, appId), userCacheDirs); + lfs.mkdir(logDir, null, true); + } + + public static void writeLaunchEnv(OutputStream out, + Map environment, Map resources, + List command, List appDirs) + throws IOException { + ShellScriptBuilder sb = new ShellScriptBuilder(); + if (System.getenv("YARN_HOME") != null) { + sb.env("YARN_HOME", System.getenv("YARN_HOME")); + } + sb.env(YARNApplicationConstants.LOCAL_DIR_ENV, + StringUtils.join(",", appDirs)); + if (environment != null) { + for (Map.Entry env : environment.entrySet()) { + sb.env(env.getKey().toString(), env.getValue().toString()); + } + } + if (resources != null) { + for (Map.Entry link : resources.entrySet()) { + sb.symlink(link.getKey(), link.getValue()); + } + } + ArrayList cmd = new ArrayList(2 * command.size() + 5); + cmd.add(ContainerExecutor.isSetsidAvailable ? 
"exec setsid " : "exec "); + cmd.add("/bin/bash "); + cmd.add("-c "); + cmd.add("\""); + for (String cs : command) { + cmd.add(cs.toString()); + cmd.add(" "); + } + cmd.add("\""); + sb.line(cmd.toArray(new String[cmd.size()])); + PrintStream pout = null; + try { + pout = new PrintStream(out); + sb.write(pout); + } finally { + if (out != null) { + out.close(); + } + } + } + + private static class ShellScriptBuilder { + + private final StringBuilder sb; + + public ShellScriptBuilder() { + this(new StringBuilder("#!/bin/bash\n\n")); + } + + protected ShellScriptBuilder(StringBuilder sb) { + this.sb = sb; + } + + public ShellScriptBuilder env(String key, String value) { + line("export ", key, "=\"", value, "\""); + return this; + } + + public ShellScriptBuilder symlink(Path src, String dst) throws IOException { + return symlink(src, new Path(dst)); + } + + public ShellScriptBuilder symlink(Path src, Path dst) throws IOException { + if (!src.isAbsolute()) { + throw new IOException("Source must be absolute"); + } + if (dst.isAbsolute()) { + throw new IOException("Destination must be relative"); + } + if (dst.toUri().getPath().indexOf('/') != -1) { + line("mkdir -p ", dst.getParent().toString()); + } + line("ln -sf ", src.toUri().getPath(), " ", dst.toString()); + return this; + } + + public void write(PrintStream out) throws IOException { + out.append(sb); + } + + public void line(String... 
command) { + for (String s : command) { + sb.append(s); + } + sb.append("\n"); + } + + @Override + public String toString() { + return sb.toString(); + } + + } + +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java?rev=1097727&r1=1097726&r2=1097727&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java Fri Apr 29 08:35:53 2011 @@ -22,8 +22,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URISyntaxException; -import java.util.Collections; -import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; @@ -48,7 +46,7 @@ import org.apache.hadoop.yarn.api.record * Download a single URL to the local disk. 
* */ -public class FSDownload implements Callable> { +public class FSDownload implements Callable { private static final Log LOG = LogFactory.getLog(FSDownload.class); @@ -117,7 +115,7 @@ public class FSDownload implements Calla } @Override - public Map call() throws IOException { + public Path call() throws IOException { Path sCopy; try { sCopy = ConverterUtils.getPathFromYarnURL(resource.getResource()); @@ -140,7 +138,7 @@ public class FSDownload implements Calla Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName())); try { Path dTmp = files.makeQualified(copy(sCopy, dst_work)); - resource.setSize(unpack(new File(dTmp.toUri()), new File(dFinal.toUri()))); + unpack(new File(dTmp.toUri()), new File(dFinal.toUri())); files.rename(dst_work, dst, Rename.OVERWRITE); } catch (IOException e) { try { files.delete(dst, true); } catch (IOException ignore) { } @@ -152,12 +150,11 @@ public class FSDownload implements Calla // clear ref to internal var rand = null; conf = null; - //resource = null; TODO change downstream to not req + resource = null; dirs = null; cachePerms = null; } - return Collections.singletonMap(resource, - files.makeQualified(new Path(dst, sCopy.getName()))); + return files.makeQualified(new Path(dst, sCopy.getName())); } private static long getEstimatedSize(LocalResource rsrc) { Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1097727&view=auto ============================================================================== --- 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,103 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.net.URISyntaxException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.util.ConverterUtils; + +public class LocalResourceRequest implements Comparable { + + private final Path loc; + private final long timestamp; + private final LocalResourceType type; + + /** + * Wrap API resource to match against cache of localized resources. 
+ * @param resource Resource requested by container + * @throws URISyntaxException If the path is malformed + */ + public LocalResourceRequest(LocalResource resource) + throws URISyntaxException { + this.loc = ConverterUtils.getPathFromYarnURL(resource.getResource()); + this.timestamp = resource.getTimestamp(); + this.type = resource.getType(); + } + + @Override + public int hashCode() { + return loc.hashCode() ^ + (int)((timestamp >>> 32) ^ timestamp) * + type.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof LocalResourceRequest)) { + return false; + } + final LocalResourceRequest other = (LocalResourceRequest) o; + return getPath().equals(other.getPath()) && + getTimestamp() == other.getTimestamp() && + getType() == other.getType(); + } + + @Override + public int compareTo(LocalResourceRequest other) { + if (this == other) { + return 0; + } + int ret = getPath().compareTo(other.getPath()); + if (0 == ret) { + ret = (int)(getTimestamp() - other.getTimestamp()); + if (0 == ret) { + ret = getType().ordinal() - other.getType().ordinal(); + } + } + return ret; + } + + public Path getPath() { + return loc; + } + + public long getTimestamp() { + return timestamp; + } + + public LocalResourceType getType() { + return type; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ "); + sb.append(getPath().toString()).append(", "); + sb.append(getTimestamp()).append(", "); + sb.append(getType()).append(" }"); + return sb.toString(); + } +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1097727&r1=1097726&r2=1097727&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Fri Apr 29 08:35:53 2011 @@ -18,23 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import java.net.URISyntaxException; -import java.util.Collection; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +public interface LocalResourcesTracker extends EventHandler { -public interface LocalResourcesTracker { - - Collection register( - AppLocalizationRunnerImpl appLocalizationRunner, - Collection values) - throws URISyntaxException; - - void setSuccess(LocalResource localRsrc, long size, Path pathFromYarnURL) - throws IllegalArgumentException, InterruptedException; - - void removeFailedResource(LocalResource localResource, - YarnRemoteException cause); + public boolean contains(LocalResourceRequest resource); } Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1097727&r1=1097726&r2=1097727&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Apr 29 08:35:53 2011 @@ -15,151 +15,50 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; - +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; class LocalResourcesTrackerImpl implements LocalResourcesTracker { static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class); - private final ConcurrentHashMap localrsrc = - new ConcurrentHashMap(); - - public void 
setSuccess(LocalResource lResource, long rsrcSize, - Path localPath) throws IllegalArgumentException, InterruptedException { - LocalizedResource rsrc = localrsrc.get(lResource); - if (null == rsrc) { - throw new IllegalArgumentException("Unknown: " + lResource.getPath()); - } - rsrc.success(lResource, rsrcSize, localPath); - } - - public void removeFailedResource(LocalResource lResource, - YarnRemoteException e) throws IllegalArgumentException { - LocalizedResource rsrc = localrsrc.get(lResource); - if (null == rsrc) { - throw new IllegalArgumentException("Unknown: " + lResource.getPath()); - } - rsrc.failure(lResource, e); - } - - protected long currentTime() { - return System.nanoTime(); - } - - // TODO replace w/ queue over RPC - /** @return Resources not present in this bundle */ - public Collection register( - AppLocalizationRunnerImpl app, - Collection rsrcs) - throws URISyntaxException { - ArrayList ret = - new ArrayList(rsrcs.size()); - for (final org.apache.hadoop.yarn.api.records.LocalResource yrsrc : rsrcs) { - final LocalizedResource cand = - new LocalizedResource(new Callable>() { - @Override - public Map call() { - // Future complete from RPC callback - throw new UnsupportedOperationException(); - } - }); - final LocalResource rsrc = new LocalResource(yrsrc); - while (true) { - LocalizedResource actual = localrsrc.putIfAbsent(rsrc, cand); - if (null == actual) { - cand.request(app); - ret.add(yrsrc); - break; - } - // ensure resource localization not cancelled - if (actual.isCancelled()) { - if (!localrsrc.replace(rsrc, actual, cand)) { - // newer entry exists - continue; - } else { - cand.request(app); - ret.add(yrsrc); - break; - } - } - actual.request(app); - break; + private final Dispatcher dispatcher; + private final ConcurrentHashMap + localrsrc = new ConcurrentHashMap(); + + public LocalResourcesTrackerImpl(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + } + + public void handle(ResourceEvent event) { + LocalResourceRequest req = 
event.getLocalResource(); + LocalizedResource rsrc = localrsrc.get(req); + switch (event.getType()) { + case REQUEST: + case LOCALIZED: + if (null == rsrc) { + rsrc = new LocalizedResource(req, dispatcher); + localrsrc.put(req, rsrc); } - } - return ret; - } - - public void release(AppLocalizationRunnerImpl app, LocalResource[] resources) { - for (LocalResource rsrc : resources) { - LocalizedResource resource = localrsrc.get(rsrc); - if (resource != null) { - // XXX update timestamp to last-used? - resource.refCount.getAndDecrement(); - resource.notifyQueue.remove(app); + break; + case RELEASE: + if (null == rsrc) { + LOG.info("Release unknown rsrc " + rsrc + " (discard)"); + return; } + break; } + rsrc.handle(event); } - /** - * Private inner datum tracking a resource that is already localized. - */ - // TODO use AQS for download - private class LocalizedResource extends - FutureTask> { - private final AtomicInteger refCount = new AtomicInteger(0); - private final AtomicLong timestamp = new AtomicLong(currentTime()); - private final BlockingQueue notifyQueue = - new LinkedBlockingQueue(); - // TODO: Why is it needed? - private volatile long size = -1; - LocalizedResource(Callable> fetch) { - super(fetch); - } - void request(AppLocalizationRunnerImpl appRunner) { - refCount.getAndIncrement(); - timestamp.set(currentTime()); - notifyQueue.offer(appRunner); - if (isDone() && notifyQueue.remove(appRunner)) { - appRunner.localizedResource(this); - } - } - void success(LocalResource rsrc, long rsrcSize, Path p) - throws InterruptedException { - size = rsrcSize; - set(Collections.singletonMap(rsrc, p)); - } - void failure(LocalResource r, YarnRemoteException e) { - // TODO: How do you inform the appRunner? 
- localrsrc.remove(r, this); - setException(e); - } - @Override - protected void done() { - for (AppLocalizationRunner appRunner = notifyQueue.poll(); appRunner != null; - appRunner = notifyQueue.poll()) { - appRunner.localizedResource(this); - } - } + public boolean contains(LocalResourceRequest resource) { + return localrsrc.contains(resource); } } Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,225 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; +import org.apache.hadoop.yarn.state.MultipleArcTransition; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; + +public class LocalizedResource implements EventHandler { + + private static final Log LOG = LogFactory.getLog(LocalizedResource.class); + + Path localPath; + long size = -1; + final LocalResourceRequest rsrc; + final Dispatcher dispatcher; + final StateMachine + stateMachine; + final Semaphore sem = new Semaphore(1); + final Queue ref; + final AtomicLong timestamp = new 
AtomicLong(currentTime()); + + private static final StateMachineFactory stateMachineFactory = + new StateMachineFactory(ResourceState.INIT) + // From INIT (ref == 0, awaiting req) + .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING, + ResourceEventType.REQUEST, new FetchResourceTransition()) + .addTransition(ResourceState.INIT, ResourceState.LOCALIZED, + ResourceEventType.LOCALIZED, new FetchDirectTransition()) + .addTransition(ResourceState.INIT, ResourceState.INIT, + ResourceEventType.RELEASE, new ReleaseTransition()) + // From DOWNLOADING (ref > 0, may be localizing) + .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING, + ResourceEventType.REQUEST, new FetchResourceTransition()) + .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED, + ResourceEventType.LOCALIZED, new FetchSuccessTransition()) + .addTransition(ResourceState.DOWNLOADING, + EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT), + ResourceEventType.RELEASE, new ReleasePendingTransition()) + // From LOCALIZED (ref >= 0, on disk) + .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, + ResourceEventType.REQUEST, new LocalizedResourceTransition()) + .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, + ResourceEventType.LOCALIZED, new LocalizedResourceTransition()) + .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED, + ResourceEventType.RELEASE, new ReleaseTransition()) + .installTopology(); + + public LocalizedResource(LocalResourceRequest rsrc, Dispatcher dispatcher) { + this.rsrc = rsrc; + this.dispatcher = dispatcher; + this.ref = new LinkedList(); + this.stateMachine = stateMachineFactory.make(this); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ ").append(rsrc.toString()).append(",") + .append(getState() == ResourceState.LOCALIZED + ? 
localPath + : "pending").append(",["); + for (ContainerId c : ref) { + sb.append("(").append(c.toString()).append(")"); + } + sb.append("],").append(timestamp.get()).append("}"); + return sb.toString(); + } + + public void release(ContainerId container) { + if (!ref.remove(container)) { + LOG.info("Attempt to release claim on " + this + + " from unregistered container " + container); + assert false; + } + timestamp.set(currentTime()); + } + + long currentTime() { + return System.nanoTime(); + } + + public ResourceState getState() { + return stateMachine.getCurrentState(); + } + + public LocalResourceRequest getRequest() { + return rsrc; + } + + public boolean tryAcquire() { + return sem.tryAcquire(); + } + + public void unlock() { + sem.release(); + } + + public synchronized void handle(ResourceEvent event) { + stateMachine.doTransition(event.getType(), event); + } + + static abstract class ResourceTransition implements + SingleArcTransition { + // typedef + } + + @SuppressWarnings("unchecked") // dispatcher not typed + private static class FetchResourceTransition extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + ResourceRequestEvent req = (ResourceRequestEvent) event; + LocalizerContext ctxt = req.getContext(); + ContainerId container = ctxt.getContainer(); + rsrc.ref.add(container); + rsrc.dispatcher.getEventHandler().handle( + new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt)); + } + } + + private static class FetchDirectTransition extends FetchSuccessTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + LOG.warn("Resource " + rsrc + " localized without listening container"); + super.transition(rsrc, event); + } + } + + /** + * Resource localized, notify waiting containers. 
+ */ + @SuppressWarnings("unchecked") // dispatcher not typed + private static class FetchSuccessTransition extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event; + rsrc.localPath = locEvent.getLocation(); + rsrc.size = locEvent.getSize(); + for (ContainerId container : rsrc.ref) { + rsrc.dispatcher.getEventHandler().handle( + new ContainerResourceLocalizedEvent( + container, rsrc.rsrc, rsrc.localPath)); + } + } + } + + /** + * Resource already localized, notify immediately. + */ + @SuppressWarnings("unchecked") // dispatcher not typed + private static class LocalizedResourceTransition + extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + // notify waiting containers + ResourceRequestEvent reqEvent = (ResourceRequestEvent) event; + ContainerId container = reqEvent.getContext().getContainer(); + rsrc.ref.add(container); + rsrc.dispatcher.getEventHandler().handle( + new ContainerResourceLocalizedEvent( + container, rsrc.rsrc, rsrc.localPath)); + } + } + + /** + * Decrement resource count, update timestamp. + */ + private static class ReleaseTransition extends ResourceTransition { + @Override + public void transition(LocalizedResource rsrc, ResourceEvent event) { + // Note: assumes that localizing container must succeed or fail + ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; + rsrc.release(relEvent.getContainer()); + } + } + + private static class ReleasePendingTransition implements + MultipleArcTransition { + @Override + public ResourceState transition(LocalizedResource rsrc, + ResourceEvent event) { + ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event; + rsrc.release(relEvent.getContainer()); + return rsrc.ref.isEmpty() + ? 
ResourceState.INIT + : ResourceState.DOWNLOADING; + } + } +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,31 @@ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ContainerId; + +public class LocalizerContext { + + private final String user; + private final ContainerId container; + private final Credentials credentials; + + public LocalizerContext(String user, ContainerId container, + Credentials credentials) { + this.user = user; + this.container = container; + this.credentials = credentials; + } + + public String getUser() { + return user; + } + + public ContainerId getContainer() { + return container; + } + + public Credentials getCredentials() { + return credentials; + } + +} Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1097727&r1=1097726&r2=1097727&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Apr 29 08:35:53 2011 @@ -15,15 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; +import java.io.DataOutputStream; + +import java.net.URISyntaxException; + +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR; import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR; +import static 
org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import java.io.IOException; import java.net.InetSocketAddress; @@ -32,8 +47,6 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.avro.ipc.Server; import org.apache.commons.logging.Log; @@ -49,40 +62,44 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol; -import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationRequest; -import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationResponse; -import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationRequest; -import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationResponse; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; +import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager; import org.apache.hadoop.yarn.service.AbstractService; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.api.records.URL; - public class ResourceLocalizationService extends AbstractService - implements EventHandler, LocalizationProtocol { + implements EventHandler, LocalizationProtocol { private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class); public static final String NM_PRIVATE_DIR = "nmPrivate"; - public final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700); + public static final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700); private Server server; private InetSocketAddress locAddr; @@ -92,22 +109,15 @@ public class ResourceLocalizationService private final ContainerExecutor exec; protected final Dispatcher dispatcher; private final DeletionService delService; - private final ExecutorService appLocalizerThreadPool = - Executors.newCachedThreadPool(); - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - /** - * Map of private resources of users. - */ + private LocalizerTracker localizers; + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + //private final LocalResourcesTracker publicRsrc; private final ConcurrentMap privateRsrc = new ConcurrentHashMap(); - - /** - * Map of applications that are in the process of localization. - * TODO: Why is it needed? 
- */ - private final ConcurrentMap localizingApps = - new ConcurrentHashMap(); + private final ConcurrentMap appRsrc = + new ConcurrentHashMap(); public ResourceLocalizationService(Dispatcher dispatcher, ContainerExecutor exec, DeletionService delService) { @@ -117,16 +127,21 @@ public class ResourceLocalizationService this.delService = delService; } + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnException("Failed to access local fs"); + } + } + @Override public void init(Configuration conf) { - // TODO limit for public cache, not appLocalizer - //appLocalizer.setMaximumPoolSize( - // conf.getInt(NM_MAX_PUBLIC_FETCH_THREADS, - // DEFAULT_MAX_PUBLIC_FETCH_THREADS)); try { // TODO queue deletions here, rather than NM init? - FileContext lfs = FileContext.getLocalFSFileContext(conf); - String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR); + FileContext lfs = getLocalFileContext(conf); + String[] sLocalDirs = + conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR); localDirs = new ArrayList(sLocalDirs.length); logDirs = new ArrayList(sLocalDirs.length); @@ -135,10 +150,10 @@ public class ResourceLocalizationService Path localdir = new Path(sLocaldir); localDirs.add(localdir); // $local/usercache - Path userdir = new Path(localdir, ApplicationLocalizer.USERCACHE); + Path userdir = new Path(localdir, ContainerLocalizer.USERCACHE); lfs.mkdir(userdir, null, true); // $local/filecache - Path filedir = new Path(localdir, ApplicationLocalizer.FILECACHE); + Path filedir = new Path(localdir, ContainerLocalizer.FILECACHE); lfs.mkdir(filedir, null, true); // $local/nmPrivate Path sysdir = new Path(localdir, NM_PRIVATE_DIR); @@ -152,18 +167,32 @@ public class ResourceLocalizationService lfs.mkdir(logdir, null, true); } } catch (IOException e) { - throw new YarnException("Failed to start Localizer", e); + throw new YarnException("Failed to initialize 
LocalizationService", e); } localDirs = Collections.unmodifiableList(localDirs); logDirs = Collections.unmodifiableList(logDirs); sysDirs = Collections.unmodifiableList(sysDirs); locAddr = NetUtils.createSocketAddr( - conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS)); + conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS)); + localizers = new LocalizerTracker(); + dispatcher.register(LocalizerEventType.class, localizers); super.init(conf); } @Override + public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) { + return localizers.processHeartbeat(status); + } + + @Override public void start() { + server = createServer(); + LOG.info("Localizer started on port " + server.getPort()); + server.start(); + super.start(); + } + + Server createServer() { YarnRPC rpc = YarnRPC.create(getConfig()); Configuration conf = new Configuration(getConfig()); // Clone to separate // sec-info classes @@ -173,127 +202,87 @@ public class ResourceLocalizationService LocalizerSecurityInfo.class, SecurityInfo.class); secretManager = new LocalizerTokenSecretManager(); } - server = - rpc.getServer(LocalizationProtocol.class, this, locAddr, conf, - secretManager); - LOG.info("Localizer started at " + locAddr); - server.start(); - super.start(); - } - - /** LTC communication w/ Localizer */ - public InetSocketAddress getAddress() { - return locAddr; - } - - /** - * Localizer report to NodeManager of localized resource. 
- * @param user Owner of the private cache - * @param resource Resource localized - * @param path Location on the local filesystem, or null if failed - */ - - @Override - public SuccessfulLocalizationResponse successfulLocalization(SuccessfulLocalizationRequest request) throws YarnRemoteException { - String user = request.getUser(); - org.apache.hadoop.yarn.api.records.LocalResource resource = request.getResource(); - URL path = request.getPath(); - // TODO validate request - LocalResourcesTracker userCache = privateRsrc.get(user.toString()); - if (null == userCache) { - throw RPCUtil.getRemoteException("Unknown user: " + user); - } - try { - userCache.setSuccess(new LocalResource(resource), - resource.getSize(), ConverterUtils.getPathFromYarnURL(path)); - } catch (Exception e) { - throw RPCUtil.getRemoteException(e); - } - SuccessfulLocalizationResponse response = recordFactory.newRecordInstance(SuccessfulLocalizationResponse.class); - return response; - } - - public FailedLocalizationResponse failedLocalization(FailedLocalizationRequest request) throws YarnRemoteException { - String user = request.getUser(); - org.apache.hadoop.yarn.api.records.LocalResource resource = request.getResource(); - YarnRemoteException cause = request.getException(); - LocalResourcesTracker userCache = privateRsrc.get(user.toString()); - if (null == userCache) { - throw RPCUtil.getRemoteException("Unknown user: " + user); - } - try { - userCache.removeFailedResource(new LocalResource(resource), cause); - } catch (Exception e) { - throw RPCUtil.getRemoteException(e); - } - FailedLocalizationResponse response = recordFactory.newRecordInstance(FailedLocalizationResponse.class); - return response; + return rpc.getServer( + LocalizationProtocol.class, this, locAddr, conf, secretManager); } @Override public void stop() { - appLocalizerThreadPool.shutdownNow(); if (server != null) { server.close(); } + if (localizers != null) { + localizers.stop(); + } super.stop(); } @Override - public void 
handle(LocalizerEvent event) { + @SuppressWarnings("unchecked") // dispatcher not typed + public void handle(LocalizationEvent event) { String userName; String appIDStr; - + // TODO: create log dir as $logdir/$user/$appId switch (event.getType()) { case INIT_APPLICATION_RESOURCES: - Application app = ((ApplicationLocalizerEvent)event).getApplication(); - String user = app.getUser(); - LocalResourcesTracker rsrcTracker = privateRsrc.get(user); - if (null == rsrcTracker) { - LocalResourcesTracker perUserPrivateRsrcTracker = - new LocalResourcesTrackerImpl(); - rsrcTracker = - privateRsrc.putIfAbsent(user, perUserPrivateRsrcTracker); - if (null == rsrcTracker) { - rsrcTracker = perUserPrivateRsrcTracker; - } + Application app = + ((ApplicationLocalizationEvent)event).getApplication(); + // 0) Create application tracking structs + privateRsrc.putIfAbsent(app.getUser(), + new LocalResourcesTrackerImpl(dispatcher)); + if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()), + new LocalResourcesTrackerImpl(dispatcher))) { + LOG.warn("Initializing application " + app + " already present"); + assert false; } - - // Possibility of duplicate app localization is avoided by ApplicationImpl - // itself, so we are good to go. 
- - // TODO: use LDA for picking single logdir, sysDir - // TODO: create log dir as $logdir/$user/$appId - try { - AppLocalizationRunner appLocalizationRunner = - new AppLocalizationRunnerImpl(dispatcher, - app, this, null, rsrcTracker, exec, logDirs.get(0), localDirs, - new Path(sysDirs.get(0), app.toString())); - localizingApps.put(app.toString(), appLocalizationRunner); - appLocalizerThreadPool.submit(appLocalizationRunner); - } catch (IOException e) { - // TODO kill containers - LOG.info("Failed to submit application", e); - //dispatcher.getEventHandler().handle( + // 1) Signal container init + dispatcher.getEventHandler().handle(new ApplicationInitedEvent( + app.getAppId(), logDirs.get(0))); + break; + case INIT_CONTAINER_RESOURCES: + ContainerLocalizationRequestEvent rsrcReqs = + (ContainerLocalizationRequestEvent) event; + Container c = rsrcReqs.getContainer(); + LocalizerContext ctxt = new LocalizerContext( + c.getUser(), c.getContainerID(), c.getCredentials()); + final LocalResourcesTracker tracker; + LocalResourceVisibility vis = rsrcReqs.getVisibility(); + switch (vis) { + default: + case PUBLIC: + // TODO + tracker = null; + break; + case PRIVATE: + tracker = privateRsrc.get(c.getUser()); + break; + case APPLICATION: + tracker = + appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId())); + break; + } + for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) { + tracker.handle(new ResourceRequestEvent(req, vis, ctxt)); } break; case CLEANUP_CONTAINER_RESOURCES: - Container container = ((ContainerLocalizerEvent)event).getContainer(); + Container container = + ((ContainerLocalizationEvent)event).getContainer(); // Delete the container directories userName = container.getUser();; String containerIDStr = container.toString(); - appIDStr = ConverterUtils.toString(container.getContainerID().getAppId()); + appIDStr = + ConverterUtils.toString(container.getContainerID().getAppId()); for (Path localDir : localDirs) { - Path usersdir = new 
Path(localDir, ApplicationLocalizer.USERCACHE); + Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); - Path allAppsdir = new Path(userdir, ApplicationLocalizer.APPCACHE); + Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); Path containerDir = new Path(appDir, containerIDStr); - delService.delete(userName, - containerDir, null); + delService.delete(userName, containerDir, null); Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); @@ -308,19 +297,21 @@ public class ResourceLocalizationService case DESTROY_APPLICATION_RESOURCES: Application application = - ((ApplicationLocalizerEvent) event).getApplication(); + ((ApplicationLocalizationEvent) event).getApplication(); + if (null == appRsrc.remove(application)) { + LOG.warn("Removing uninitialized application " + application); + } // Delete the application directories userName = application.getUser(); appIDStr = application.toString(); for (Path localDir : localDirs) { - Path usersdir = new Path(localDir, ApplicationLocalizer.USERCACHE); + Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); - Path allAppsdir = new Path(userdir, ApplicationLocalizer.APPCACHE); + Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); - delService.delete(userName, - appDir, null); + delService.delete(userName, appDir, null); Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); @@ -337,4 +328,247 @@ public class ResourceLocalizationService } } + class LocalizerTracker implements EventHandler { + + private final Map trackers; + + LocalizerTracker() { + this(new HashMap()); + } + + LocalizerTracker(Map trackers) { + this.trackers = trackers; + } + + public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) { 
+ String locId = status.getLocalizerId(); + synchronized (trackers) { + LocalizerRunner localizer = trackers.get(locId); + if (null == localizer) { + // TODO process resources anyway + LocalizerHeartbeatResponse response = + recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); + response.setLocalizerAction(LocalizerAction.DIE); + return response; + } + return localizer.update(status.getResources()); + } + } + + /** + * Interrupts every localizer thread registered in {@code trackers}. + * Iterates under the {@code trackers} monitor, the lock {@link #handle} + * holds while adding and removing entries, so the map cannot change + * underneath the iterator. + */ + public void stop() { + synchronized (trackers) { + for (LocalizerRunner localizer : trackers.values()) { + localizer.interrupt(); + } + } + } + + /** + * Routes a localizer event to the runner registered under the event's + * localizer id. REQUEST_RESOURCE_LOCALIZATION creates, registers and + * starts a runner when none exists, then queues the request on it. + * ABORT_LOCALIZATION unregisters and interrupts the runner, or does + * nothing when none is registered. + * + * @param event the event to dispatch + */ + @Override + public void handle(LocalizerEvent event) { + synchronized (trackers) { + String locId = event.getLocalizerId(); + LocalizerRunner localizer = trackers.get(locId); + switch(event.getType()) { + case REQUEST_RESOURCE_LOCALIZATION: + // 0) find running localizer or start new thread + LocalizerResourceRequestEvent req = + (LocalizerResourceRequestEvent)event; + if (null == localizer) { + LOG.info("Created localizer for " + req.getLocalizerId()); + localizer = new LocalizerRunner(req.getContext(), + sysDirs.get(0), req.getLocalizerId(), logDirs.get(0)); + trackers.put(locId, localizer); + localizer.start(); + } + // 1) propagate event + localizer.addResource(req); + break; + case ABORT_LOCALIZATION: + // 0) find running localizer, interrupt and remove + if (null == localizer) { + return; // ignore; already gone + } + trackers.remove(locId); + localizer.interrupt(); + break; + } + } + } + } + + /** + * Thread that writes a container's credentials to the {@code nmPrivate} + * directory and launches its localizer through {@code exec}, while + * tracking which resource requests are still pending and which have + * already been handed out. + */ + class LocalizerRunner extends Thread { + + final LocalizerContext context; + final String localizerId; + final Path nmPrivate; + final Path logDir; + /* Requests whose resource findNextResource() has returned, keyed by resource request. */ + final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled; + /* Requests queued by addResource() and not yet observed as LOCALIZED. */ + final List<LocalizerResourceRequestEvent> pending; + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + LocalizerRunner(LocalizerContext context, Path nmPrivate, + String localizerId, Path logDir) { + this(context, nmPrivate, localizerId, logDir, + new ArrayList<LocalizerResourceRequestEvent>(), + new HashMap<LocalResourceRequest,LocalizerResourceRequestEvent>()); + } + + LocalizerRunner(LocalizerContext context, Path nmPrivate, 
String localizerId, Path logDir, + List<LocalizerResourceRequestEvent> pending, + Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled) { + this.nmPrivate = nmPrivate; + this.context = context; + this.localizerId = localizerId; + this.logDir = logDir; + this.pending = pending; + this.scheduled = scheduled; + } + + /** + * Appends a request to the pending list. + * + * @param request the resource request to queue + */ + public void addResource(LocalizerResourceRequestEvent request) { + pending.add(request); + } + + /** + * Scans the pending requests in order, dropping those whose resource is + * already LOCALIZED, and stops at the first resource for which + * {@code tryAcquire()} succeeds. That request is recorded in + * {@code scheduled}. + * + * @return a record describing the acquired resource, or {@code null} + * when no pending resource could be acquired + */ + LocalResource findNextResource() { + for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator(); + i.hasNext();) { + LocalizerResourceRequestEvent evt = i.next(); + LocalizedResource nRsrc = evt.getResource(); + if (ResourceState.LOCALIZED.equals(nRsrc.getState())) { + i.remove(); + continue; + } + if (nRsrc.tryAcquire()) { + LocalResourceRequest nextRsrc = nRsrc.getRequest(); + LocalResource next = + recordFactory.newRecordInstance(LocalResource.class); + next.setResource( + ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath())); + next.setTimestamp(nextRsrc.getTimestamp()); + next.setType(nextRsrc.getType()); + next.setVisibility(evt.getVisibility()); + scheduled.put(nextRsrc, evt); + return next; + } + } + return null; + } + + // TODO this sucks. Fix it later + /** + * Builds the response to one status report from the localizer. + * An empty report yields the next acquirable resource with LIVE, LIVE + * alone when requests are pending but none can be acquired, or DIE when + * nothing is pending. Otherwise every status is matched to its scheduled + * request: FETCH_SUCCESS notifies the resource and offers the next one, + * FETCH_PENDING sets LIVE, FETCH_FAILURE unlocks the resource, sets DIE + * and dispatches a ContainerResourceFailedEvent, and any other status + * sets DIE and dispatches the same event. Statuses that cannot be parsed + * or matched are logged and skipped. The action set for the last + * processed status is the one returned. + * + * @param stats statuses reported by the localizer; may be empty + * @return the response carrying the action and any newly offered resource + */ + LocalizerHeartbeatResponse update( + List<LocalResourceStatus> stats) { + LocalizerHeartbeatResponse response = + recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class); + + if (stats.isEmpty()) { + LocalResource next = findNextResource(); + if (next != null) { + response.setLocalizerAction(LocalizerAction.LIVE); + response.addResource(next); + } else if (pending.isEmpty()) { + response.setLocalizerAction(LocalizerAction.DIE); + } else { + response.setLocalizerAction(LocalizerAction.LIVE); + } + return response; + } + + for (LocalResourceStatus stat : stats) { + LocalResource rsrc = stat.getResource(); + LocalResourceRequest req = null; + try { + req = new LocalResourceRequest(rsrc); + } catch (URISyntaxException e) { + // TODO fail? Already translated several times... 
+ /* Without a parseable request there is nothing to look up in scheduled; record why and skip this status. */ + LOG.error("Invalid resource reported: " + rsrc, e); + continue; + } + LocalizerResourceRequestEvent assoc = scheduled.get(req); + if (assoc == null) { + // internal error + LOG.error("Unknown resource reported: " + req); + continue; + } + switch (stat.getStatus()) { + case FETCH_SUCCESS: + // notify resource + try { + assoc.getResource().handle( + new ResourceLocalizedEvent(req, + ConverterUtils.getPathFromYarnURL(stat.getLocalPath()), + stat.getLocalSize())); + } catch (URISyntaxException e) { + /* The resource is not notified in this case; leave a trace instead of failing silently. */ + LOG.error("Invalid local path reported for " + req, e); + } + if (pending.isEmpty()) { + response.setLocalizerAction(LocalizerAction.DIE); + break; + } + response.setLocalizerAction(LocalizerAction.LIVE); + LocalResource next = findNextResource(); + if (next != null) { + response.addResource(next); + } + break; + case FETCH_PENDING: + response.setLocalizerAction(LocalizerAction.LIVE); + break; + case FETCH_FAILURE: + LOG.info("DEBUG: FAILED " + req, stat.getException()); + assoc.getResource().unlock(); + response.setLocalizerAction(LocalizerAction.DIE); + dispatcher.getEventHandler().handle( + new ContainerResourceFailedEvent(context.getContainer(), + req, stat.getException())); + break; + default: + LOG.info("Unknown status: " + stat.getStatus()); + response.setLocalizerAction(LocalizerAction.DIE); + dispatcher.getEventHandler().handle( + new ContainerResourceFailedEvent(context.getContainer(), + req, stat.getException())); + break; + } + } + return response; + } + + /** + * Writes the context's credentials to a token file under + * {@code nmPrivate}, then launches the localizer through + * {@code exec.startLocalizer}. On any exception the failure is logged + * and every scheduled resource is unlocked. + */ + @Override + @SuppressWarnings("unchecked") // dispatcher not typed + public void run() { + try { + // 0) init queue, etc. + // 1) write credentials to private dir + DataOutputStream tokenOut = null; + try { + Credentials credentials = context.getCredentials(); + Path cTokens = new Path(nmPrivate, String.format( + ContainerLocalizer.TOKEN_FILE_FMT, localizerId)); + FileContext lfs = getLocalFileContext(getConfig()); + tokenOut = lfs.create(cTokens, EnumSet.of(CREATE, OVERWRITE)); + LOG.info("Writing credentials to " + cTokens.toString() + + ". 
Credentials list: "); + for (Token<?> tk : + credentials.getAllTokens()) { + /* Log the service only: the URL-encoded token is a usable credential and must not reach the log. */ + LOG.info("Token service: " + tk.getService()); + } + credentials.writeTokenStorageToStream(tokenOut); + } finally { + if (tokenOut != null) { + tokenOut.close(); + } + } + // 2) exec initApplication and wait + exec.startLocalizer(nmPrivate, locAddr, context.getUser(), + ConverterUtils.toString(context.getContainer().getAppId()), + localizerId, logDir, localDirs); + } catch (Exception e) { + // 3) on error, report failure to Container and signal ABORT + LOG.info("Localizer " + localizerId + " failed", e); + // 3.1) notify resource of failed localization + /* NOTE(review): scheduled is iterated here without the lock under which update() adds to it -- confirm this is safe. */ + for (LocalizerResourceRequestEvent event : scheduled.values()) { + event.getResource().unlock(); + } + //dispatcher.getEventHandler().handle( + // new ContainerResourceFailedEvent(current.getContainer(), + // current.getResource().getRequest(), e)); + } + } + + } } Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer; + +/** + * States of a resource handled by the localizer package; package-private. + * NOTE(review): the constant names suggest the progression INIT, + * DOWNLOADING, LOCALIZED, but the transitions are not defined in this + * file -- confirm against the classes that read and set the state. + */ +enum ResourceState { + INIT, + DOWNLOADING, + LOCALIZED +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,36 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; + +/** + * Localization event that carries the {@link Application} it refers to. + */ +public class ApplicationLocalizationEvent extends LocalizationEvent { + + /* Package-private and final: assigned once in the constructor. */ + final Application app; + + /** + * @param type the event type, passed to the superclass constructor + * @param app the application this event refers to + */ + public ApplicationLocalizationEvent(LocalizationEventType type, Application app) { + super(type); + this.app = app; + } + + /** + * @return the application given at construction + */ + public Application getApplication() { + return app; + } + +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java Fri Apr 29 
08:35:53 2011 @@ -0,0 +1,35 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +/** + * Localization event that carries the {@link Container} it refers to. + */ +public class ContainerLocalizationEvent extends LocalizationEvent { + + /* Package-private and final: assigned once in the constructor. */ + final Container container; + + /** + * @param event the event type, passed to the superclass constructor + * @param c the container this event refers to + */ + public ContainerLocalizationEvent(LocalizationEventType event, Container c) { + super(event); + this.container = c; + } + + /** + * @return the container given at construction + */ + public Container getContainer() { + return container; + } + +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1097727&view=auto ============================================================================== --- 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,45 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import java.util.Collection; + +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest; + +/** + * Container localization event of type INIT_CONTAINER_RESOURCES that carries + * a collection of requested resources and the single visibility given with + * them. + */ +public class ContainerLocalizationRequestEvent extends ContainerLocalizationEvent { + + private final LocalResourceVisibility vis; + /* Element type restored from the LocalResourceRequest import, which nothing else in this file uses. */ + private final Collection<LocalResourceRequest> reqs; + + /** + * @param c the container the resources are requested for + * @param reqs the requested resources; stored by reference, not copied + * @param vis the visibility associated with the request + */ + public ContainerLocalizationRequestEvent(Container c, Collection<LocalResourceRequest> reqs, + LocalResourceVisibility vis) { + super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c); + this.vis = vis; + this.reqs = reqs; + } + + /** + * @return the visibility given at construction + */ + public LocalResourceVisibility getVisibility() { + return vis; + } + + /** + * @return the collection given at construction (same instance) + */ + public Collection<LocalResourceRequest> getRequestedResources() { + return reqs; + } +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,30 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.event.Event; + +/** + * Base class of the localization events in this package; its event type is + * a {@link LocalizationEventType}. The supertype is parameterized with that + * enum so the type reported by the event is statically a + * LocalizationEventType rather than a raw enum. + */ +public abstract class LocalizationEvent extends AbstractEvent<LocalizationEventType> { + + /** + * @param event the type of this event, passed to the superclass constructor + */ + public LocalizationEvent(LocalizationEventType event) { + super(event); + } + +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java?rev=1097727&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java Fri Apr 29 08:35:53 2011 @@ -0,0 +1,25 @@ +/** +* 
Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event; + +/** + * Event types accepted by the LocalizationEvent constructor. + * NOTE(review): the names pair an INIT with a teardown constant for both + * applications and containers; what each one triggers is decided by the + * handler, which is not part of this file -- confirm there. + */ +public enum LocalizationEventType { + INIT_APPLICATION_RESOURCES, + INIT_CONTAINER_RESOURCES, + CLEANUP_CONTAINER_RESOURCES, + DESTROY_APPLICATION_RESOURCES, +}