hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [30/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+public enum ContainersLauncherEventType {
+  LAUNCH_CONTAINER,
+  CLEANUP_CONTAINER, // The process(grp) itself.
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunner.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunner.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunner.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.Path;
+
+public interface AppLocalizationRunner extends Runnable {
+
+  // TODO: A future? wierd.
+  void localizedResource(Future<Map<LocalResource,Path>> result);
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunnerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunnerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunnerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/AppLocalizationRunnerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,229 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.PRIVATE;
+import static org.apache.hadoop.yarn.LocalResourceVisibility.PUBLIC;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitedEvent;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+
+/**
+ * A thread that takes care of localization of a single application. This is
+ * used by {@link ResourceLocalizationService} to start one thread per app.
+ * 
+ */
+public class AppLocalizationRunnerImpl implements AppLocalizationRunner {
+
+  private static final Log LOG =
+    LogFactory.getLog(AppLocalizationRunnerImpl.class);
+
+  final Path sysDir;
+  final Path logDir;
+  final FileContext lfs;
+  final List<Path> localDirs;
+  final Application app;
+  final Dispatcher dispatcher;
+  final ContainerExecutor exec;
+  final LocalizationProtocol localization;
+  final LocalResourcesTracker publicRsrcsTracker;
+  final LocalResourcesTracker privateRsrsTracker;
+  final BlockingQueue<Future<Map<LocalResource,Path>>> localizedResources;
+
+  public AppLocalizationRunnerImpl(Dispatcher dispatcher, Application app,
+      LocalizationProtocol localization,
+      LocalResourcesTracker publicResources,
+      LocalResourcesTracker privateResources, ContainerExecutor exec,
+      Path logDir,
+      List<Path> localDirs, Path sysDir) throws IOException {
+    // TODO
+    this(dispatcher, app, localization, publicResources, privateResources, exec,
+        logDir, localDirs, sysDir, FileContext.getLocalFSFileContext(),
+        new LinkedBlockingQueue<Future<Map<LocalResource,Path>>>());
+  }
+
+  AppLocalizationRunnerImpl(Dispatcher dispatcher, Application app,
+      LocalizationProtocol localization,
+      LocalResourcesTracker publicRsrcsTracker,
+      LocalResourcesTracker privateRsrcsTracker, ContainerExecutor exec,
+      Path logDir, List<Path> localDirs, Path sysDir, FileContext lfs,
+      BlockingQueue<Future<Map<LocalResource, Path>>> localizedResources) {
+    this.app = app;
+    this.lfs = lfs;
+    this.exec = exec;
+    this.logDir = logDir;
+    this.sysDir = sysDir;
+    this.localDirs = localDirs;
+    this.dispatcher = dispatcher;
+    this.localization = localization;
+    this.publicRsrcsTracker = publicRsrcsTracker;
+    this.privateRsrsTracker = privateRsrcsTracker;
+    this.localizedResources = localizedResources;
+    LOG.info("Initializing " + app + " for " + app.getUser());
+  }
+
+  public void localizedResource(Future<Map<LocalResource,Path>> result) {
+    localizedResources.offer(result);
+  }
+
+  private Map<LocalResource,String> invertMap(
+        Map<String,org.apache.hadoop.yarn.LocalResource> yrsrc)
+      throws URISyntaxException {
+    Map<LocalResource,String> ret = new HashMap<LocalResource,String>();
+    for (Map.Entry<String,org.apache.hadoop.yarn.LocalResource> y : yrsrc.entrySet()) {
+      ret.put(new LocalResource(y.getValue()), y.getKey());
+    }
+    return ret;
+  }
+
+  void abort() {
+    // TODO
+  }
+
+  private void writeApplicationLocalizationControlFiles(
+      Collection<org.apache.hadoop.yarn.LocalResource> todo) throws IOException {
+    DataOutputStream filesOut = null;
+    DataOutputStream tokenOut = null;
+    try {
+      lfs.mkdir(sysDir, null, false);
+      Path appFiles = new Path(sysDir, ApplicationLocalizer.FILECACHE_FILE);
+      filesOut = lfs.create(appFiles, EnumSet.of(CREATE, OVERWRITE));
+      ApplicationLocalizer.writeResourceDescription(filesOut, todo);
+      Path appTokens = new Path(sysDir, ApplicationLocalizer.APPTOKEN_FILE);
+      tokenOut = lfs.create(appTokens, EnumSet.of(CREATE, OVERWRITE));
+      Credentials appCreds = app.getCredentials();
+      LOG.info("Writing credentials again. Credentials list: ");
+      for (Token<? extends TokenIdentifier> tk : appCreds.getAllTokens()) {
+        LOG.info(tk.getService() + " : " + tk.encodeToUrlString());
+      }
+      appCreds.writeTokenStorageToStream(tokenOut);
+    } finally {
+      IOUtils.cleanup(null, filesOut, tokenOut);
+    }
+  }
+
+  @Override
+  public void run() {
+    // 0) queue public cache
+    Map<String,org.apache.hadoop.yarn.LocalResource> pub = app.getResources(PUBLIC);
+    // 1) wait for completion
+    // 1.1) if any failures, do *not* cancel remaining (other jobs may req) but
+    //      decr reference counts
+    // 3) queue private cache
+    // XXX no need to copy w/ working public cache
+    Map<String,org.apache.hadoop.yarn.LocalResource> priv =
+      new HashMap(app.getResources(PRIVATE));
+
+    Path workdir = null;
+    Map<Path,String> links = new HashMap<Path,String>();
+    // TODO avoid sync and wait for all rsrc. This is a high impacting bug.
+    synchronized (privateRsrsTracker) {
+    Collection<org.apache.hadoop.yarn.LocalResource> todo;
+    try {
+      // TODO public rsrc separate
+      priv.putAll(pub);
+      todo = privateRsrsTracker.register(this, priv.values());
+    } catch (URISyntaxException e) {
+      // TODO cancel application
+      LOG.warn("Bad resource", e);
+      return;
+    }
+    try {
+
+      // TODO: launch process only if todo is non-empty?
+
+      writeApplicationLocalizationControlFiles(todo);
+      // 4) start applicationInit via ContainerExecutor
+      // 4.1) applicationInit incl application-specific resources, e.g. job.xml
+      // TODO
+      try {
+        // TODO token location should not rely on this convention
+        workdir = new Path(localDirs.get(0),
+            new Path(ApplicationLocalizer.USERCACHE,
+              new Path(app.getUser(),
+                new Path(ApplicationLocalizer.APPCACHE, app.toString()))));
+        exec.initApplication(sysDir, localization, app.getUser(),
+            app.toString(), logDir, localDirs);
+        //links.put(new Path(workdir, ApplicationLocalizer.JOBTOKEN_FILE),
+        //          ApplicationLocalizer.JOBTOKEN_FILE);
+      } catch (IOException e) {
+        // TODO cleanup
+        LOG.warn("Failed to init application resources", e);
+      } catch (InterruptedException e) {
+        // TODO cleanup
+        LOG.warn("Failed to init application resources", e);
+      }
+      // 5) Compile rsrc resolution for this job
+      Map<LocalResource,String> requested;
+      requested = invertMap(priv);
+      //for (Future<Map<LocalResource,Path>> rsrc = localizedResources.poll();
+      //     rsrc != null; rsrc = localizedResources.poll()) {
+      for (int i = 0; i < priv.size(); ++i) {
+        Future<Map<LocalResource,Path>> rsrc = localizedResources.poll();
+          if (rsrc != null) {
+            // TODO: Happens if no priv resources and only app resources?
+            Map<LocalResource, Path> resolved = rsrc.get();
+            for (Map.Entry<LocalResource, Path> r : resolved.entrySet()) {
+              links.put(r.getValue(), requested.get(r.getKey()));
+            }
+          }
+      }
+    } catch (Throwable e) {
+      // notify/kill containers
+      LOG.info("Initialization of " + app + " failed: ", e);
+      dispatcher.getEventHandler().handle(new ApplicationEvent(
+            app.getAppId(), ApplicationEventType.FINISH_APPLICATION));
+      return;
+    }
+    }
+    LOG.info("Initialization of " + app + " complete");
+    // 6) signal completion of Application init to Application, which will
+    //    signal any waiting containers to start
+    dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
+          app.getAppId(), links, workdir));
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ApplicationLocalizer.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,516 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+import org.apache.hadoop.yarn.YarnRemoteException;
+
+/**
+ * Internal class responsible for initializing the job, not intended for users.
+ * Creates the following hierarchy:
+ *   <li>$local.dir/usercache/$user</li>
+ *   <li>$local.dir/usercache/$user/appcache</li>
+ *   <li>$local.dir/usercache/$user/appcache/$appId/work</li>
+ *   <li>$local.dir/usercache/$user/appcache/$appId/filecache</li>
+ *   <li>$local.dir/usercache/$user/appcache/$appId/appToken</li>
+ *   <li>$local.dir/usercache/$user/appcache/$appId/appFiles</li>
+ *   <li>$local.dir/usercache/$user/filecache</li>
+ */
+public class ApplicationLocalizer {
+
+  static final Log LOG = LogFactory.getLog(ApplicationLocalizer.class);
+
+  public static final String FILECACHE = "filecache";
+  public static final String FILECACHE_FILE = "appFiles";
+  public static final String APPCACHE = "appcache";
+  public static final String USERCACHE = "usercache";
+  public static final String APPTOKEN_FILE = "appTokens";
+  public static final String WORKDIR = "work";
+
+  private final String user;
+  private final String appId;
+  private final Path logDir;
+  private final FileContext lfs;
+  private final Configuration conf;
+  private final List<Path> localDirs;
+  private final LocalDirAllocator lDirAlloc;
+  private final List<org.apache.hadoop.yarn.LocalResource> privateResources;
+  private final List<org.apache.hadoop.yarn.LocalResource> applicationResources;
+
+  public ApplicationLocalizer(String user, String appId, Path logDir,
+      List<Path> localDirs) throws IOException {
+    this(FileContext.getLocalFSFileContext(), user, appId, logDir, localDirs);
+  }
+
+  public ApplicationLocalizer(FileContext lfs, String user, String appId,
+      Path logDir, List<Path> localDirs) throws IOException {
+    if (null == user) {
+      throw new IOException("Cannot initialize for null user");
+    }
+    if (null == appId) {
+      throw new IOException("Cannot initialize for null appId");
+    }
+    this.user = user;
+    this.appId = appId;
+    this.logDir = logDir;
+    this.lfs = lfs;
+    // TODO fix bug in FileContext requiring Configuration for local fs
+    this.conf = new Configuration();
+    this.localDirs = setLocalDirs(user, conf, localDirs);
+    lDirAlloc = new LocalDirAllocator(NM_LOCAL_DIR);
+    privateResources = new ArrayList<LocalResource>();
+    applicationResources = new ArrayList<LocalResource>();
+  }
+
+  public static void writeLaunchEnv(OutputStream out,
+      Map<CharSequence,CharSequence> environment, Map<Path,String> resources,
+      List<CharSequence> command, List<Path> appDirs)
+      throws IOException {
+    ShellScriptBuilder sb = new ShellScriptBuilder();
+    if (System.getenv("YARN_HOME") != null) {
+      sb.env("YARN_HOME", System.getenv("YARN_HOME"));
+    }
+    sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
+        StringUtils.join(",", appDirs));
+    if (environment != null) {
+      for (Map.Entry<CharSequence,CharSequence> env : environment.entrySet()) {
+        sb.env(env.getKey().toString(), env.getValue().toString());
+      }
+    }
+    if (resources != null) {
+      for (Map.Entry<Path,String> link : resources.entrySet()) {
+        sb.symlink(link.getKey(), link.getValue());
+      }
+    }
+    ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
+    cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
+    cmd.add("/bin/bash ");
+    cmd.add("-c ");
+    cmd.add("\"");
+    for (CharSequence cs : command) {
+      cmd.add(cs.toString());
+      cmd.add(" ");
+    }
+    cmd.add("\"");
+    sb.line(cmd.toArray(new String[cmd.size()]));
+    PrintStream pout = null;
+    try {
+      pout = new PrintStream(out);
+      sb.write(pout);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  static void writeResourceDescription(OutputStream out,
+      Collection<LocalResource> rsrc) throws IOException {
+    try {
+      BinaryEncoder encoder = new BinaryEncoder(out);
+      SpecificDatumWriter writer = new SpecificDatumWriter(LocalResource.class);
+      for (LocalResource r : rsrc) {
+        writer.write(r, encoder);
+      }
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  private void readResourceDescription(InputStream in) throws IOException {
+    BinaryDecoder decoder =
+      DecoderFactory.defaultFactory().createBinaryDecoder(in, null);
+    SpecificDatumReader<LocalResource> reader =
+      new SpecificDatumReader<LocalResource>(LocalResource.class);
+    while (!decoder.isEnd()) {
+      LocalResource rsrc = reader.read(null, decoder);
+      switch (rsrc.state) {
+        case PRIVATE:
+          privateResources.add(rsrc);
+          break;
+        // TODO: Commented to put everything in privateResources for now?
+        //case APPLICATION:
+        //  applicationResources.add(rsrc);
+        //  break;
+        default:
+          privateResources.add(rsrc);
+          break;
+      }
+    }
+  }
+
+  private static List<Path> setLocalDirs(String user, Configuration conf,
+      List<Path> localdirs) throws IOException {
+    if (null == localdirs || 0 == localdirs.size()) {
+      throw new IOException("Cannot initialize without local dirs");
+    }
+    String[] sLocaldirs = new String[localdirs.size()];
+    final List<Path> ret = new ArrayList<Path>(sLocaldirs.length);
+    for (int i = 0; i < sLocaldirs.length; ++i) {
+      Path p = new Path(localdirs.get(i), new Path(USERCACHE, user));
+      ret.add(p);
+      sLocaldirs[i] = p.toString();
+    }
+    conf.setStrings(NM_LOCAL_DIR, sLocaldirs);
+    return ret;
+  }
+
+  private String[] getSubdirs(String subdir) {
+    String[] cacheDirs = new String[localDirs.size()];
+    for (int i = 0, n = localDirs.size(); i < n; ++i) {
+      Path cacheDir = new Path(localDirs.get(i), subdir);
+      cacheDirs[i] = cacheDir.toString();
+    }
+    return cacheDirs;
+  }
+
+  FSDownload download(LocalDirAllocator lda, LocalResource rsrc)
+      throws IOException {
+    return new FSDownload(conf, lda, rsrc);
+  }
+
+  private void localizePrivateFiles(final LocalizationProtocol nodeManager)
+      throws IOException, InterruptedException, YarnRemoteException {
+    // setup the private distributed cache
+    String cacheContext = appId + ".private.cache";
+    conf.setStrings(cacheContext, getSubdirs(FILECACHE));
+    // TODO: Why different context but same local-dirs?
+    LocalDirAllocator privateLDA = new LocalDirAllocator(cacheContext);
+    pull(privateLDA, privateResources, nodeManager);
+  }
+
+  private void localizeAppFiles(final LocalizationProtocol nodeManager)
+      throws IOException, InterruptedException, YarnRemoteException {
+    // TODO localize application-scope files, e.g. job.xml, job.jar
+    String cacheContext = appId + ".cache";
+    //Configuration cacheConf = new Configuration(false);
+    conf.setStrings(cacheContext, getSubdirs(FILECACHE));
+    // TODO: Why different context but same local-dirs?
+    LocalDirAllocator applicationLDA = new LocalDirAllocator(cacheContext);
+    pull(applicationLDA, applicationResources, nodeManager);
+  }
+
+  private void pull(LocalDirAllocator lda, Collection<LocalResource> resources,
+      LocalizationProtocol nodeManager)
+      throws IOException, InterruptedException, YarnRemoteException {
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    CompletionService<Map<LocalResource,Path>> queue =
+      new ExecutorCompletionService<Map<LocalResource,Path>>(exec);
+    Map<Future<Map<LocalResource,Path>>, LocalResource> pending =
+      new HashMap<Future<Map<LocalResource,Path>>, LocalResource>();
+    for (LocalResource rsrc : resources) {
+      FSDownload dThread = download(lda, rsrc);
+      pending.put(queue.submit(dThread), rsrc);
+    }
+    try {
+      for (int i = 0, n = resources.size(); i < n; ++i) {
+        Future<Map<LocalResource,Path>> result = queue.take();
+        try {
+          Map<LocalResource,Path> localized = result.get();
+          for (Map.Entry<LocalResource,Path> local : result.get().entrySet()) {
+            nodeManager.successfulLocalization(user, local.getKey(),
+                AvroUtil.getYarnUrlFromPath(local.getValue()));
+            pending.remove(result);
+          }
+        } catch (ExecutionException e) {
+          // TODO: Shouldn't we continue localizing other paths?
+          nodeManager.failedLocalization(
+              user, pending.get(result),
+              RPCUtil.getRemoteException(e.getCause()));
+          throw new IOException("Failed to localize " +
+                                pending.get(result), e);
+        }
+      }
+    } finally {
+      YarnRemoteException e = RPCUtil.getRemoteException("Localization failed");
+      exec.shutdownNow();
+      for (LocalResource rsrc : pending.values()) {
+        try {
+          nodeManager.failedLocalization(
+              user, rsrc, RPCUtil.getRemoteException(e));
+        } catch (YarnRemoteException error) {
+          LOG.error("Failure cancelling localization", error);
+        }
+      }
+    }
+  }
+
+  private void localizeFiles(LocalizationProtocol nodeManager)
+      throws IOException, InterruptedException { 
+    // load user credentials, configuration
+    // ASSUME
+    // let $x = $local.dir
+    // forall $x, exists $x/$user
+    // exists $x/$user/appcache/$appId/appFiles
+    // exists $x/$user/appcache/$appId/appToken
+    // exists $logdir/userlogs/$appId
+    // TODO verify LTC
+    //createUserCacheDirs()
+    //createAppDirs()
+    //createAppLogDir();
+    InputStream in = null;
+    try {
+      in = lfs.open(new Path(FILECACHE_FILE));
+      readResourceDescription(in);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+    }
+    localizePrivateFiles(nodeManager);
+    localizeAppFiles(nodeManager);
+  }
+
+  LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
+//    // TODO: Fix the following
+//    UserGroupInformation remoteUser =
+//        UserGroupInformation.createRemoteUser(user);
+//    Token<LocalizerTokenIdentifier> token =
+//        new Token<LocalizerTokenIdentifier>();
+//    remoteUser.addToken(token);
+//    return remoteUser.doAs(new PrivilegedAction<Localization>() {
+//      @Override
+//      public Localization run() {
+        Configuration localizerConf = new Configuration();
+        YarnRPC rpc = YarnRPC.create(localizerConf);
+        if (UserGroupInformation.isSecurityEnabled()) {
+          localizerConf.setClass(
+              CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+              LocalizerSecurityInfo.class, SecurityInfo.class);
+        }
+        return (LocalizationProtocol)
+          rpc.getProxy(LocalizationProtocol.class, nmAddr, localizerConf);
+//      }
+//    });
+  }
+
+  public int runLocalization(final InetSocketAddress nmAddr)
+      throws IOException, InterruptedException {
+    // Pull in user's tokens to complete setup
+    final Credentials creds = new Credentials();
+    DataInputStream credFile = null;
+    try {
+      // assume credentials in cwd
+      credFile = lfs.open(new Path(APPTOKEN_FILE));
+      creds.readTokenStorageStream(credFile);
+    } finally  {
+      if (credFile != null) {
+        credFile.close();
+      }
+    }
+
+//    UserGroupInformation ugiJob =
+//        UserGroupInformation.createRemoteUser(localizer.getAppId());
+//    // TODO
+//    //Token<JobTokenIdentifier> jt = TokenCache.getJobToken(creds);
+//    //jt.setService(new Text(
+//    //      nmAddr.getAddress().getHostAddress() + ":" + nmAddr.getPort()));
+//    //ugiJob.addToken(jt);
+    UserGroupInformation remoteUser =
+        UserGroupInformation.createRemoteUser(user);
+    LocalizerTokenSecretManager secretManager =
+        new LocalizerTokenSecretManager();
+    LocalizerTokenIdentifier id =
+        secretManager.createIdentifier();
+    Token<LocalizerTokenIdentifier> localizerToken =
+        new Token<LocalizerTokenIdentifier>(id, secretManager);
+//        new Token<LocalizerTokenIdentifier>("testuser".getBytes(),
+//            new byte[0], LocalizerTokenIdentifier.KIND, new Text("testing"));
+    remoteUser.addToken(localizerToken);
+    final LocalizationProtocol nodeManager =
+        remoteUser.doAs(new PrivilegedAction<LocalizationProtocol>() {
+          @Override
+          public LocalizationProtocol run() {
+            return getProxy(nmAddr);
+          }
+        });
+
+    UserGroupInformation ugi =
+      UserGroupInformation.createRemoteUser(user);
+    for (Token<? extends TokenIdentifier> token : creds.getAllTokens()) {
+      ugi.addToken(token);
+    }
+
+    return ugi.doAs(new PrivilegedExceptionAction<Integer>() {
+      public Integer run() {
+        try {
+          localizeFiles(nodeManager);
+          return 0;
+        } catch (Throwable e) {
+          e.printStackTrace(System.out);
+          return -1;
+        }
+      }
+    });
+  }
+
+  public static void main(String[] argv) throws Throwable {
+    // usage: $0 user appId host port log_dir user_dir [user_dir]
+    // let $x = cwd for $local.dir
+    // VERIFY $user_dir, log_dir exists, owned by the user w/ correct perms
+    // VERIFY $logdir exists, owned by user w/ correct perms
+    // MKDIR $x/$user/appcache
+    // MKDIR $x/$user/filecache
+    // MKDIR $x/$user/appcache/$appid
+    // MKDIR $x/$user/appcache/$appid/output
+    // MKDIR $x/$user/appcache/$appid/filecache
+    // LOAD $x/$user/appcache/$appid/appTokens
+    // LOAD $x/$user/appcache/$appid/appFiles
+    // FOREACH file : files.PRIVATE
+    // FETCH to $x/$user/filecache
+    // FOREACH file : files.JOB
+    // FETCH to $x/$user/$appid/filecache
+    // WRITE $x/$user/appcache/$appid/privateEnv.sh
+    try {
+      String user = argv[0];
+      String appId = argv[1];
+      InetSocketAddress nmAddr =
+          new InetSocketAddress(argv[2], Integer.parseInt(argv[3]));
+      Path logDir = new Path(argv[4]);
+      String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
+      ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
+      for (String sLocaldir : sLocaldirs) {
+        localDirs.add(new Path(sLocaldir));
+      }
+
+      final String uid =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      if (!user.equals(uid)) {
+        LOG.warn("Localization running as " + uid + " not " + user); // TODO: throw exception.
+      }
+
+      ApplicationLocalizer localizer =
+          new ApplicationLocalizer(user, appId, logDir, localDirs);
+      System.exit(localizer.runLocalization(nmAddr));
+    } catch (Throwable e) {
+      // Print error to stdout so that LCE can use it.
+      e.printStackTrace(System.out);
+      // TODO: Above. Or set log4j props.
+      throw e;
+    }
+  }
+
+  private static class ShellScriptBuilder {
+
+    private final StringBuilder sb;
+
+    public ShellScriptBuilder() {
+      this(new StringBuilder("#!/bin/bash\n\n"));
+    }
+
+    protected ShellScriptBuilder(StringBuilder sb) {
+      this.sb = sb;
+    }
+
+    public ShellScriptBuilder env(String key, String value) {
+      line("export ", key, "=\"", value, "\"");
+      return this;
+    }
+
+    public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
+      return symlink(src, new Path(dst));
+    }
+
+    public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
+      if (!src.isAbsolute()) {
+        throw new IOException("Source must be absolute");
+      }
+      if (dst.isAbsolute()) {
+        throw new IOException("Destination must be relative");
+      }
+      if (dst.toUri().getPath().indexOf('/') != -1) {
+        line("mkdir -p ", dst.getParent().toString());
+      }
+      line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
+      return this;
+    }
+
+    public void write(PrintStream out) throws IOException {
+      out.append(sb);
+    }
+
+    public void line(CharSequence... command) {
+      for (CharSequence s : command) {
+        sb.append(s);
+      }
+      sb.append("\n");
+    }
+
+    @Override
+    public String toString() {
+      return sb.toString();
+    }
+
+  }
+
+}
+

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.RunJar;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import static org.apache.hadoop.fs.Options.*;
+
+import org.apache.hadoop.yarn.LocalResource;
+
+/**
+ * Download a single URL to the local disk.
+ *
+ */
+public class FSDownload implements Callable<Map<LocalResource,Path>> {
+
+  private static final Log LOG = LogFactory.getLog(FSDownload.class);
+
+  private Random rand;
+  private FileContext files;
+  private Configuration conf;
+  private LocalResource resource;
+  private LocalDirAllocator dirs;
+  private FsPermission cachePerms = new FsPermission((short) 0755);
+
+  public FSDownload(Configuration conf, LocalDirAllocator dirs,
+      LocalResource resource) throws IOException {
+    this(FileContext.getLocalFSFileContext(conf), conf, dirs, resource,
+        new Random());
+  }
+
+  FSDownload(FileContext files, Configuration conf, LocalDirAllocator dirs,
+      LocalResource resource, Random rand) {
+    this.conf = conf;
+    this.dirs = dirs;
+    this.files = files;
+    this.resource = resource;
+    this.rand = rand;
+  }
+
+  LocalResource getResource() {
+    return resource;
+  }
+
+  private Path copy(Path sCopy, Path dstdir) throws IOException {
+    Path dCopy = new Path(dstdir, sCopy.getName() + ".tmp");
+    FileStatus sStat = files.getFileStatus(sCopy);
+    if (sStat.getModificationTime() != resource.timestamp) {
+      throw new IOException("Resource " + sCopy +
+          " changed on src filesystem (expected " + resource.timestamp +
+          ", was " + sStat.getModificationTime());
+    }
+    files.util().copy(sCopy, dCopy);
+    return dCopy;
+  }
+
+  private long unpack(File localrsrc, File dst) throws IOException {
+    File destDir = new File(localrsrc.getParent());
+    switch (resource.type) {
+    case ARCHIVE:
+      String lowerDst = dst.getName().toLowerCase();
+      if (lowerDst.endsWith(".jar")) {
+        RunJar.unJar(localrsrc, dst);
+      } else if (lowerDst.endsWith(".zip")) {
+        FileUtil.unZip(localrsrc, dst);
+      } else if (lowerDst.endsWith(".tar.gz") ||
+                 lowerDst.endsWith(".tgz") ||
+                 lowerDst.endsWith(".tar")) {
+        FileUtil.unTar(localrsrc, dst);
+      } else {
+        LOG.warn("Cannot unpack " + localrsrc);
+        localrsrc.renameTo(dst);
+      }
+      break;
+    case FILE:
+    default:
+      localrsrc.renameTo(dst);
+      break;
+    }
+    return FileUtil.getDU(destDir);
+  }
+
+  @Override
+  public Map<LocalResource,Path> call() throws IOException {
+    Path sCopy;
+    try {
+      sCopy = AvroUtil.getPathFromYarnURL(resource.resource);
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid resource", e);
+    }
+
+    Path tmp;
+    Path dst =
+        dirs.getLocalPathForWrite(".", getEstimatedSize(resource),
+            conf);
+    do {
+      tmp = new Path(dst, String.valueOf(rand.nextLong()));
+    } while (files.util().exists(tmp));
+    dst = tmp;
+    files.mkdir(dst, cachePerms, false);
+    Path dst_work = new Path(dst + "_tmp");
+    files.mkdir(dst_work, cachePerms, false);
+
+    Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName()));
+    try {
+      Path dTmp = files.makeQualified(copy(sCopy, dst_work));
+      resource.size =
+        unpack(new File(dTmp.toUri()), new File(dFinal.toUri()));
+      files.rename(dst_work, dst, Rename.OVERWRITE);
+    } catch (IOException e) {
+      try { files.delete(dst, true); } catch (IOException ignore) { }
+      throw e;
+    } finally {
+      try {
+        files.delete(dst_work, true);
+      } catch (FileNotFoundException ignore) { }
+      // clear ref to internal var
+      rand = null;
+      conf = null;
+      //resource = null; TODO change downstream to not req
+      dirs = null;
+      cachePerms = null;
+    }
+    return Collections.singletonMap(resource,
+        files.makeQualified(new Path(dst, sCopy.getName())));
+  }
+
+  private static long getEstimatedSize(LocalResource rsrc) {
+    if (rsrc.size < 0) {
+      return -1;
+    }
+    switch (rsrc.type) {
+      case ARCHIVE:
+        return 5 * rsrc.size;
+      case FILE:
+      default:
+        return rsrc.size;
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResource.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,107 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.LocalResourceType;
+
+/**
+ * A comparable {@link org.apache.hadoop.yarn.LocalResource}.
+ * 
+ */
+class LocalResource implements Comparable<LocalResource> {
+
+  private final Path loc;
+  private final long timestamp;
+  private final LocalResourceType type;
+
+  /**
+   * Convert yarn.LocalResource into a localizer.LocalResource.
+   * @param resource
+   * @throws URISyntaxException
+   */
+  public LocalResource(org.apache.hadoop.yarn.LocalResource resource)
+      throws URISyntaxException {
+    this.loc = AvroUtil.getPathFromYarnURL(resource.resource);
+    this.timestamp = resource.timestamp;
+    this.type = resource.type;
+  }
+
+  @Override
+  public int hashCode() {
+    return loc.hashCode() ^
+      (int)((timestamp >>> 1) ^ timestamp) *
+      type.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LocalResource)) {
+      return false;
+    }
+    final LocalResource other = (LocalResource) o;
+    return loc.equals(other.loc) &&
+           timestamp == other.timestamp &&
+           type == other.type;
+  }
+
+  @Override
+  public int compareTo(LocalResource other) {
+    if (this == other) {
+      return 0;
+    }
+    int ret = loc.compareTo(other.loc);
+    if (0 == ret) {
+      ret = (int)(timestamp - other.timestamp);
+      if (0 == ret) {
+        ret = type.ordinal() - other.type.ordinal();
+      }
+    }
+    return ret;
+  }
+
+  public Path getPath() {
+    return loc;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public LocalResourceType getType() {
+    return type;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{ ");
+    sb.append(loc.toString()).append(", ");
+    sb.append(timestamp).append(", ");
+    sb.append(type).append(" }");
+    return sb.toString();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.net.URISyntaxException;
+import java.util.Collection;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnRemoteException;
+
+public interface LocalResourcesTracker {
+
+  Collection<org.apache.hadoop.yarn.LocalResource> register(
+      AppLocalizationRunnerImpl appLocalizationRunner,
+      Collection<org.apache.hadoop.yarn.LocalResource> values)
+      throws URISyntaxException;
+
+  void setSuccess(LocalResource localRsrc, long size, Path pathFromYarnURL)
+      throws IllegalArgumentException, InterruptedException;
+
+  void removeFailedResource(LocalResource localResource,
+      YarnRemoteException cause);
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,164 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnRemoteException;
+
+class LocalResourcesTrackerImpl implements LocalResourcesTracker {
+
+  static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
+
+  private final ConcurrentHashMap<LocalResource,LocalizedResource> localrsrc =
+    new ConcurrentHashMap<LocalResource,LocalizedResource>();
+
+  public void setSuccess(LocalResource lResource, long rsrcSize,
+      Path localPath) throws IllegalArgumentException, InterruptedException {
+    LocalizedResource rsrc = localrsrc.get(lResource);
+    if (null == rsrc) {
+      throw new IllegalArgumentException("Unknown: " + lResource.getPath());
+    }
+    rsrc.success(lResource, rsrcSize, localPath);
+  }
+
+  public void removeFailedResource(LocalResource lResource,
+      YarnRemoteException e) throws IllegalArgumentException {
+    LocalizedResource rsrc = localrsrc.get(lResource);
+    if (null == rsrc) {
+      throw new IllegalArgumentException("Unknown: " + lResource.getPath());
+    }
+    rsrc.failure(lResource, e);
+  }
+
+  protected long currentTime() {
+    return System.nanoTime();
+  }
+
+  // TODO replace w/ queue over RPC
+  /** @return Resources not present in this bundle */
+  public Collection<org.apache.hadoop.yarn.LocalResource> register(
+      AppLocalizationRunnerImpl app,
+      Collection<org.apache.hadoop.yarn.LocalResource> rsrcs)
+      throws URISyntaxException {
+    ArrayList<org.apache.hadoop.yarn.LocalResource> ret =
+      new ArrayList<org.apache.hadoop.yarn.LocalResource>(rsrcs.size());
+    for (final org.apache.hadoop.yarn.LocalResource yrsrc : rsrcs) {
+      final LocalizedResource cand =
+        new LocalizedResource(new Callable<Map<LocalResource,Path>>() {
+              @Override
+              public Map<LocalResource,Path> call() {
+                // Future complete from RPC callback
+                throw new UnsupportedOperationException();
+              }
+            });
+      final LocalResource rsrc = new LocalResource(yrsrc);
+      while (true) {
+        LocalizedResource actual = localrsrc.putIfAbsent(rsrc, cand);
+        if (null == actual) {
+          cand.request(app);
+          ret.add(yrsrc);
+          break;
+        }
+        // ensure resource localization not cancelled
+        if (actual.isCancelled()) {
+          if (!localrsrc.replace(rsrc, actual, cand)) {
+            // newer entry exists
+            continue;
+          } else {
+            cand.request(app);
+            ret.add(yrsrc);
+            break;
+          }
+        }
+        actual.request(app);
+        break;
+      }
+    }
+    return ret;
+  }
+
+  public void release(AppLocalizationRunnerImpl app, LocalResource[] resources) {
+    for (LocalResource rsrc : resources) {
+      LocalizedResource resource = localrsrc.get(rsrc);
+      if (resource != null) {
+        // XXX update timestamp to last-used?
+        resource.refCount.getAndDecrement();
+        resource.notifyQueue.remove(app);
+      }
+    }
+  }
+
+  /**
+   * Private inner datum tracking a resource that is already localized.
+   */
+  // TODO use AQS for download
+  private class LocalizedResource extends
+      FutureTask<Map<LocalResource, Path>> {
+    private final AtomicInteger refCount = new AtomicInteger(0);
+    private final AtomicLong timestamp = new AtomicLong(currentTime());
+    private final BlockingQueue<AppLocalizationRunnerImpl> notifyQueue =
+      new LinkedBlockingQueue<AppLocalizationRunnerImpl>();
+    // TODO: Why is it needed?
+    private volatile long size = -1;
+    LocalizedResource(Callable<Map<LocalResource,Path>> fetch) {
+      super(fetch);
+    }
+    void request(AppLocalizationRunnerImpl appRunner) {
+      refCount.getAndIncrement();
+      timestamp.set(currentTime());
+      notifyQueue.offer(appRunner);
+      if (isDone() && notifyQueue.remove(appRunner)) {
+        appRunner.localizedResource(this);
+      }
+    }
+    void success(LocalResource rsrc, long rsrcSize, Path p)
+        throws InterruptedException {
+      size = rsrcSize;
+      set(Collections.singletonMap(rsrc, p));
+    }
+    void failure(LocalResource r, YarnRemoteException e) {
+      // TODO: How do you inform the appRunner?
+      localrsrc.remove(r, this);
+      setException(e);
+    }
+    @Override
+    protected void done() {
+      for (AppLocalizationRunner appRunner = notifyQueue.poll(); appRunner != null;
+           appRunner = notifyQueue.poll()) {
+        appRunner.localizedResource(this);
+      }
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.LocalizationProtocol;
+import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.yarn.YarnRemoteException;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
+
+public class ResourceLocalizationService extends AbstractService
+    implements EventHandler<LocalizerEvent>, LocalizationProtocol {
+
+  private static final Log LOG = LogFactory.getLog(ResourceLocalizationService.class);
+  public static final String NM_PRIVATE_DIR = "nmPrivate";
+  public final FsPermission NM_PRIVATE_PERM = new FsPermission((short) 0700);
+
+  private Server server;
+  private InetSocketAddress locAddr;
+  private List<Path> logDirs;
+  private List<Path> localDirs;
+  private List<Path> sysDirs;
+  private final ContainerExecutor exec;
+  protected final Dispatcher dispatcher;
+  private final DeletionService delService;
+  private final ExecutorService appLocalizerThreadPool =
+    Executors.newCachedThreadPool();
+
+  /**
+   * Map of private resources of users.
+   */
+  private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
+    new ConcurrentHashMap<String,LocalResourcesTracker>();
+
+  /**
+   * Map of applications that are in the process of localization.
+   * TODO: Why is it needed?
+   */
+  private final ConcurrentMap<String,AppLocalizationRunner> localizingApps =
+    new ConcurrentHashMap<String,AppLocalizationRunner>();
+
+  public ResourceLocalizationService(Dispatcher dispatcher,
+      ContainerExecutor exec, DeletionService delService) {
+    super("localizer");
+    this.exec = exec;
+    this.dispatcher = dispatcher;
+    this.delService = delService;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    // TODO limit for public cache, not appLocalizer
+    //appLocalizer.setMaximumPoolSize(
+    //    conf.getInt(NM_MAX_PUBLIC_FETCH_THREADS,
+    //                DEFAULT_MAX_PUBLIC_FETCH_THREADS));
+    try {
+      // TODO queue deletions here, rather than NM init?
+      FileContext lfs = FileContext.getLocalFSFileContext(conf);
+      String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
+
+      localDirs = new ArrayList<Path>(sLocalDirs.length);
+      logDirs = new ArrayList<Path>(sLocalDirs.length);
+      sysDirs = new ArrayList<Path>(sLocalDirs.length);
+      for (String sLocaldir : sLocalDirs) {
+        Path localdir = new Path(sLocaldir);
+        localDirs.add(localdir);
+        // $local/usercache
+        Path userdir = new Path(localdir, ApplicationLocalizer.USERCACHE);
+        lfs.mkdir(userdir, null, true);
+        // $local/filecache
+        Path filedir = new Path(localdir, ApplicationLocalizer.FILECACHE);
+        lfs.mkdir(filedir, null, true);
+        // $local/nmPrivate
+        Path sysdir = new Path(localdir, NM_PRIVATE_DIR);
+        lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
+        sysDirs.add(sysdir);
+      }
+      String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
+      for (String sLogdir : sLogdirs) {
+        Path logdir = new Path(sLogdir);
+        logDirs.add(logdir);
+        lfs.mkdir(logdir, null, true);
+      }
+    } catch (IOException e) {
+      throw new YarnException("Failed to start Localizer", e);
+    }
+    localDirs = Collections.unmodifiableList(localDirs);
+    logDirs = Collections.unmodifiableList(logDirs);
+    sysDirs = Collections.unmodifiableList(sysDirs);
+    locAddr = NetUtils.createSocketAddr(
+        conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    Configuration conf = new Configuration(getConfig()); // Clone to separate
+                                                         // sec-info classes
+    LocalizerTokenSecretManager secretManager = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+          LocalizerSecurityInfo.class, SecurityInfo.class);
+      secretManager = new LocalizerTokenSecretManager();
+    }
+    server =
+        rpc.getServer(LocalizationProtocol.class, this, locAddr, conf,
+            secretManager);
+    LOG.info("Localizer started at " + locAddr);
+    server.start();
+    super.start();
+  }
+
+  /** LTC communication w/ Localizer */
+  public InetSocketAddress getAddress() {
+    return locAddr;
+  }
+
+  /**
+   * Localizer report to NodeManager of localized resource.
+   * @param user Owner of the private cache
+   * @param resource Resource localized
+   * @param path Location on the local filesystem, or null if failed
+   */
+  @Override
+  public Void successfulLocalization(CharSequence user,
+      org.apache.hadoop.yarn.LocalResource resource, URL path)
+      throws YarnRemoteException {
+    // TODO validate request
+    LocalResourcesTracker userCache = privateRsrc.get(user.toString());
+    if (null == userCache) {
+      throw RPCUtil.getRemoteException("Unknown user: " + user);
+    }
+    try {
+      userCache.setSuccess(new LocalResource(resource),
+          resource.size, AvroUtil.getPathFromYarnURL(path));
+    } catch (Exception e) {
+      throw RPCUtil.getRemoteException(e);
+    }
+    return null;
+  }
+
+  @Override
+  public Void failedLocalization(CharSequence user,
+      org.apache.hadoop.yarn.LocalResource resource, YarnRemoteException cause) 
+      throws YarnRemoteException {
+    LocalResourcesTracker userCache = privateRsrc.get(user.toString());
+    if (null == userCache) {
+      throw RPCUtil.getRemoteException("Unknown user: " + user);
+    }
+    try {
+      userCache.removeFailedResource(new LocalResource(resource), cause);
+    } catch (Exception e) {
+      throw RPCUtil.getRemoteException(e);
+    }
+    return null;
+  }
+
+  @Override
+  public void stop() {
+    appLocalizerThreadPool.shutdownNow();
+    if (server != null) {
+      server.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public void handle(LocalizerEvent event) {
+    switch (event.getType()) {
+    case INIT_APPLICATION_RESOURCES:
+      Application app = ((ApplicationLocalizerEvent)event).getApplication();
+      String user = app.getUser();
+      LocalResourcesTracker rsrcTracker = privateRsrc.get(user);
+      if (null == rsrcTracker) {
+        LocalResourcesTracker perUserPrivateRsrcTracker =
+            new LocalResourcesTrackerImpl();
+        rsrcTracker =
+            privateRsrc.putIfAbsent(user, perUserPrivateRsrcTracker);
+        if (null == rsrcTracker) {
+          rsrcTracker = perUserPrivateRsrcTracker;
+        }
+      }
+
+      // Possibility of duplicate app localization is avoided by ApplicationImpl
+      // itself, so we are good to go.
+
+      // TODO: use LDA for picking single logdir, sysDir
+      // TODO: create log dir as $logdir/$user/$appId
+      try {
+        AppLocalizationRunner appLocalizationRunner =
+          new AppLocalizationRunnerImpl(dispatcher,
+              app, this, null, rsrcTracker, exec, logDirs.get(0), localDirs,
+              new Path(sysDirs.get(0), app.toString()));
+        localizingApps.put(app.toString(), appLocalizationRunner);
+        appLocalizerThreadPool.submit(appLocalizationRunner);
+      } catch (IOException e) {
+        // TODO kill containers
+        LOG.info("Failed to submit application", e);
+        //dispatcher.getEventHandler().handle(
+      }
+      break;
+    case CLEANUP_CONTAINER_RESOURCES:
+      Container container = ((ContainerLocalizerEvent)event).getContainer();
+      // TODO: delete the container dir
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+            container.getLaunchContext().id,
+            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+      break;
+    case DESTROY_APPLICATION_RESOURCES:
+      // decrement reference counts of all resources associated with this app
+      break;
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizerEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizerEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ApplicationLocalizerEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+
+public class ApplicationLocalizerEvent extends LocalizerEvent {
+
+  final Application app;
+
+  public ApplicationLocalizerEvent(LocalizerEventType type, Application app) {
+    super(type);
+    this.app = app;
+  }
+
+  public Application getApplication() {
+    return app;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizerEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizerEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizerEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+public class ContainerLocalizerEvent extends LocalizerEvent {
+
+  final Container container;
+
+  public ContainerLocalizerEvent(LocalizerEventType event, Container c) {
+    super(event);
+    this.container = c;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+
+public abstract class LocalizerEvent extends AbstractEvent<LocalizerEventType> {
+
+  public LocalizerEvent(LocalizerEventType event) {
+    super(event);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
+
+public enum LocalizerEventType {
+  INIT_APPLICATION_RESOURCES,
+  CLEANUP_CONTAINER_RESOURCES,
+  DESTROY_APPLICATION_RESOURCES,
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,53 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LocalizerSecurityInfo implements SecurityInfo {
+
+  @Override
+  public KerberosInfo getKerborosInfo(Class<?> protocol) {
+    return null;
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol) {
+    return new TokenInfo() {
+
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public Class<? extends TokenSelector<? extends TokenIdentifier>>
+          value() {
+        System.err.print("=========== Using localizerTokenSecurityInfo");
+        return LocalizerTokenSelector.class;
+      }
+    };
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenIdentifier.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenIdentifier.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenIdentifier.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+public class LocalizerTokenIdentifier extends TokenIdentifier {
+
+  public static Text KIND = new Text("Localizer");
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODO Auto-generated method stub
+    out.writeInt(1);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // TODO Auto-generated method stub
+    in.readInt();
+  }
+
+  @Override
+  public Text getKind() {
+    // TODO Auto-generated method stub
+    return KIND;
+  }
+
+  @Override
+  public UserGroupInformation getUser() {
+    // TODO Auto-generated method stub
+    return UserGroupInformation.createRemoteUser("testing");
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,44 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security;
+
+import org.apache.hadoop.security.token.SecretManager;
+
+public class LocalizerTokenSecretManager extends
+    SecretManager<LocalizerTokenIdentifier> {
+
+  @Override
+  protected byte[] createPassword(LocalizerTokenIdentifier identifier) {
+    return "testing".getBytes();
+  }
+
+  @Override
+  public byte[] retrievePassword(LocalizerTokenIdentifier identifier)
+      throws org.apache.hadoop.security.token.SecretManager.InvalidToken {
+    // TODO Auto-generated method stub
+    return "testing".getBytes();
+  }
+
+  @Override
+  public LocalizerTokenIdentifier createIdentifier() {
+    // TODO Auto-generated method stub
+    return new LocalizerTokenIdentifier();
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,50 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+public class LocalizerTokenSelector implements
+    TokenSelector<LocalizerTokenIdentifier> {
+
+  @Override
+  public Token<LocalizerTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    System.err.print("=========== Using localizerTokenSelector");
+//    if (service == null) {
+//      return null;
+//    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      System.err.print("============ token of kind " + token.getKind() + " is found");
+      if (LocalizerTokenIdentifier.KIND.equals(token.getKind())
+          //&& service.equals(token.getService())
+          ) {
+        return (Token<LocalizerTokenIdentifier>) token;
+      }
+    }
+    System.err.print("returning null ========== ");
+    return null;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+public class ContainersMonitor extends AbstractService
+    implements EventHandler<ContainersMonitorEvent> {
+
+  public ContainersMonitor() {
+    super("containers-monitor");
+  }
+
+  @Override
+  public void handle(ContainersMonitorEvent monitorEvent) {
+    // TODO
+  }
+
+}



Mime
View raw message