hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r1097727 [3/5] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ yarn/yarn-common/src/main/ja...
Date Fri, 29 Apr 2011 08:35:56 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,468 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+
+import java.net.InetSocketAddress;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class ContainerLocalizer {
+
+  static final Log LOG = LogFactory.getLog(ContainerLocalizer.class);
+
+  public static final String FILECACHE = "filecache";
+  public static final String APPCACHE = "appcache";
+  public static final String USERCACHE = "usercache";
+  public static final String OUTPUTDIR = "output";
+  public static final String TOKEN_FILE_FMT = "%s.tokens";
+  public static final String WORKDIR = "work";
+  private static final String APPCACHE_CTXT_FMT = "%s.app.cache.dirs";
+  private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
+
+  private final String user;
+  private final String appId;
+  private final Path logDir;
+  private final List<Path> localDirs;
+  private final String localizerId;
+  private final FileContext lfs;
+  private final Configuration conf;
+  private final LocalDirAllocator appDirs;
+  private final LocalDirAllocator userDirs;
+  private final RecordFactory recordFactory;
+  private final Map<LocalResource,Future<Path>> pendingResources;
+
+  public ContainerLocalizer(String user, String appId, String localizerId,
+      Path logDir, List<Path> localDirs) throws IOException {
+    this(FileContext.getLocalFSFileContext(), user, appId, localizerId, logDir,
+        localDirs, new HashMap<LocalResource,Future<Path>>(),
+        RecordFactoryProvider.getRecordFactory(null));
+  }
+
+  ContainerLocalizer(FileContext lfs, String user, String appId,
+      String localizerId, Path logDir, List<Path> localDirs,
+      Map<LocalResource,Future<Path>> pendingResources,
+      RecordFactory recordFactory) throws IOException {
+    if (null == user) {
+      throw new IOException("Cannot initialize for null user");
+    }
+    if (null == localizerId) {
+      throw new IOException("Cannot initialize for null containerId");
+    }
+    this.lfs = lfs;
+    this.user = user;
+    this.appId = appId;
+    this.logDir = logDir;
+    this.localDirs = localDirs;
+    this.localizerId = localizerId;
+    this.recordFactory = recordFactory;
+    this.conf = new Configuration();
+    this.appDirs =
+      new LocalDirAllocator(String.format(APPCACHE_CTXT_FMT, appId));
+    this.userDirs =
+      new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, appId));
+    this.pendingResources = pendingResources;
+  }
+
+  LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
+    Configuration localizerConf = new Configuration();
+    YarnRPC rpc = YarnRPC.create(localizerConf);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      localizerConf.setClass(
+          CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+          LocalizerSecurityInfo.class, SecurityInfo.class);
+    }
+    return (LocalizationProtocol)
+      rpc.getProxy(LocalizationProtocol.class, nmAddr, localizerConf);
+  }
+
+  public int runLocalization(final InetSocketAddress nmAddr)
+      throws IOException, InterruptedException {
+    // load credentials
+    initDirs(conf, user, appId, lfs, logDir, localDirs);
+    final Credentials creds = new Credentials();
+    DataInputStream credFile = null;
+    try {
+      // assume credentials in cwd
+      credFile = lfs.open(
+          new Path(String.format(TOKEN_FILE_FMT, localizerId)));
+      creds.readTokenStorageStream(credFile);
+    } finally  {
+      if (credFile != null) {
+        credFile.close();
+      }
+    }
+    // create localizer context
+    UserGroupInformation remoteUser =
+      UserGroupInformation.createRemoteUser(user);
+    LocalizerTokenSecretManager secretManager =
+      new LocalizerTokenSecretManager();
+    LocalizerTokenIdentifier id = secretManager.createIdentifier();
+    Token<LocalizerTokenIdentifier> localizerToken =
+      new Token<LocalizerTokenIdentifier>(id, secretManager);
+    remoteUser.addToken(localizerToken);
+    final LocalizationProtocol nodeManager =
+        remoteUser.doAs(new PrivilegedAction<LocalizationProtocol>() {
+          @Override
+          public LocalizationProtocol run() {
+            return getProxy(nmAddr);
+          }
+        });
+
+    // create user context
+    UserGroupInformation ugi =
+      UserGroupInformation.createRemoteUser(user);
+    for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
+      ugi.addToken(token);
+    }
+
+    return ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+      public Integer run() {
+        ExecutorService exec = null;
+        try {
+          exec = createDownloadThreadPool();
+          localizeFiles(nodeManager, exec);
+          return 0;
+        } catch (Throwable e) {
+          e.printStackTrace(System.out);
+          return -1;
+        } finally {
+          if (exec != null) {
+            exec.shutdownNow();
+          }
+        }
+      }
+    });
+  }
+
+  ExecutorService createDownloadThreadPool() {
+    return Executors.newSingleThreadExecutor();
+  }
+
+  Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc) {
+    return new FSDownload(lfs, conf, lda, rsrc, new Random());
+  }
+
+  void sleep(int duration) throws InterruptedException {
+    TimeUnit.SECONDS.sleep(duration);
+  }
+
+  void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec) {
+    while (true) {
+      try {
+        LocalizerStatus status = createStatus();
+        LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
+        switch (response.getLocalizerAction()) {
+        case LIVE:
+          List<LocalResource> newResources = response.getAllResources();
+          for (LocalResource r : newResources) {
+            if (!pendingResources.containsKey(r)) {
+              final LocalDirAllocator lda;
+              switch (r.getVisibility()) {
+              default:
+                LOG.warn("Unknown visibility: " + r.getVisibility());
+              case PUBLIC:
+              case PRIVATE:
+                lda = userDirs;
+                break;
+              case APPLICATION:
+                lda = appDirs;
+                break;
+              }
+              pendingResources.put(r, exec.submit(download(lda, r)));
+            }
+          }
+          break;
+        case DIE:
+          // killall running localizations
+          for (Future<Path> pending : pendingResources.values()) {
+            pending.cancel(true);
+          }
+          status = createStatus();
+          // ignore response
+          try {
+            nodemanager.heartbeat(status);
+          } catch (YarnRemoteException e) { }
+          return;
+        }
+        sleep(1);
+      } catch (InterruptedException e) {
+        return;
+      } catch (YarnRemoteException e) {
+        // TODO cleanup
+        return;
+      }
+    }
+  }
+
+  LocalizerStatus createStatus() throws InterruptedException {
+    final List<LocalResourceStatus> currentResources =
+      new ArrayList<LocalResourceStatus>();
+    for (Iterator<LocalResource> i = pendingResources.keySet().iterator();
+         i.hasNext();) {
+      LocalResource rsrc = i.next();
+      LocalResourceStatus stat =
+        recordFactory.newRecordInstance(LocalResourceStatus.class);
+      stat.setResource(rsrc);
+      Future<Path> fPath = pendingResources.get(rsrc);
+      if (fPath.isDone()) {
+        try {
+          Path localPath = fPath.get();
+          stat.setLocalPath(
+              ConverterUtils.getYarnUrlFromPath(localPath));
+          stat.setLocalSize(
+              FileUtil.getDU(new File(localPath.getParent().toString())));
+          stat.setStatus(ResourceStatusType.FETCH_SUCCESS);
+        } catch (ExecutionException e) {
+          stat.setStatus(ResourceStatusType.FETCH_FAILURE);
+          stat.setException(RPCUtil.getRemoteException(e.getCause()));
+        } catch (CancellationException e) {
+          stat.setStatus(ResourceStatusType.FETCH_FAILURE);
+          stat.setException(RPCUtil.getRemoteException(e));
+        }
+        // TODO shouldn't remove until ACK
+        i.remove();
+      } else {
+        stat.setStatus(ResourceStatusType.FETCH_PENDING);
+      }
+      currentResources.add(stat);
+    }
+    LocalizerStatus status =
+      recordFactory.newRecordInstance(LocalizerStatus.class);
+    status.setLocalizerId(localizerId);
+    status.addAllResources(currentResources);
+    return status;
+  }
+
+  public static void main(String[] argv) throws Throwable {
+    // usage: $0 user appId locId host port app_log_dir user_dir [user_dir]*
+    // let $x = $x/usercache for $local.dir
+    // MKDIR $x/$user/appcache/$appid
+    // MKDIR $x/$user/appcache/$appid/output
+    // MKDIR $x/$user/appcache/$appid/filecache
+    // LOAD $x/$user/appcache/$appid/appTokens
+    try {
+      String user = argv[0];
+      String appId = argv[1];
+      String locId = argv[2];
+      InetSocketAddress nmAddr =
+          new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
+      Path logDir = new Path(argv[5]);
+      String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length);
+      ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
+      for (String sLocaldir : sLocaldirs) {
+        localDirs.add(new Path(sLocaldir));
+      }
+
+      final String uid =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      if (!user.equals(uid)) {
+        // TODO: fail localization
+        LOG.warn("Localization running as " + uid + " not " + user);
+      }
+
+      ContainerLocalizer localizer =
+          new ContainerLocalizer(user, appId, locId, logDir, localDirs);
+      System.exit(localizer.runLocalization(nmAddr));
+    } catch (Throwable e) {
+      // Print error to stdout so that LCE can use it.
+      e.printStackTrace(System.out);
+      throw e;
+    }
+  }
+
+  private static void initDirs(Configuration conf, String user, String appId,
+      FileContext lfs, Path logDir, List<Path> localDirs) throws IOException {
+    if (null == localDirs || 0 == localDirs.size()) {
+      throw new IOException("Cannot initialize without local dirs");
+    }
+    String[] appCacheDirs = new String[localDirs.size()];
+    String[] userCacheDirs = new String[localDirs.size()];
+    for (int i = 0, n = localDirs.size(); i < n; ++i) {
+      // $x/usercache/$user
+      Path base = lfs.makeQualified(
+          new Path(new Path(localDirs.get(i), USERCACHE), user));
+      // $x/usercache/$user/filecache
+      Path uCacheDir = new Path(base, FILECACHE);
+      userCacheDirs[i] = uCacheDir.toString();
+      lfs.mkdir(uCacheDir, null, true);
+      // $x/usercache/$user/appcache/$appId
+      Path appBase = new Path(base, new Path(APPCACHE, appId));
+      lfs.mkdir(appBase, null, true);
+      // $x/usercache/$user/appcache/$appId/filecache
+      Path aCacheDir = new Path(appBase, FILECACHE);
+      appCacheDirs[i] = aCacheDir.toString();
+      lfs.mkdir(aCacheDir, null, true);
+      // $x/usercache/$user/appcache/$appId/output
+      lfs.mkdir(new Path(appBase, OUTPUTDIR), null, true);
+    }
+    conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appCacheDirs);
+    conf.setStrings(String.format(USERCACHE_CTXT_FMT, appId), userCacheDirs);
+    lfs.mkdir(logDir, null, true);
+  }
+
+  public static void writeLaunchEnv(OutputStream out,
+      Map<String,String> environment, Map<Path,String> resources,
+      List<String> command, List<Path> appDirs)
+      throws IOException {
+    ShellScriptBuilder sb = new ShellScriptBuilder();
+    if (System.getenv("YARN_HOME") != null) {
+      sb.env("YARN_HOME", System.getenv("YARN_HOME"));
+    }
+    sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
+        StringUtils.join(",", appDirs));
+    if (environment != null) {
+      for (Map.Entry<String,String> env : environment.entrySet()) {
+        sb.env(env.getKey().toString(), env.getValue().toString());
+      }
+    }
+    if (resources != null) {
+      for (Map.Entry<Path,String> link : resources.entrySet()) {
+        sb.symlink(link.getKey(), link.getValue());
+      }
+    }
+    ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
+    cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
+    cmd.add("/bin/bash ");
+    cmd.add("-c ");
+    cmd.add("\"");
+    for (String cs : command) {
+      cmd.add(cs.toString());
+      cmd.add(" ");
+    }
+    cmd.add("\"");
+    sb.line(cmd.toArray(new String[cmd.size()]));
+    PrintStream pout = null;
+    try {
+      pout = new PrintStream(out);
+      sb.write(pout);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  private static class ShellScriptBuilder {
+
+    private final StringBuilder sb;
+
+    public ShellScriptBuilder() {
+      this(new StringBuilder("#!/bin/bash\n\n"));
+    }
+
+    protected ShellScriptBuilder(StringBuilder sb) {
+      this.sb = sb;
+    }
+
+    public ShellScriptBuilder env(String key, String value) {
+      line("export ", key, "=\"", value, "\"");
+      return this;
+    }
+
+    public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
+      return symlink(src, new Path(dst));
+    }
+
+    public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
+      if (!src.isAbsolute()) {
+        throw new IOException("Source must be absolute");
+      }
+      if (dst.isAbsolute()) {
+        throw new IOException("Destination must be relative");
+      }
+      if (dst.toUri().getPath().indexOf('/') != -1) {
+        line("mkdir -p ", dst.getParent().toString());
+      }
+      line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
+      return this;
+    }
+
+    public void write(PrintStream out) throws IOException {
+      out.append(sb);
+    }
+
+    public void line(String... command) {
+      for (String s : command) {
+        sb.append(s);
+      }
+      sb.append("\n");
+    }
+
+    @Override
+    public String toString() {
+      return sb.toString();
+    }
+
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java Fri Apr 29 08:35:53 2011
@@ -22,8 +22,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URISyntaxException;
-import java.util.Collections;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 
@@ -48,7 +46,7 @@ import org.apache.hadoop.yarn.api.record
  * Download a single URL to the local disk.
  *
  */
-public class FSDownload implements Callable<Map<LocalResource,Path>> {
+public class FSDownload implements Callable<Path> {
 
   private static final Log LOG = LogFactory.getLog(FSDownload.class);
 
@@ -117,7 +115,7 @@ public class FSDownload implements Calla
   }
 
   @Override
-  public Map<LocalResource,Path> call() throws IOException {
+  public Path call() throws IOException {
     Path sCopy;
     try {
       sCopy = ConverterUtils.getPathFromYarnURL(resource.getResource());
@@ -140,7 +138,7 @@ public class FSDownload implements Calla
     Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
     try {
       Path dTmp = files.makeQualified(copy(sCopy, dst_work));
-      resource.setSize(unpack(new File(dTmp.toUri()), new File(dFinal.toUri())));
+      unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
       files.rename(dst_work, dst, Rename.OVERWRITE);
     } catch (IOException e) {
       try { files.delete(dst, true); } catch (IOException ignore) { }
@@ -152,12 +150,11 @@ public class FSDownload implements Calla
       // clear ref to internal var
       rand = null;
       conf = null;
-      //resource = null; TODO change downstream to not req
+      resource = null;
       dirs = null;
       cachePerms = null;
     }
-    return Collections.singletonMap(resource,
-        files.makeQualified(new Path(dst, sCopy.getName())));
+    return files.makeQualified(new Path(dst, sCopy.getName()));
   }
 
   private static long getEstimatedSize(LocalResource rsrc) {

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,103 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class LocalResourceRequest implements Comparable<LocalResourceRequest> {
+
+  private final Path loc;
+  private final long timestamp;
+  private final LocalResourceType type;
+
+  /**
+   * Wrap API resource to match against cache of localized resources.
+   * @param resource Resource requested by container
+   * @throws URISyntaxException If the path is malformed
+   */
+  public LocalResourceRequest(LocalResource resource)
+      throws URISyntaxException {
+    this.loc = ConverterUtils.getPathFromYarnURL(resource.getResource());
+    this.timestamp = resource.getTimestamp();
+    this.type = resource.getType();
+  }
+
+  @Override
+  public int hashCode() {
+    return loc.hashCode() ^
+      (int)((timestamp >>> 32) ^ timestamp) *
+      type.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LocalResourceRequest)) {
+      return false;
+    }
+    final LocalResourceRequest other = (LocalResourceRequest) o;
+    return getPath().equals(other.getPath()) &&
+           getTimestamp() == other.getTimestamp() &&
+           getType() == other.getType();
+  }
+
+  @Override
+  public int compareTo(LocalResourceRequest other) {
+    if (this == other) {
+      return 0;
+    }
+    int ret = getPath().compareTo(other.getPath());
+    if (0 == ret) {
+      ret = (int)(getTimestamp() - other.getTimestamp());
+      if (0 == ret) {
+        ret = getType().ordinal() - other.getType().ordinal();
+      }
+    }
+    return ret;
+  }
+
+  public Path getPath() {
+    return loc;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public LocalResourceType getType() {
+    return type;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{ ");
+    sb.append(getPath().toString()).append(", ");
+    sb.append(getTimestamp()).append(", ");
+    sb.append(getType()).append(" }");
+    return sb.toString();
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Fri Apr 29 08:35:53 2011
@@ -18,23 +18,11 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
-import java.net.URISyntaxException;
-import java.util.Collection;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+public interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
 
-public interface LocalResourcesTracker {
-
-  Collection<org.apache.hadoop.yarn.api.records.LocalResource> register(
-      AppLocalizationRunnerImpl appLocalizationRunner,
-      Collection<org.apache.hadoop.yarn.api.records.LocalResource> values)
-      throws URISyntaxException;
-
-  void setSuccess(LocalResource localRsrc, long size, Path pathFromYarnURL)
-      throws IllegalArgumentException, InterruptedException;
-
-  void removeFailedResource(LocalResource localResource,
-      YarnRemoteException cause);
+  public boolean contains(LocalResourceRequest resource);
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Apr 29 08:35:53 2011
@@ -15,151 +15,50 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
 
-  private final ConcurrentHashMap<LocalResource,LocalizedResource> localrsrc =
-    new ConcurrentHashMap<LocalResource,LocalizedResource>();
-
-  public void setSuccess(LocalResource lResource, long rsrcSize,
-      Path localPath) throws IllegalArgumentException, InterruptedException {
-    LocalizedResource rsrc = localrsrc.get(lResource);
-    if (null == rsrc) {
-      throw new IllegalArgumentException("Unknown: " + lResource.getPath());
-    }
-    rsrc.success(lResource, rsrcSize, localPath);
-  }
-
-  public void removeFailedResource(LocalResource lResource,
-      YarnRemoteException e) throws IllegalArgumentException {
-    LocalizedResource rsrc = localrsrc.get(lResource);
-    if (null == rsrc) {
-      throw new IllegalArgumentException("Unknown: " + lResource.getPath());
-    }
-    rsrc.failure(lResource, e);
-  }
-
-  protected long currentTime() {
-    return System.nanoTime();
-  }
-
-  // TODO replace w/ queue over RPC
-  /** @return Resources not present in this bundle */
-  public Collection<org.apache.hadoop.yarn.api.records.LocalResource> register(
-      AppLocalizationRunnerImpl app,
-      Collection<org.apache.hadoop.yarn.api.records.LocalResource> rsrcs)
-      throws URISyntaxException {
-    ArrayList<org.apache.hadoop.yarn.api.records.LocalResource> ret =
-      new ArrayList<org.apache.hadoop.yarn.api.records.LocalResource>(rsrcs.size());
-    for (final org.apache.hadoop.yarn.api.records.LocalResource yrsrc : rsrcs) {
-      final LocalizedResource cand =
-        new LocalizedResource(new Callable<Map<LocalResource,Path>>() {
-              @Override
-              public Map<LocalResource,Path> call() {
-                // Future complete from RPC callback
-                throw new UnsupportedOperationException();
-              }
-            });
-      final LocalResource rsrc = new LocalResource(yrsrc);
-      while (true) {
-        LocalizedResource actual = localrsrc.putIfAbsent(rsrc, cand);
-        if (null == actual) {
-          cand.request(app);
-          ret.add(yrsrc);
-          break;
-        }
-        // ensure resource localization not cancelled
-        if (actual.isCancelled()) {
-          if (!localrsrc.replace(rsrc, actual, cand)) {
-            // newer entry exists
-            continue;
-          } else {
-            cand.request(app);
-            ret.add(yrsrc);
-            break;
-          }
-        }
-        actual.request(app);
-        break;
+  private final Dispatcher dispatcher;
+  private final ConcurrentHashMap<LocalResourceRequest,LocalizedResource>
+    localrsrc = new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
+
+  public LocalResourcesTrackerImpl(Dispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  public void handle(ResourceEvent event) {
+    LocalResourceRequest req = event.getLocalResource();
+    LocalizedResource rsrc = localrsrc.get(req);
+    switch (event.getType()) {
+    case REQUEST:
+    case LOCALIZED:
+      if (null == rsrc) {
+        rsrc = new LocalizedResource(req, dispatcher);
+        localrsrc.put(req, rsrc);
       }
-    }
-    return ret;
-  }
-
-  public void release(AppLocalizationRunnerImpl app, LocalResource[] resources) {
-    for (LocalResource rsrc : resources) {
-      LocalizedResource resource = localrsrc.get(rsrc);
-      if (resource != null) {
-        // XXX update timestamp to last-used?
-        resource.refCount.getAndDecrement();
-        resource.notifyQueue.remove(app);
+      break;
+    case RELEASE:
+      if (null == rsrc) {
+        LOG.info("Release unknown rsrc " + rsrc + " (discard)");
+        return;
       }
+      break;
     }
+    rsrc.handle(event);
   }
 
-  /**
-   * Private inner datum tracking a resource that is already localized.
-   */
-  // TODO use AQS for download
-  private class LocalizedResource extends
-      FutureTask<Map<LocalResource, Path>> {
-    private final AtomicInteger refCount = new AtomicInteger(0);
-    private final AtomicLong timestamp = new AtomicLong(currentTime());
-    private final BlockingQueue<AppLocalizationRunnerImpl> notifyQueue =
-      new LinkedBlockingQueue<AppLocalizationRunnerImpl>();
-    // TODO: Why is it needed?
-    private volatile long size = -1;
-    LocalizedResource(Callable<Map<LocalResource,Path>> fetch) {
-      super(fetch);
-    }
-    void request(AppLocalizationRunnerImpl appRunner) {
-      refCount.getAndIncrement();
-      timestamp.set(currentTime());
-      notifyQueue.offer(appRunner);
-      if (isDone() && notifyQueue.remove(appRunner)) {
-        appRunner.localizedResource(this);
-      }
-    }
-    void success(LocalResource rsrc, long rsrcSize, Path p)
-        throws InterruptedException {
-      size = rsrcSize;
-      set(Collections.singletonMap(rsrc, p));
-    }
-    void failure(LocalResource r, YarnRemoteException e) {
-      // TODO: How do you inform the appRunner?
-      localrsrc.remove(r, this);
-      setException(e);
-    }
-    @Override
-    protected void done() {
-      for (AppLocalizationRunner appRunner = notifyQueue.poll(); appRunner != null;
-           appRunner = notifyQueue.poll()) {
-        appRunner.localizedResource(this);
-      }
-    }
+  public boolean contains(LocalResourceRequest resource) {
+    return localrsrc.contains(resource);
   }
 
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,225 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+
+public class LocalizedResource implements EventHandler<ResourceEvent> {
+
+  private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
+
+  Path localPath;
+  long size = -1;
+  final LocalResourceRequest rsrc;
+  final Dispatcher dispatcher;
+  final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
+    stateMachine;
+  final Semaphore sem = new Semaphore(1);
+  final Queue<ContainerId> ref;
+  final AtomicLong timestamp = new AtomicLong(currentTime());
+
+  private static final StateMachineFactory<LocalizedResource,ResourceState,
+      ResourceEventType,ResourceEvent> stateMachineFactory =
+        new StateMachineFactory<LocalizedResource,ResourceState,
+          ResourceEventType,ResourceEvent>(ResourceState.INIT)
+    // From INIT (ref == 0, awaiting req)
+    .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
+        ResourceEventType.REQUEST, new FetchResourceTransition())
+    .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
+        ResourceEventType.LOCALIZED, new FetchDirectTransition())
+    .addTransition(ResourceState.INIT, ResourceState.INIT,
+        ResourceEventType.RELEASE, new ReleaseTransition())
+    // From DOWNLOADING (ref > 0, may be localizing)
+    .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
+        ResourceEventType.REQUEST, new FetchResourceTransition())
+    .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
+        ResourceEventType.LOCALIZED, new FetchSuccessTransition())
+    .addTransition(ResourceState.DOWNLOADING,
+        EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
+        ResourceEventType.RELEASE, new ReleasePendingTransition())
+    // From LOCALIZED (ref >= 0, on disk)
+    .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
+        ResourceEventType.REQUEST, new LocalizedResourceTransition())
+    .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
+        ResourceEventType.LOCALIZED, new LocalizedResourceTransition())
+    .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
+        ResourceEventType.RELEASE, new ReleaseTransition())
+    .installTopology();
+
+  public LocalizedResource(LocalResourceRequest rsrc, Dispatcher dispatcher) {
+    this.rsrc = rsrc;
+    this.dispatcher = dispatcher;
+    this.ref = new LinkedList<ContainerId>();
+    this.stateMachine = stateMachineFactory.make(this);
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{ ").append(rsrc.toString()).append(",")
+      .append(getState() == ResourceState.LOCALIZED
+          ? localPath 
+          : "pending").append(",[");
+    for (ContainerId c : ref) {
+      sb.append("(").append(c.toString()).append(")");
+    }
+    sb.append("],").append(timestamp.get()).append("}");
+    return sb.toString();
+  }
+
+  public void release(ContainerId container) {
+    if (!ref.remove(container)) {
+      LOG.info("Attempt to release claim on " + this +
+               " from unregistered container " + container);
+      assert false;
+    }
+    timestamp.set(currentTime());
+  }
+
+  long currentTime() {
+    return System.nanoTime();
+  }
+
+  public ResourceState getState() {
+    return stateMachine.getCurrentState();
+  }
+
+  public LocalResourceRequest getRequest() {
+    return rsrc;
+  }
+
+  public boolean tryAcquire() {
+    return sem.tryAcquire();
+  }
+
+  public void unlock() {
+    sem.release();
+  }
+
+  public synchronized void handle(ResourceEvent event) {
+    stateMachine.doTransition(event.getType(), event);
+  }
+
+  static abstract class ResourceTransition implements
+      SingleArcTransition<LocalizedResource,ResourceEvent> {
+    // typedef
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private static class FetchResourceTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      ResourceRequestEvent req = (ResourceRequestEvent) event;
+      LocalizerContext ctxt = req.getContext();
+      ContainerId container = ctxt.getContainer();
+      rsrc.ref.add(container);
+      rsrc.dispatcher.getEventHandler().handle(
+          new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt));
+    }
+  }
+
+  private static class FetchDirectTransition extends FetchSuccessTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      LOG.warn("Resource " + rsrc + " localized without listening container");
+      super.transition(rsrc, event);
+    }
+  }
+
+  /**
+   * Resource localized, notify waiting containers.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private static class FetchSuccessTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
+      rsrc.localPath = locEvent.getLocation();
+      rsrc.size = locEvent.getSize();
+      for (ContainerId container : rsrc.ref) {
+        rsrc.dispatcher.getEventHandler().handle(
+            new ContainerResourceLocalizedEvent(
+              container, rsrc.rsrc, rsrc.localPath));
+      }
+    }
+  }
+
+  /**
+   * Resource already localized, notify immediately.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private static class LocalizedResourceTransition
+      extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      // notify waiting containers
+      ResourceRequestEvent reqEvent = (ResourceRequestEvent) event;
+      ContainerId container = reqEvent.getContext().getContainer();
+      rsrc.ref.add(container);
+      rsrc.dispatcher.getEventHandler().handle(
+          new ContainerResourceLocalizedEvent(
+            container, rsrc.rsrc, rsrc.localPath));
+    }
+  }
+
+  /**
+   * Decrement resource count, update timestamp.
+   */
+  private static class ReleaseTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      // Note: assumes that localizing container must succeed or fail
+      ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
+      rsrc.release(relEvent.getContainer());
+    }
+  }
+
+  private static class ReleasePendingTransition implements
+      MultipleArcTransition<LocalizedResource,ResourceEvent,ResourceState> {
+    @Override
+    public ResourceState transition(LocalizedResource rsrc,
+        ResourceEvent event) {
+      ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
+      rsrc.release(relEvent.getContainer());
+      return rsrc.ref.isEmpty()
+        ? ResourceState.INIT
+        : ResourceState.DOWNLOADING;
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizerContext.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,31 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class LocalizerContext {
+
+  private final String user;
+  private final ContainerId container;
+  private final Credentials credentials;
+
+  public LocalizerContext(String user, ContainerId container,
+      Credentials credentials) {
+    this.user = user;
+    this.container = container;
+    this.credentials = credentials;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public ContainerId getContainer() {
+    return container;
+  }
+
+  public Credentials getCredentials() {
+    return credentials;
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Fri Apr 29 08:35:53 2011
@@ -15,15 +15,30 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.io.DataOutputStream;
+
+import java.net.URISyntaxException;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -32,8 +47,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.avro.ipc.Server;
 import org.apache.commons.logging.Log;
@@ -49,40 +62,44 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
-import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationRequest;
-import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.FailedLocalizationResponse;
-import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationRequest;
-import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.SuccessfulLocalizationResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerResourceFailedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.api.records.URL;
-
 
 public class ResourceLocalizationService extends AbstractService
-    implements EventHandler<LocalizerEvent>, LocalizationProtocol {
+    implements EventHandler<LocalizationEvent>, LocalizationProtocol {
 
   private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class);
   public static final String NM_PRIVATE_DIR = "nmPrivate";
-  public final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
+  public static final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
 
   private Server server;
   private InetSocketAddress locAddr;
@@ -92,22 +109,15 @@ public class ResourceLocalizationService
   private final ContainerExecutor exec;
   protected final Dispatcher dispatcher;
   private final DeletionService delService;
-  private final ExecutorService appLocalizerThreadPool =
-    Executors.newCachedThreadPool();
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  /**
-   * Map of private resources of users.
-   */
+  private LocalizerTracker localizers;
+  private final RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  //private final LocalResourcesTracker publicRsrc;
   private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
     new ConcurrentHashMap<String,LocalResourcesTracker>();
-
-  /**
-   * Map of applications that are in the process of localization.
-   * TODO: Why is it needed?
-   */
-  private final ConcurrentMap<String,AppLocalizationRunner> localizingApps =
-    new ConcurrentHashMap<String,AppLocalizationRunner>();
+  private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
+    new ConcurrentHashMap<String,LocalResourcesTracker>();
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService) {
@@ -117,16 +127,21 @@ public class ResourceLocalizationService
     this.delService = delService;
   }
 
+  FileContext getLocalFileContext(Configuration conf) {
+    try {
+      return FileContext.getLocalFSFileContext(conf);
+    } catch (IOException e) {
+      throw new YarnException("Failed to access local fs");
+    }
+  }
+
   @Override
   public void init(Configuration conf) {
-    // TODO limit for public cache, not appLocalizer
-    //appLocalizer.setMaximumPoolSize(
-    //    conf.getInt(NM_MAX_PUBLIC_FETCH_THREADS,
-    //                DEFAULT_MAX_PUBLIC_FETCH_THREADS));
     try {
       // TODO queue deletions here, rather than NM init?
-      FileContext lfs = FileContext.getLocalFSFileContext(conf);
-      String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
+      FileContext lfs = getLocalFileContext(conf);
+      String[] sLocalDirs =
+        conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
 
       localDirs = new ArrayList<Path>(sLocalDirs.length);
       logDirs = new ArrayList<Path>(sLocalDirs.length);
@@ -135,10 +150,10 @@ public class ResourceLocalizationService
         Path localdir = new Path(sLocaldir);
         localDirs.add(localdir);
         // $local/usercache
-        Path userdir = new Path(localdir, ApplicationLocalizer.USERCACHE);
+        Path userdir = new Path(localdir, ContainerLocalizer.USERCACHE);
         lfs.mkdir(userdir, null, true);
         // $local/filecache
-        Path filedir = new Path(localdir, ApplicationLocalizer.FILECACHE);
+        Path filedir = new Path(localdir, ContainerLocalizer.FILECACHE);
         lfs.mkdir(filedir, null, true);
         // $local/nmPrivate
         Path sysdir = new Path(localdir, NM_PRIVATE_DIR);
@@ -152,18 +167,32 @@ public class ResourceLocalizationService
         lfs.mkdir(logdir, null, true);
       }
     } catch (IOException e) {
-      throw new YarnException("Failed to start Localizer", e);
+      throw new YarnException("Failed to initialize LocalizationService", e);
     }
     localDirs = Collections.unmodifiableList(localDirs);
     logDirs = Collections.unmodifiableList(logDirs);
     sysDirs = Collections.unmodifiableList(sysDirs);
     locAddr = NetUtils.createSocketAddr(
-        conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
+      conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
+    localizers = new LocalizerTracker();
+    dispatcher.register(LocalizerEventType.class, localizers);
     super.init(conf);
   }
 
   @Override
+  public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
+    return localizers.processHeartbeat(status);
+  }
+
+  @Override
   public void start() {
+    server = createServer();
+    LOG.info("Localizer started on port " + server.getPort());
+    server.start();
+    super.start();
+  }
+
+  Server createServer() {
     YarnRPC rpc = YarnRPC.create(getConfig());
     Configuration conf = new Configuration(getConfig()); // Clone to separate
                                                          // sec-info classes
@@ -173,127 +202,87 @@ public class ResourceLocalizationService
           LocalizerSecurityInfo.class, SecurityInfo.class);
       secretManager = new LocalizerTokenSecretManager();
     }
-    server =
-        rpc.getServer(LocalizationProtocol.class, this, locAddr, conf,
-            secretManager);
-    LOG.info("Localizer started at " + locAddr);
-    server.start();
-    super.start();
-  }
-
-  /** LTC communication w/ Localizer */
-  public InetSocketAddress getAddress() {
-    return locAddr;
-  }
-
-  /**
-   * Localizer report to NodeManager of localized resource.
-   * @param user Owner of the private cache
-   * @param resource Resource localized
-   * @param path Location on the local filesystem, or null if failed
-   */
-  
-  @Override
-  public SuccessfulLocalizationResponse successfulLocalization(SuccessfulLocalizationRequest request) throws YarnRemoteException {
-    String user = request.getUser();
-    org.apache.hadoop.yarn.api.records.LocalResource resource = request.getResource();
-    URL path = request.getPath();
-    // TODO validate request
-    LocalResourcesTracker userCache = privateRsrc.get(user.toString());
-    if (null == userCache) {
-      throw RPCUtil.getRemoteException("Unknown user: " + user);
-    }
-    try {
-      userCache.setSuccess(new LocalResource(resource),
-          resource.getSize(), ConverterUtils.getPathFromYarnURL(path));
-    } catch (Exception e) {
-      throw RPCUtil.getRemoteException(e);
-    }
-    SuccessfulLocalizationResponse response = recordFactory.newRecordInstance(SuccessfulLocalizationResponse.class);
-    return response;
-  }
-
-  public FailedLocalizationResponse failedLocalization(FailedLocalizationRequest request) throws YarnRemoteException {
-    String user = request.getUser();
-    org.apache.hadoop.yarn.api.records.LocalResource resource = request.getResource();
-    YarnRemoteException cause = request.getException();
-    LocalResourcesTracker userCache = privateRsrc.get(user.toString());
-    if (null == userCache) {
-      throw RPCUtil.getRemoteException("Unknown user: " + user);
-    }
-    try {
-      userCache.removeFailedResource(new LocalResource(resource), cause);
-    } catch (Exception e) {
-      throw RPCUtil.getRemoteException(e);
-    }
-    FailedLocalizationResponse response = recordFactory.newRecordInstance(FailedLocalizationResponse.class);
-    return response;
+    return rpc.getServer(
+        LocalizationProtocol.class, this, locAddr, conf, secretManager);
   }
 
   @Override
   public void stop() {
-    appLocalizerThreadPool.shutdownNow();
     if (server != null) {
       server.close();
     }
+    if (localizers != null) {
+      localizers.stop();
+    }
     super.stop();
   }
 
   @Override
-  public void handle(LocalizerEvent event) {
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void handle(LocalizationEvent event) {
     String userName;
     String appIDStr;
-    
+    // TODO: create log dir as $logdir/$user/$appId
     switch (event.getType()) {
     case INIT_APPLICATION_RESOURCES:
-      Application app = ((ApplicationLocalizerEvent)event).getApplication();
-      String user = app.getUser();
-      LocalResourcesTracker rsrcTracker = privateRsrc.get(user);
-      if (null == rsrcTracker) {
-        LocalResourcesTracker perUserPrivateRsrcTracker =
-            new LocalResourcesTrackerImpl();
-        rsrcTracker =
-            privateRsrc.putIfAbsent(user, perUserPrivateRsrcTracker);
-        if (null == rsrcTracker) {
-          rsrcTracker = perUserPrivateRsrcTracker;
-        }
+      Application app =
+        ((ApplicationLocalizationEvent)event).getApplication();
+      // 0) Create application tracking structs
+      privateRsrc.putIfAbsent(app.getUser(),
+          new LocalResourcesTrackerImpl(dispatcher));
+      if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
+          new LocalResourcesTrackerImpl(dispatcher))) {
+        LOG.warn("Initializing application " + app + " already present");
+        assert false;
       }
-
-      // Possibility of duplicate app localization is avoided by ApplicationImpl
-      // itself, so we are good to go.
-
-      // TODO: use LDA for picking single logdir, sysDir
-      // TODO: create log dir as $logdir/$user/$appId
-      try {
-        AppLocalizationRunner appLocalizationRunner =
-          new AppLocalizationRunnerImpl(dispatcher,
-              app, this, null, rsrcTracker, exec, logDirs.get(0), localDirs,
-              new Path(sysDirs.get(0), app.toString()));
-        localizingApps.put(app.toString(), appLocalizationRunner);
-        appLocalizerThreadPool.submit(appLocalizationRunner);
-      } catch (IOException e) {
-        // TODO kill containers
-        LOG.info("Failed to submit application", e);
-        //dispatcher.getEventHandler().handle(
+      // 1) Signal container init
+      dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
+            app.getAppId(), logDirs.get(0)));
+      break;
+    case INIT_CONTAINER_RESOURCES:
+      ContainerLocalizationRequestEvent rsrcReqs =
+        (ContainerLocalizationRequestEvent) event;
+      Container c = rsrcReqs.getContainer();
+      LocalizerContext ctxt = new LocalizerContext(
+          c.getUser(), c.getContainerID(), c.getCredentials());
+      final LocalResourcesTracker tracker;
+      LocalResourceVisibility vis = rsrcReqs.getVisibility();
+      switch (vis) {
+      default:
+      case PUBLIC:
+        // TODO
+        tracker = null;
+        break;
+      case PRIVATE:
+        tracker = privateRsrc.get(c.getUser());
+        break;
+      case APPLICATION:
+        tracker =
+          appRsrc.get(ConverterUtils.toString(c.getContainerID().getAppId()));
+        break;
+      }
+      for (LocalResourceRequest req : rsrcReqs.getRequestedResources()) {
+        tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
       }
       break;
     case CLEANUP_CONTAINER_RESOURCES:
-      Container container = ((ContainerLocalizerEvent)event).getContainer();
+      Container container =
+        ((ContainerLocalizationEvent)event).getContainer();
 
       // Delete the container directories
       userName = container.getUser();;
       String containerIDStr = container.toString();
-      appIDStr = ConverterUtils.toString(container.getContainerID().getAppId());
+      appIDStr =
+        ConverterUtils.toString(container.getContainerID().getAppId());
       for (Path localDir : localDirs) {
-        Path usersdir = new Path(localDir, ApplicationLocalizer.USERCACHE);
+        Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
         Path userdir =
             new Path(usersdir, userName);
-        Path allAppsdir = new Path(userdir, ApplicationLocalizer.APPCACHE);
+        Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
         Path appDir = new Path(allAppsdir, appIDStr);
         Path containerDir =
             new Path(appDir, containerIDStr);
-        delService.delete(userName,
-            containerDir, null);
+        delService.delete(userName, containerDir, null);
 
         Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
         Path appSysDir = new Path(sysDir, appIDStr);
@@ -308,19 +297,21 @@ public class ResourceLocalizationService
     case DESTROY_APPLICATION_RESOURCES:
 
       Application application =
-          ((ApplicationLocalizerEvent) event).getApplication();
+          ((ApplicationLocalizationEvent) event).getApplication();
+      if (null == appRsrc.remove(application)) {
+        LOG.warn("Removing uninitialized application " + application);
+      }
 
       // Delete the application directories
       userName = application.getUser();
       appIDStr = application.toString();
       for (Path localDir : localDirs) {
-        Path usersdir = new Path(localDir, ApplicationLocalizer.USERCACHE);
+        Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
         Path userdir =
             new Path(usersdir, userName);
-        Path allAppsdir = new Path(userdir, ApplicationLocalizer.APPCACHE);
+        Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
         Path appDir = new Path(allAppsdir, appIDStr);
-        delService.delete(userName,
-            appDir, null);
+        delService.delete(userName, appDir, null);
 
         Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
         Path appSysDir = new Path(sysDir, appIDStr);
@@ -337,4 +328,247 @@ public class ResourceLocalizationService
     }
   }
 
+  class LocalizerTracker implements EventHandler<LocalizerEvent> {
+
+    private final Map<String,LocalizerRunner> trackers;
+
+    LocalizerTracker() {
+      this(new HashMap<String,LocalizerRunner>());
+    }
+
+    LocalizerTracker(Map<String,LocalizerRunner> trackers) {
+      this.trackers = trackers;
+    }
+
+    public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
+      String locId = status.getLocalizerId();
+      synchronized (trackers) {
+        LocalizerRunner localizer = trackers.get(locId);
+        if (null == localizer) {
+          // TODO process resources anyway
+          LocalizerHeartbeatResponse response =
+            recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
+          response.setLocalizerAction(LocalizerAction.DIE);
+          return response;
+        }
+        return localizer.update(status.getResources());
+      }
+    }
+
+    public void stop() {
+      for (LocalizerRunner localizer : trackers.values()) {
+        localizer.interrupt();
+      }
+    }
+
+    @Override
+    public void handle(LocalizerEvent event) {
+      synchronized (trackers) {
+        String locId = event.getLocalizerId();
+        LocalizerRunner localizer = trackers.get(locId);
+        switch(event.getType()) {
+          case REQUEST_RESOURCE_LOCALIZATION:
+            // 0) find running localizer or start new thread
+            LocalizerResourceRequestEvent req =
+              (LocalizerResourceRequestEvent)event;
+            if (null == localizer) {
+              LOG.info("Created localizer for " + req.getLocalizerId());
+              localizer = new LocalizerRunner(req.getContext(),
+                  sysDirs.get(0), req.getLocalizerId(), logDirs.get(0));
+              trackers.put(locId, localizer);
+              localizer.start();
+            }
+            // 1) propagate event
+            localizer.addResource(req);
+            break;
+          case ABORT_LOCALIZATION:
+            // 0) find running localizer, interrupt and remove
+            if (null == localizer) {
+              return; // ignore; already gone
+            }
+            trackers.remove(locId);
+            localizer.interrupt();
+            break;
+        }
+      }
+    }
+  }
+
+  class LocalizerRunner extends Thread {
+
+    final LocalizerContext context;
+    final String localizerId;
+    final Path nmPrivate;
+    final Path logDir;
+    final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
+    final List<LocalizerResourceRequestEvent> pending;
+
+    private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+    LocalizerRunner(LocalizerContext context, Path nmPrivate,
+        String localizerId, Path logDir) {
+      this(context, nmPrivate, localizerId, logDir,
+          new ArrayList<LocalizerResourceRequestEvent>(),
+          new HashMap<LocalResourceRequest,LocalizerResourceRequestEvent>());
+    }
+
+    LocalizerRunner(LocalizerContext context, Path nmPrivate,
+        String localizerId, Path logDir,
+        List<LocalizerResourceRequestEvent> pending,
+        Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled) {
+      this.nmPrivate = nmPrivate;
+      this.context = context;
+      this.localizerId = localizerId;
+      this.logDir = logDir;
+      this.pending = pending;
+      this.scheduled = scheduled;
+    }
+
+    public void addResource(LocalizerResourceRequestEvent request) {
+      pending.add(request);
+    }
+
+    LocalResource findNextResource() {
+      for (Iterator<LocalizerResourceRequestEvent> i = pending.iterator();
+           i.hasNext();) {
+        LocalizerResourceRequestEvent evt = i.next();
+        LocalizedResource nRsrc = evt.getResource();
+        if (ResourceState.LOCALIZED.equals(nRsrc.getState())) {
+          i.remove();
+          continue;
+        }
+        if (nRsrc.tryAcquire()) {
+          LocalResourceRequest nextRsrc = nRsrc.getRequest();
+          LocalResource next =
+            recordFactory.newRecordInstance(LocalResource.class);
+          next.setResource(
+              ConverterUtils.getYarnUrlFromPath(nextRsrc.getPath()));
+          next.setTimestamp(nextRsrc.getTimestamp());
+          next.setType(nextRsrc.getType());
+          next.setVisibility(evt.getVisibility());
+          scheduled.put(nextRsrc, evt);
+          return next;
+        }
+      }
+      return null;
+    }
+
+    // TODO this sucks. Fix it later
+    LocalizerHeartbeatResponse update(
+        List<LocalResourceStatus> stats) {
+      LocalizerHeartbeatResponse response =
+        recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
+
+      if (stats.isEmpty()) {
+        LocalResource next = findNextResource();
+        if (next != null) {
+          response.setLocalizerAction(LocalizerAction.LIVE);
+          response.addResource(next);
+        } else if (pending.isEmpty()) {
+          response.setLocalizerAction(LocalizerAction.DIE);
+        } else {
+          response.setLocalizerAction(LocalizerAction.LIVE);
+        }
+        return response;
+      }
+
+      for (LocalResourceStatus stat : stats) {
+        LocalResource rsrc = stat.getResource();
+        LocalResourceRequest req = null;
+        try {
+          req = new LocalResourceRequest(rsrc);
+        } catch (URISyntaxException e) {
+          // TODO fail? Already translated several times...
+        }
+        LocalizerResourceRequestEvent assoc = scheduled.get(req);
+        if (assoc == null) {
+          // internal error
+          LOG.error("Unknown resource reported: " + req);
+          continue;
+        }
+        switch (stat.getStatus()) {
+          case FETCH_SUCCESS:
+            // notify resource
+            try {
+              assoc.getResource().handle(
+                  new ResourceLocalizedEvent(req,
+                    ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
+                    stat.getLocalSize()));
+            } catch (URISyntaxException e) { }
+            if (pending.isEmpty()) {
+              response.setLocalizerAction(LocalizerAction.DIE);
+              break;
+            }
+            response.setLocalizerAction(LocalizerAction.LIVE);
+            LocalResource next = findNextResource();
+            if (next != null) {
+              response.addResource(next);
+            }
+            break;
+          case FETCH_PENDING:
+            response.setLocalizerAction(LocalizerAction.LIVE);
+            break;
+          case FETCH_FAILURE:
+            LOG.info("DEBUG: FAILED " + req, stat.getException());
+            assoc.getResource().unlock();
+            response.setLocalizerAction(LocalizerAction.DIE);
+            dispatcher.getEventHandler().handle(
+                new ContainerResourceFailedEvent(context.getContainer(),
+                  req, stat.getException()));
+            break;
+          default:
+            LOG.info("Unknown status: " + stat.getStatus());
+            response.setLocalizerAction(LocalizerAction.DIE);
+            dispatcher.getEventHandler().handle(
+                new ContainerResourceFailedEvent(context.getContainer(),
+                  req, stat.getException()));
+            break;
+        }
+      }
+      return response;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked") // dispatcher not typed
+    public void run() {
+      try {
+        // 0) init queue, etc.
+        // 1) write credentials to private dir
+        DataOutputStream tokenOut = null;
+        try {
+          Credentials credentials = context.getCredentials();
+          Path cTokens = new Path(nmPrivate, String.format(
+                ContainerLocalizer.TOKEN_FILE_FMT, localizerId));
+          FileContext lfs = getLocalFileContext(getConfig());
+          tokenOut = lfs.create(cTokens, EnumSet.of(CREATE, OVERWRITE));
+          LOG.info("Writing credentials to " + cTokens.toString() +
+                   ". Credentials list: ");
+          for (Token<? extends TokenIdentifier> tk :
+              credentials.getAllTokens()) {
+            LOG.info(tk.getService() + " : " + tk.encodeToUrlString());
+          }
+          credentials.writeTokenStorageToStream(tokenOut);
+        } finally {
+          if (tokenOut != null) {
+            tokenOut.close();
+          }
+        }
+        // 2) exec initApplication and wait
+        exec.startLocalizer(nmPrivate, locAddr, context.getUser(),
+            ConverterUtils.toString(context.getContainer().getAppId()),
+            localizerId, logDir, localDirs);
+      } catch (Exception e) {
+        // 3) on error, report failure to Container and signal ABORT
+        // 3.1) notify resource of failed localization
+        for (LocalizerResourceRequestEvent event : scheduled.values()) {
+          event.getResource().unlock();
+        }
+        //dispatcher.getEventHandler().handle(
+        //    new ContainerResourceFailedEvent(current.getContainer(),
+        //      current.getResource().getRequest(), e));
+      }
+    }
+
+  }
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceState.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+enum ResourceState {
+  INIT,
+  DOWNLOADING,
+  LOCALIZED
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizationEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+
+public class ApplicationLocalizationEvent extends LocalizationEvent {
+
+  final Application app;
+
+  public ApplicationLocalizationEvent(LocalizationEventType type, Application app) {
+    super(type);
+    this.app = app;
+  }
+
+  public Application getApplication() {
+    return app;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+public class ContainerLocalizationEvent extends LocalizationEvent {
+
+  final Container container;
+
+  public ContainerLocalizationEvent(LocalizationEventType event, Container c) {
+    super(event);
+    this.container = c;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+public class ContainerLocalizationRequestEvent extends ContainerLocalizationEvent {
+
+  private final LocalResourceVisibility vis;
+  private final Collection<LocalResourceRequest> reqs;
+
+  public ContainerLocalizationRequestEvent(Container c, Collection<LocalResourceRequest> reqs,
+      LocalResourceVisibility vis) {
+    super(LocalizationEventType.INIT_CONTAINER_RESOURCES, c);
+    this.vis = vis;
+    this.reqs = reqs;
+  }
+
+  public LocalResourceVisibility getVisibility() {
+    return vis;
+  }
+
+  public Collection<LocalResourceRequest> getRequestedResources() {
+    return reqs;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+
+public abstract class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
+
+  public LocalizationEvent(LocalizationEventType event) {
+    super(event);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+public enum LocalizationEventType {
+  INIT_APPLICATION_RESOURCES,
+  INIT_CONTAINER_RESOURCES,
+  CLEANUP_CONTAINER_RESOURCES,
+  DESTROY_APPLICATION_RESOURCES,
+}



Mime
View raw message