hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r1132638 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ yarn/yarn-server/yarn-server-n...
Date Mon, 06 Jun 2011 13:44:07 GMT
Author: cdouglas
Date: Mon Jun  6 13:44:06 2011
New Revision: 1132638

URL: http://svn.apache.org/viewvc?rev=1132638&view=rev
Log:
Add deletion of distributed cache resources.

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon Jun  6 13:44:06 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
+
+    Add deletion of distributed cache resources. (cdouglas)
+
     Fix class cast exception in Task abort for old mapreduce apis. (sharad)
 
     Fix various issues with Web UI's. (Luke Lu)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Mon Jun  6 13:44:06 2011
@@ -92,6 +92,7 @@ public class ConverterUtils {
   }
 
   // TODO: Why thread local?
+  // ^ NumberFormat instances are not threadsafe
   private static final ThreadLocal<NumberFormat> appIdFormat =
     new ThreadLocal<NumberFormat>() {
       @Override
@@ -104,6 +105,7 @@ public class ConverterUtils {
     };
 
   // TODO: Why thread local?
+  // ^ NumberFormat instances are not threadsafe
   private static final ThreadLocal<NumberFormat> containerIdFormat =
       new ThreadLocal<NumberFormat>() {
         @Override
@@ -159,4 +161,4 @@ public class ConverterUtils {
     containerId.setId(Integer.parseInt(it.next()));
     return containerId;
   }
-}
\ No newline at end of file
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Mon Jun  6 13:44:06 2011
@@ -89,4 +89,15 @@ public class NMConfig {
     NM_PREFIX + "container.manager.threads";
   
   public static final int DEFAULT_NM_CONTAINER_MGR_THREADS = 5;
+
+  public static final String NM_TARGET_CACHE_MB =
+    NM_PREFIX + "target.cache.size";
+
+  public static final long DEFAULT_NM_TARGET_CACHE_MB = 10 * 1024;
+
+  public static final String NM_CACHE_CLEANUP_MS =
+    NM_PREFIX + "target.cache.cleanup.period.ms";
+
+  public static final long DEFAULT_NM_CACHE_CLEANUP_MS = 10 * 60 * 1000;
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Jun  6 13:44:06 2011
@@ -140,6 +140,9 @@ public class ApplicationImpl implements 
 
            // Transitions from APPLICATION_RESOURCES_CLEANINGUP state
            .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+               ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+               ApplicationEventType.APPLICATION_CONTAINER_FINISHED)
+           .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
                ApplicationState.FINISHED,
                ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
                new AppCompletelyDoneTransition())

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Mon Jun  6 13:44:06 2011
@@ -194,6 +194,10 @@ public class ContainerImpl implements Co
         ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new ContainerKilledTransition())
+    .addTransition(ContainerState.KILLING,
+        ContainerState.KILLING,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new LocalizedResourceDuringKillTransition())
     .addTransition(ContainerState.KILLING, ContainerState.KILLING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
@@ -205,6 +209,10 @@ public class ContainerImpl implements Co
     .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new ExitedWithFailureTransition())
+    .addTransition(ContainerState.KILLING,
+            ContainerState.DONE,
+            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
+            CONTAINER_DONE_TRANSITION)
 
     // From CONTAINER_CLEANEDUP_AFTER_KILL State.
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -568,6 +576,24 @@ public class ContainerImpl implements Co
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
+  static class LocalizedResourceDuringKillTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
+      String sym = container.pendingResources.remove(rsrcEvent.getResource());
+      if (null == sym) {
+        LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
+                 " for container " + container.getContainerID());
+        assert false;
+        // fail container?
+        return;
+      }
+      container.localizedResources.put(rsrcEvent.getLocation(), sym);
+    }
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
   static class KillTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Mon Jun  6 13:44:06 2011
@@ -41,9 +41,15 @@ public class LocalResourceRequest
    */
   public LocalResourceRequest(LocalResource resource)
       throws URISyntaxException {
-    this.loc = ConverterUtils.getPathFromYarnURL(resource.getResource());
-    this.timestamp = resource.getTimestamp();
-    this.type = resource.getType();
+    this(ConverterUtils.getPathFromYarnURL(resource.getResource()),
+        resource.getTimestamp(),
+        resource.getType());
+  }
+
+  LocalResourceRequest(Path loc, long timestamp, LocalResourceType type) {
+    this.loc = loc;
+    this.timestamp = timestamp;
+    this.type = type;
   }
 
   @Override

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Mon Jun  6 13:44:06 2011
@@ -20,15 +20,21 @@ package org.apache.hadoop.yarn.server.no
 
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
 /**
  * Component tracking resources all of the same {@link LocalResourceVisibility}
  * 
  */
-interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
+interface LocalResourcesTracker
+    extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> {
 
   // TODO: Not used at all!!
   boolean contains(LocalResourceRequest resource);
 
+  boolean remove(LocalizedResource req, DeletionService delService);
+
+  String getUser();
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Jun  6 13:44:06 2011
@@ -17,12 +17,15 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
 /**
@@ -34,12 +37,20 @@ class LocalResourcesTrackerImpl implemen
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
 
+  private final String user;
   private final Dispatcher dispatcher;
-  private final ConcurrentHashMap<LocalResourceRequest,LocalizedResource>
-    localrsrc = new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
+  private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
 
-  public LocalResourcesTrackerImpl(Dispatcher dispatcher) {
+  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+    this(user, dispatcher,
+        new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>());
+  }
+
+  LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) {
+    this.user = user;
     this.dispatcher = dispatcher;
+    this.localrsrc = localrsrc;
   }
 
   @Override
@@ -66,7 +77,40 @@ class LocalResourcesTrackerImpl implemen
 
   @Override
   public boolean contains(LocalResourceRequest resource) {
-    return localrsrc.contains(resource);
+    return localrsrc.containsKey(resource);
+  }
+
+  @Override
+  public boolean remove(LocalizedResource rem, DeletionService delService) {
+    // current synchronization guaranteed by crude RLS event for cleanup
+    LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
+    if (null == rsrc) {
+      LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
+          " from " + getUser());
+      return true;
+    }
+    if (rsrc.getRefCount() > 0
+        || ResourceState.DOWNLOADING.equals(rsrc.getState())
+        || rsrc != rem) {
+      // internal error
+      LOG.error("Attempt to remove resource with non-zero refcount");
+      assert false;
+      return false;
+    }
+    if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
+      delService.delete(getUser(), rsrc.getLocalPath());
+    }
+    return true;
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public Iterator<LocalizedResource> iterator() {
+    return localrsrc.values().iterator();
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Mon Jun  6 13:44:06 2011
@@ -94,7 +94,7 @@ public class LocalizedResource implement
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
         ResourceEventType.REQUEST, new LocalizedResourceTransition())
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
-        ResourceEventType.LOCALIZED, new LocalizedResourceTransition())
+        ResourceEventType.LOCALIZED)
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
         ResourceEventType.RELEASE, new ReleaseTransition())
     .installTopology();
@@ -115,12 +115,12 @@ public class LocalizedResource implement
     StringBuilder sb = new StringBuilder();
     sb.append("{ ").append(rsrc.toString()).append(",")
       .append(getState() == ResourceState.LOCALIZED
-          ? localPath 
+          ? getLocalPath() + "," + getSize()
           : "pending").append(",[");
     for (ContainerId c : ref) {
       sb.append("(").append(c.toString()).append(")");
     }
-    sb.append("],").append(timestamp.get()).append("}");
+    sb.append("],").append(getTimestamp()).append("}");
     return sb.toString();
   }
 
@@ -150,6 +150,22 @@ public class LocalizedResource implement
     return rsrc;
   }
 
+  public Path getLocalPath() {
+    return localPath;
+  }
+
+  public long getTimestamp() {
+    return timestamp.get();
+  }
+
+  public long getSize() {
+    return size;
+  }
+
+  public int getRefCount() {
+    return ref.size();
+  }
+
   public boolean tryAcquire() {
     return sem.tryAcquire();
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Mon Jun  6 13:44:06 2011
@@ -34,24 +34,32 @@ import java.util.concurrent.ExecutorComp
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_MAX_PUBLIC_FETCH_THREADS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_CACHE_CLEANUP_MS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_TARGET_CACHE_MB;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CACHE_CLEANUP_MS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_MAX_PUBLIC_FETCH_THREADS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_TARGET_CACHE_MB;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -99,6 +107,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
@@ -118,6 +127,8 @@ public class ResourceLocalizationService
 
   private Server server;
   private InetSocketAddress localizationServerAddress;
+  private long cacheTargetSize;
+  private long cacheCleanupPeriod;
   private List<Path> logDirs;
   private List<Path> localDirs;
   private List<Path> sysDirs;
@@ -127,6 +138,7 @@ public class ResourceLocalizationService
   private LocalizerTracker localizerTracker;
   private RecordFactory recordFactory;
   private final LocalDirAllocator localDirsSelector;
+  private final ScheduledExecutorService cacheCleanup;
 
   private final LocalResourcesTracker publicRsrc;
   private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
@@ -141,7 +153,8 @@ public class ResourceLocalizationService
     this.dispatcher = dispatcher;
     this.delService = delService;
     this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
-    this.publicRsrc = new LocalResourcesTrackerImpl(dispatcher);
+    this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
+    this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -190,10 +203,16 @@ public class ResourceLocalizationService
     localDirs = Collections.unmodifiableList(localDirs);
     logDirs = Collections.unmodifiableList(logDirs);
     sysDirs = Collections.unmodifiableList(sysDirs);
+    cacheTargetSize =
+      conf.getLong(NM_TARGET_CACHE_MB, DEFAULT_NM_TARGET_CACHE_MB) << 20;
+    cacheCleanupPeriod =
+      conf.getLong(NM_CACHE_CLEANUP_MS, DEFAULT_NM_CACHE_CLEANUP_MS);
     localizationServerAddress = NetUtils.createSocketAddr(
       conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
     localizerTracker = new LocalizerTracker(conf);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
+    cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
+        cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     super.init(conf);
   }
 
@@ -250,12 +269,16 @@ public class ResourceLocalizationService
       Application app =
         ((ApplicationLocalizationEvent)event).getApplication();
       // 0) Create application tracking structs
-      privateRsrc.putIfAbsent(app.getUser(),
-          new LocalResourcesTrackerImpl(dispatcher));
+      userName = app.getUser();
+      privateRsrc.putIfAbsent(userName,
+          new LocalResourcesTrackerImpl(userName, dispatcher));
       if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
-          new LocalResourcesTrackerImpl(dispatcher))) {
+          new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
         LOG.warn("Initializing application " + app + " already present");
         assert false; // TODO: FIXME assert doesn't help
+                      // ^ The condition is benign. Tests should fail and it
+                      //   should appear in logs, but it's an internal error
+                      //   that should have no effect on applications
       }
       // 1) Signal container init
       dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
@@ -288,6 +311,16 @@ public class ResourceLocalizationService
         tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
       }
       break;
+    case CACHE_CLEANUP:
+      ResourceRetentionSet retain =
+        new ResourceRetentionSet(delService, cacheTargetSize);
+      retain.addResources(publicRsrc);
+      LOG.debug("Resource cleanup (public) " + retain);
+      for (LocalResourcesTracker t : privateRsrc.values()) {
+        retain.addResources(t);
+        LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
+      }
+      break;
     case CLEANUP_CONTAINER_RESOURCES:
       Container container =
         ((ContainerLocalizationEvent)event).getContainer();
@@ -536,8 +569,9 @@ public class ResourceLocalizationService
     final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
     final List<LocalizerResourceRequestEvent> pending;
 
+    // TODO: threadsafe, use outer?
     private final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
+      RecordFactoryProvider.getRecordFactory(getConfig());
 
     LocalizerRunner(LocalizerContext context, String localizerId) {
       this.context = context;
@@ -702,17 +736,37 @@ public class ResourceLocalizationService
             context.getUser(),
             ConverterUtils.toString(context.getContainerId().getAppId()),
             localizerId, localDirs);
+      // TODO handle ExitCodeException separately?
       } catch (Exception e) {
+        LOG.info("Localizer failed", e);
         // 3) on error, report failure to Container and signal ABORT
         // 3.1) notify resource of failed localization
+        ContainerId cId = context.getContainerId();
+        dispatcher.getEventHandler().handle(
+            new ContainerResourceFailedEvent(cId, null, e));
+      } finally {
         for (LocalizerResourceRequestEvent event : scheduled.values()) {
           event.getResource().unlock();
         }
-        //dispatcher.getEventHandler().handle(
-        //    new ContainerResourceFailedEvent(current.getContainer(),
-        //      current.getResource().getRequest(), e));
       }
     }
 
   }
+
+  static class CacheCleanup extends Thread {
+
+    private final Dispatcher dispatcher;
+
+    public CacheCleanup(Dispatcher dispatcher) {
+      this.dispatcher = dispatcher;
+    }
+
+    @Override
+    public void run() {
+      dispatcher.getEventHandler().handle(
+          new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP));
+    }
+
+  }
+
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java?rev=1132638&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java Mon Jun  6 13:44:06 2011
@@ -0,0 +1,78 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+/**
+ * Collects the resources of one or more {@link LocalResourcesTracker}s and
+ * removes unreferenced ones, in comparator order, for as long as the size
+ * still accounted for exceeds {@code targetSize}.
+ */
+public class ResourceRetentionSet {
+
+  /** Sum of getSize() over the resources successfully removed so far. */
+  private long delSize;
+  /** Sum of getSize() over every resource seen by addResources(). */
+  private long currentSize;
+  /** Removal stops once currentSize - delSize is no larger than this. */
+  private final long targetSize;
+  /** Handed unchanged to LocalResourcesTracker.remove(). */
+  private final DeletionService delService;
+  /** Removal candidates (reference count 0) mapped to their tracker. */
+  private final SortedMap<LocalizedResource,LocalResourcesTracker> retain;
+
+  /**
+   * Creates a set whose candidates are ordered by {@link LRUComparator}.
+   *
+   * @param delService service passed to the trackers when removing
+   * @param targetSize size above which candidates are removed
+   */
+  ResourceRetentionSet(DeletionService delService, long targetSize) {
+    this(delService, targetSize, new LRUComparator());
+  }
+
+  /**
+   * Creates a set whose candidates are kept in a TreeMap ordered by
+   * {@code cmp}; candidates that sort first are removed first.
+   *
+   * @param delService service passed to the trackers when removing
+   * @param targetSize size above which candidates are removed
+   * @param cmp ordering of the removal candidates
+   */
+  ResourceRetentionSet(DeletionService delService, long targetSize,
+      Comparator<? super LocalizedResource> cmp) {
+    this(delService, targetSize,
+        new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp));
+  }
+
+  /**
+   * Creates a set backed by the supplied map of removal candidates.
+   *
+   * @param delService service passed to the trackers when removing
+   * @param targetSize size above which candidates are removed
+   * @param retain map used to hold the candidates; its iteration order is
+   *               the removal order
+   */
+  ResourceRetentionSet(DeletionService delService, long targetSize,
+      SortedMap<LocalizedResource,LocalResourcesTracker> retain) {
+    this.retain = retain;
+    this.delService = delService;
+    this.targetSize = targetSize;
+  }
+
+  /**
+   * Accounts for every resource of {@code newTracker}, then removes
+   * candidates in map order until the remaining size is within the target
+   * or no candidates are left.
+   *
+   * @param newTracker tracker whose resources are added to this set
+   */
+  public void addResources(LocalResourcesTracker newTracker) {
+    for (LocalizedResource resource : newTracker) {
+      currentSize += resource.getSize();
+      if (resource.getRefCount() > 0) {
+        // always retain resources in use
+        continue;
+      }
+      retain.put(resource, newTracker);
+    }
+    for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
+           retain.entrySet().iterator();
+         currentSize - delSize > targetSize && i.hasNext();) {
+      Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
+      LocalizedResource resource = rsrc.getKey();
+      LocalResourcesTracker tracker = rsrc.getValue();
+      // Only count the resource as deleted, and drop it from the candidate
+      // map, when the owning tracker reports that the removal happened.
+      if (tracker.remove(resource, delService)) {
+        delSize += resource.getSize();
+        i.remove();
+      }
+    }
+  }
+
+  /** Reports the accumulated and the deleted size. */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Cache: ").append(currentSize).append(", ");
+    sb.append("Deleted: ").append(delSize);
+    return sb.toString();
+  }
+
+  /**
+   * Orders resources by ascending getTimestamp(), breaking ties with the
+   * identity hash code so that distinct resources sharing a timestamp are
+   * normally kept as separate keys. Two distinct resources compare as equal
+   * only when both the timestamp and the identity hash collide.
+   */
+  static class LRUComparator implements Comparator<LocalizedResource> {
+    /**
+     * Compares with relational operators instead of subtraction: a
+     * difference of two longs or two ints can overflow and yield the wrong
+     * sign, which would violate the Comparator contract.
+     */
+    @Override
+    public int compare(LocalizedResource r1, LocalizedResource r2) {
+      final long t1 = r1.getTimestamp();
+      final long t2 = r2.getTimestamp();
+      if (t1 != t2) {
+        return t1 < t2 ? -1 : 1;
+      }
+      final int h1 = System.identityHashCode(r1);
+      final int h2 = System.identityHashCode(r2);
+      return h1 < h2 ? -1 : (h1 == h2 ? 0 : 1);
+    }
+    /** Identity equality, matching the inherited hashCode(). */
+    @Override
+    public boolean equals(Object other) {
+      return this == other;
+    }
+  }
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Mon Jun  6 13:44:06 2011
@@ -19,9 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
 
-public abstract class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
+public class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
 
   public LocalizationEvent(LocalizationEventType event) {
     super(event);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java Mon Jun  6 13:44:06 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 public enum LocalizationEventType {
   INIT_APPLICATION_RESOURCES,
   INIT_CONTAINER_RESOURCES,
+  CACHE_CLEANUP, // type of the LocalizationEvent dispatched by CacheCleanup.run()
   CLEANUP_CONTAINER_RESOURCES,
   DESTROY_APPLICATION_RESOURCES,
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Mon Jun  6 13:44:06 2011
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -281,7 +280,8 @@ public class TestContainerLocalizer {
     return rsrc;
   }
 
-  static DataInputBuffer createFakeCredentials(Random r, int nTok)
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+static DataInputBuffer createFakeCredentials(Random r, int nTok)
       throws IOException {
     Credentials creds = new Credentials();
     byte[] password = new byte[20];

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Mon Jun  6 13:44:06 2011
@@ -1,19 +1,57 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.net.InetSocketAddress;
+
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Random;
 
+import org.apache.avro.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import org.junit.Test;
+import static org.junit.Assert.*;
 
+import org.mockito.ArgumentMatcher;
 import static org.mockito.Mockito.*;
 
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
@@ -75,4 +113,178 @@ public class TestResourceLocalizationSer
     }
   }
 
+  /**
+   * Drives a spied ResourceLocalizationService through application init, a
+   * PRIVATE localization request for one container, and three localizer
+   * heartbeats. Verifies the events delivered to the mocked application and
+   * container handlers and the action returned for each heartbeat.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testLocalizationHeartbeat() throws Exception {
+    Configuration conf = new Configuration();
+    AbstractFileSystem spylfs =
+      spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+    final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+    // mkdir on the spied local filesystem is stubbed to do nothing
+    doNothing().when(spylfs).mkdir(
+        isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+    // four local dirs under basedir, registered as NM_LOCAL_DIR
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(NM_LOCAL_DIR, sDirs);
+
+    // returned by the stubbed createServer() below
+    Server ignore = mock(Server.class);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    // mocked handlers record the application and container events dispatched
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    DeletionService delService = new DeletionService(exec);
+    delService.init(null);
+    delService.start();
+
+    // spy the service so server creation and the local FileContext lookup
+    // can be replaced with the mock server and the spied filesystem
+    ResourceLocalizationService rawService =
+      new ResourceLocalizationService(dispatcher, exec, delService);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(ignore).when(spyService).createServer();
+    doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      // init application
+      final Application app = mock(Application.class);
+      final ApplicationId appId = mock(ApplicationId.class);
+      when(appId.getClusterTimestamp()).thenReturn(314159265358979L);
+      when(appId.getId()).thenReturn(3);
+      when(app.getUser()).thenReturn("user0");
+      when(app.getAppId()).thenReturn(appId);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      // matches an APPLICATION_INITED event carrying the same appId instance
+      ArgumentMatcher<ApplicationEvent> matchesAppInit =
+        new ArgumentMatcher<ApplicationEvent>() {
+          @Override
+          public boolean matches(Object o) {
+            ApplicationEvent evt = (ApplicationEvent) o;
+            return evt.getType() == ApplicationEventType.APPLICATION_INITED
+              && appId == evt.getApplicationID();
+          }
+        };
+      dispatcher.await();
+      verify(applicationBus).handle(argThat(matchesAppInit));
+
+      // init container rsrc, localizer
+      Random r = new Random();
+      long seed = r.nextLong();
+      // seed is printed, presumably so a failing run can be reproduced
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+      final Container c = getMockContainer(appId, 42);
+      // file creation on the spied filesystem writes to an in-memory buffer
+      FSDataOutputStream out =
+        new FSDataOutputStream(new DataOutputBuffer(), null);
+      doReturn(out).when(spylfs).createInternal(isA(Path.class),
+          isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
+          anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
+      final LocalResource resource = getMockResource(r);
+      final LocalResourceRequest req = new LocalResourceRequest(resource);
+      spyService.handle(new ContainerLocalizationRequestEvent(
+                c, Collections.singletonList(req),
+                LocalResourceVisibility.PRIVATE));
+      // Sigh. Thread init of private localizer not accessible
+      Thread.sleep(500);
+      dispatcher.await();
+      String appStr = ConverterUtils.toString(appId);
+      String ctnrStr = ConverterUtils.toString(c.getContainerID());
+      verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
+            eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+
+      // heartbeat from localizer
+      LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
+      LocalizerStatus stat = mock(LocalizerStatus.class);
+      when(stat.getLocalizerId()).thenReturn(ctnrStr);
+      when(rsrcStat.getResource()).thenReturn(resource);
+      when(rsrcStat.getLocalSize()).thenReturn(4344L);
+      URL locPath = getPath("/cache/private/blah");
+      when(rsrcStat.getLocalPath()).thenReturn(locPath);
+      when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+      // successive getResources() calls report: nothing, the one
+      // FETCH_SUCCESS status, then nothing again
+      when(stat.getResources())
+        .thenReturn(Collections.<LocalResourceStatus>emptyList())
+        .thenReturn(Collections.singletonList(rsrcStat))
+        .thenReturn(Collections.<LocalResourceStatus>emptyList());
+
+      // get rsrc
+      LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
+
+      // empty rsrc
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+      assertEquals(0, response.getAllResources().size());
+
+      // get shutdown
+      response = spyService.heartbeat(stat);
+      assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+      // verify container notification
+      // matches a RESOURCE_LOCALIZED event carrying the same ContainerId
+      // instance as the mock container
+      ArgumentMatcher<ContainerEvent> matchesContainerLoc =
+        new ArgumentMatcher<ContainerEvent>() {
+          @Override
+          public boolean matches(Object o) {
+            ContainerEvent evt = (ContainerEvent) o;
+            return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
+              && c.getContainerID() == evt.getContainerID();
+          }
+        };
+      dispatcher.await();
+      verify(containerBus).handle(argThat(matchesContainerLoc));
+    } finally {
+      delService.stop();
+      dispatcher.stop();
+      spyService.stop();
+    }
+  }
+
+  /**
+   * Builds a mocked YARN {@code URL} record with scheme {@code "file"}, a
+   * null host and the given file component.
+   *
+   * @param path value returned by the mock's getFile()
+   * @return the stubbed mock
+   */
+  static URL getPath(String path) {
+    final URL mockUrl = mock(org.apache.hadoop.yarn.api.records.URL.class);
+    when(mockUrl.getFile()).thenReturn(path);
+    when(mockUrl.getHost()).thenReturn(null);
+    when(mockUrl.getScheme()).thenReturn("file");
+    return mockUrl;
+  }
+
+  /**
+   * Builds a mocked PRIVATE, FILE-typed {@code LocalResource} whose name,
+   * size and timestamp are drawn from {@code r}, in that order.
+   *
+   * @param r source of the random name, size (1024..2047) and timestamp
+   *          (2048..3071)
+   * @return the stubbed mock
+   */
+  static LocalResource getMockResource(Random r) {
+    // Draw from r in a fixed order: name, then size, then timestamp.
+    final String hexName = Long.toHexString(r.nextLong());
+    final long size = r.nextInt(1024) + 1024L;
+    final long timestamp = r.nextInt(1024) + 2048L;
+
+    final URL location = getPath("/local/PRIVATE/" + hexName);
+    final LocalResource mockRsrc = mock(LocalResource.class);
+    when(mockRsrc.getResource()).thenReturn(location);
+    when(mockRsrc.getSize()).thenReturn(size);
+    when(mockRsrc.getTimestamp()).thenReturn(timestamp);
+    when(mockRsrc.getType()).thenReturn(LocalResourceType.FILE);
+    when(mockRsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
+    return mockRsrc;
+  }
+
+  /**
+   * Builds a mocked {@code Container} owned by "user0" whose mocked
+   * ContainerId reports the given application and id, and whose credentials
+   * hold a single token named {@code "tok" + id}.
+   *
+   * @param appId application id returned by the mocked ContainerId
+   * @param id numeric id returned by the mocked ContainerId
+   * @return the stubbed mock
+   */
+  static Container getMockContainer(ApplicationId appId, int id) {
+    final ContainerId containerId = mock(ContainerId.class);
+    when(containerId.getAppId()).thenReturn(appId);
+    when(containerId.getId()).thenReturn(id);
+
+    final Credentials credentials = new Credentials();
+    credentials.addToken(new Text("tok" + id), getToken(id));
+
+    final Container container = mock(Container.class);
+    when(container.getUser()).thenReturn("user0");
+    when(container.getContainerID()).thenReturn(containerId);
+    when(container.getCredentials()).thenReturn(credentials);
+    return container;
+  }
+
+  /**
+   * Builds a token whose identifier, password, kind and service are all
+   * derived from {@code id}.
+   *
+   * @param id suffix appended to each of the four token fields
+   * @return the new token
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  static Token<? extends TokenIdentifier> getToken(int id) {
+    final byte[] identifier = ("ident" + id).getBytes();
+    final byte[] password = ("passwd" + id).getBytes();
+    final Text kind = new Text("kind" + id);
+    final Text service = new Text("service" + id);
+    return new Token(identifier, password, kind, service);
+  }
+
 }

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1132638&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Mon Jun  6 13:44:06 2011
@@ -0,0 +1,83 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Mockito.*;
+
+/**
+ * Checks how much a ResourceRetentionSet removes when it is fed trackers
+ * whose resources are all unreferenced.
+ */
+public class TestResourceRetention {
+
+  /**
+   * Adds four trackers holding 20MB in total to a set with a 10MB target and
+   * checks that the resources passed to remove() add up to at least 10MB and
+   * less than 15MB.
+   */
+  @Test
+  public void testRsrcUnused() {
+    DeletionService delService = mock(DeletionService.class);
+    // 10 << 20 is 10MB expressed in bytes
+    long TARGET_MB = 10 << 20;
+    ResourceRetentionSet rss = new ResourceRetentionSet(delService, TARGET_MB);
+    // 3MB files @{10, 15}
+    LocalResourcesTracker pubTracker =
+      createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5);
+    // 1MB files @{3, 6, 9, 12}
+    LocalResourcesTracker trackerA =
+      createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3);
+    // 4MB file @{10}
+    LocalResourcesTracker trackerB =
+      createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5);
+    // 2MB files @{7, 9, 11}
+    LocalResourcesTracker trackerC =
+      createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2);
+    // Total cache: 20MB; verify removed at least 10MB
+    rss.addResources(pubTracker);
+    rss.addResources(trackerA);
+    rss.addResources(trackerB);
+    rss.addResources(trackerC);
+    long deleted = 0L;
+    // one captor is shared by all four verifications, so it collects every
+    // resource passed to remove() on any tracker
+    ArgumentCaptor<LocalizedResource> captor =
+      ArgumentCaptor.forClass(LocalizedResource.class);
+    verify(pubTracker, atMost(2))
+      .remove(captor.capture(), isA(DeletionService.class));
+    verify(trackerA, atMost(4))
+      .remove(captor.capture(), isA(DeletionService.class));
+    verify(trackerB, atMost(1))
+      .remove(captor.capture(), isA(DeletionService.class));
+    verify(trackerC, atMost(3))
+      .remove(captor.capture(), isA(DeletionService.class));
+    for (LocalizedResource rem : captor.getAllValues()) {
+      deleted += rem.getSize();
+    }
+    assertTrue(deleted >= 10 * 1024 * 1024);
+    assertTrue(deleted < 15 * 1024 * 1024);
+  }
+
+  /**
+   * Creates a spied LocalResourcesTrackerImpl pre-populated with
+   * {@code nRsrcs} resources that report a reference count of 0 and the
+   * LOCALIZED state.
+   *
+   * @param user user passed to the tracker and embedded in resource paths
+   * @param rsrcSize size reported by every resource
+   * @param nRsrcs number of resources to create
+   * @param timestamp timestamp of the first resource
+   * @param tsstep amount added to the timestamp for each further resource
+   * @return the spied tracker
+   */
+  LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
+      long nRsrcs, long timestamp, long tsstep) {
+    ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
+      new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
+    LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
+          trackerResources));
+    for (int i = 0; i < nRsrcs; ++i) {
+      final LocalResourceRequest req = new LocalResourceRequest(
+          new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
+          LocalResourceType.FILE);
+      final long ts = timestamp + i * tsstep;
+      final Path p = new Path("file:///local/" + user + "/rsrc" + i);
+      // anonymous subclass pins the values the retention set reads
+      LocalizedResource rsrc = new LocalizedResource(req, null) {
+        @Override public int getRefCount() { return 0; }
+        @Override public long getSize() { return rsrcSize; }
+        @Override public Path getLocalPath() { return p; }
+        @Override public long getTimestamp() { return ts; }
+        @Override
+        public ResourceState getState() { return ResourceState.LOCALIZED; }
+      };
+      trackerResources.put(req, rsrc);
+    }
+    return ret;
+  }
+
+}



Mime
View raw message