hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinayakum...@apache.org
Subject svn commit: r1601151 [2/5] - in /hadoop/common/branches/HDFS-5442/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yar...
Date Sat, 07 Jun 2014 16:29:20 GMT
Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Sat Jun  7 16:29:10 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -27,14 +28,21 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implemen
       .compile(RANDOM_DIR_REGEX);
 
   private final String user;
+  private final ApplicationId appId;
   private final Dispatcher dispatcher;
   private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
   private Configuration conf;
@@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implemen
    * per APPLICATION, USER and PUBLIC cache.
    */
   private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
+  private NMStateStoreService stateStore;
 
-  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
-      boolean useLocalCacheDirectoryManager, Configuration conf) {
-    this(user, dispatcher,
+  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+      Configuration conf, NMStateStoreService stateStore) {
+    this(user, appId, dispatcher,
       new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
-      useLocalCacheDirectoryManager, conf);
+      useLocalCacheDirectoryManager, conf, stateStore);
   }
 
-  LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+  LocalResourcesTrackerImpl(String user, ApplicationId appId,
+      Dispatcher dispatcher,
       ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
-      boolean useLocalCacheDirectoryManager, Configuration conf) {
+      boolean useLocalCacheDirectoryManager, Configuration conf,
+      NMStateStoreService stateStore) {
+    this.appId = appId;
     this.user = user;
     this.dispatcher = dispatcher;
     this.localrsrc = localrsrc;
@@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implemen
         new ConcurrentHashMap<LocalResourceRequest, Path>();
     }
     this.conf = conf;
+    this.stateStore = stateStore;
   }
 
   /*
@@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implemen
       if (rsrc != null && (!isResourcePresent(rsrc))) {
         LOG.info("Resource " + rsrc.getLocalPath()
             + " is missing, localizing it again");
-        localrsrc.remove(req);
-        decrementFileCountForLocalCacheDirectory(req, rsrc);
+        removeResource(req);
         rsrc = null;
       }
       if (null == rsrc) {
@@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implemen
       }
       break;
     case LOCALIZATION_FAILED:
-      decrementFileCountForLocalCacheDirectory(req, null);
       /*
        * If resource localization fails then Localized resource will be
        * removed from local cache.
        */
-      localrsrc.remove(req);
+      removeResource(req);
+      break;
+    case RECOVERED:
+      if (rsrc != null) {
+        LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
+        return;
+      }
+      rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
+      localrsrc.put(req, rsrc);
       break;
     }
+
     rsrc.handle(event);
+
+    if (event.getType() == ResourceEventType.LOCALIZED) {
+      if (rsrc.getLocalPath() != null) {
+        try {
+          stateStore.finishResourceLocalization(user, appId,
+              buildLocalizedResourceProto(rsrc));
+        } catch (IOException ioe) {
+          LOG.error("Error storing resource state for " + rsrc, ioe);
+        }
+      } else {
+        LOG.warn("Resource " + rsrc + " localized without a location");
+      }
+    }
+  }
+
+  private LocalizedResource recoverResource(LocalResourceRequest req,
+      ResourceRecoveredEvent event) {
+    // a resource's unique number is the name of its parent directory
+    Path localDir = event.getLocalPath().getParent();
+    long rsrcId = Long.parseLong(localDir.getName());
+
+    // update ID generator to avoid conflicts with existing resources
+    while (true) {
+      long currentRsrcId = uniqueNumberGenerator.get();
+      long nextRsrcId = Math.max(currentRsrcId, rsrcId);
+      if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
+        break;
+      }
+    }
+
+    incrementFileCountForLocalCacheDirectory(localDir.getParent());
+
+    return new LocalizedResource(req, dispatcher);
+  }
+
+  private LocalizedResourceProto buildLocalizedResourceProto(
+      LocalizedResource rsrc) {
+    return LocalizedResourceProto.newBuilder()
+        .setResource(buildLocalResourceProto(rsrc.getRequest()))
+        .setLocalPath(rsrc.getLocalPath().toString())
+        .setSize(rsrc.getSize())
+        .build();
+  }
+
+  private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
+    LocalResourcePBImpl lrpb;
+    if (!(lr instanceof LocalResourcePBImpl)) {
+      lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
+          lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
+          lr.getPattern());
+    }
+    lrpb = (LocalResourcePBImpl) lr;
+    return lrpb.getProto();
+  }
+
+  public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
+    if (useLocalCacheDirectoryManager) {
+      Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
+          cacheDir);
+      if (cacheRoot != null) {
+        LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
+        if (dir == null) {
+          dir = new LocalCacheDirectoryManager(conf);
+          LocalCacheDirectoryManager otherDir =
+              directoryManagers.putIfAbsent(cacheRoot, dir);
+          if (otherDir != null) {
+            dir = otherDir;
+          }
+        }
+        if (cacheDir.equals(cacheRoot)) {
+          dir.incrementFileCountForPath("");
+        } else {
+          String dirStr = cacheDir.toUri().getRawPath();
+          String rootStr = cacheRoot.toUri().getRawPath();
+          dir.incrementFileCountForPath(
+              dirStr.substring(rootStr.length() + 1));
+        }
+      }
+    }
   }
 
   /*
@@ -217,11 +318,6 @@ class LocalResourcesTrackerImpl implemen
   }
   
   @Override
-  public boolean contains(LocalResourceRequest resource) {
-    return localrsrc.containsKey(resource);
-  }
-
-  @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
  // current synchronization guaranteed by crude RLS event for cleanup
     LocalizedResource rsrc = localrsrc.get(rem.getRequest());
@@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implemen
           + " with non-zero refcount");
       return false;
     } else { // ResourceState is LOCALIZED or INIT
-      localrsrc.remove(rem.getRequest());
       if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
         delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
       }
-      decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+      removeResource(rem.getRequest());
       LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
       return true;
     }
   }
 
+  private void removeResource(LocalResourceRequest req) {
+    LocalizedResource rsrc = localrsrc.remove(req);
+    decrementFileCountForLocalCacheDirectory(req, rsrc);
+    if (rsrc != null) {
+      Path localPath = rsrc.getLocalPath();
+      if (localPath != null) {
+        try {
+          stateStore.removeLocalizedResource(user, appId, localPath);
+        } catch (IOException e) {
+          LOG.error("Unable to remove resource " + rsrc + " from state store",
+              e);
+        }
+      }
+    }
+  }
+
   /**
    * Returns the path up to the random directory component.
    */
@@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implemen
   @Override
   public Path
       getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+    Path rPath = localDirPath;
     if (useLocalCacheDirectoryManager && localDirPath != null) {
 
       if (!directoryManagers.containsKey(localDirPath)) {
@@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implemen
       }
       LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
 
-      Path rPath = localDirPath;
+      rPath = localDirPath;
       String hierarchicalPath = dir.getRelativePathForLocalization();
       // For most of the scenarios we will get root path only which
       // is an empty string
@@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implemen
         rPath = new Path(localDirPath, hierarchicalPath);
       }
       inProgressLocalResourcesMap.put(req, rPath);
-      return rPath;
-    } else {
-      return localDirPath;
     }
-  }
 
-  @Override
-  public long nextUniqueNumber() {
-    return uniqueNumberGenerator.incrementAndGet();
+    rPath = new Path(rPath,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    Path localPath = new Path(rPath, req.getPath().getName());
+    LocalizedResource rsrc = localrsrc.get(req);
+    rsrc.setLocalPath(localPath);
+    LocalResource lr = LocalResource.newInstance(req.getResource(),
+        req.getType(), req.getVisibility(), req.getSize(),
+        req.getTimestamp());
+    try {
+      stateStore.startResourceLocalization(user, appId,
+          ((LocalResourcePBImpl) lr).getProto(), localPath);
+    } catch (IOException e) {
+      LOG.error("Unable to record localization start for " + rsrc, e);
+    }
+    return rPath;
   }
 
-  @VisibleForTesting
-  @Private
   @Override
   public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
     return localrsrc.get(request);
   }
-}
\ No newline at end of file
+
+  @VisibleForTesting
+  LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
+    LocalCacheDirectoryManager mgr = null;
+    if (useLocalCacheDirectoryManager) {
+      mgr = directoryManagers.get(localDirPath);
+    }
+    return mgr;
+  }
+}

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Sat Jun  7 16:29:10 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -54,8 +55,8 @@ public class LocalizedResource implement
 
   private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
 
-  Path localPath;
-  long size = -1;
+  volatile Path localPath;
+  volatile long size = -1;
   final LocalResourceRequest rsrc;
   final Dispatcher dispatcher;
   final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
@@ -76,6 +77,8 @@ public class LocalizedResource implement
     // From INIT (ref == 0, awaiting req)
     .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
         ResourceEventType.REQUEST, new FetchResourceTransition())
+    .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
+        ResourceEventType.RECOVERED, new RecoveredTransition())
 
     // From DOWNLOADING (ref > 0, may be localizing)
     .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
@@ -157,6 +160,10 @@ public class LocalizedResource implement
     return localPath;
   }
 
+  public void setLocalPath(Path localPath) {
+    this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath);
+  }
+
   public long getTimestamp() {
     return timestamp.get();
   }
@@ -234,7 +241,8 @@ public class LocalizedResource implement
     @Override
     public void transition(LocalizedResource rsrc, ResourceEvent event) {
       ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
-      rsrc.localPath = locEvent.getLocation();
+      rsrc.localPath =
+          Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
       rsrc.size = locEvent.getSize();
       for (ContainerId container : rsrc.ref) {
         rsrc.dispatcher.getEventHandler().handle(
@@ -291,4 +299,13 @@ public class LocalizedResource implement
       rsrc.release(relEvent.getContainer());
     }
   }
+
+  private static class RecoveredTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event;
+      rsrc.localPath = recoveredEvent.getLocalPath();
+      rsrc.size = recoveredEvent.getSize();
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Sat Jun  7 16:29:10 2014
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -81,6 +82,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
@@ -109,10 +112,15 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -142,6 +150,7 @@ public class ResourceLocalizationService
   private RecordFactory recordFactory;
   private final ScheduledExecutorService cacheCleanup;
   private LocalizerTokenSecretManager secretManager;
+  private NMStateStoreService stateStore;
 
   private LocalResourcesTracker publicRsrc;
 
@@ -163,7 +172,7 @@ public class ResourceLocalizationService
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService,
-      LocalDirsHandlerService dirsHandler) {
+      LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
 
     super(ResourceLocalizationService.class.getName());
     this.exec = exec;
@@ -175,6 +184,7 @@ public class ResourceLocalizationService
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
           .build());
+    this.stateStore = stateStore;
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -203,15 +213,17 @@ public class ResourceLocalizationService
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     this.validateConf(conf);
-    this.publicRsrc =
-        new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
+    this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
+        true, conf, stateStore);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
       FileContext lfs = getLocalFileContext(conf);
       lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
 
-      cleanUpLocalDir(lfs,delService);
+      if (!stateStore.canRecover()) {
+        cleanUpLocalDir(lfs,delService);
+      }
 
       List<String> localDirs = dirsHandler.getLocalDirs();
       for (String localDir : localDirs) {
@@ -249,6 +261,74 @@ public class ResourceLocalizationService
     super.serviceInit(conf);
   }
 
+  // Recover localized resources after an NM restart
+  public void recoverLocalizedResources(RecoveredLocalizationState state)
+      throws URISyntaxException {
+    LocalResourceTrackerState trackerState = state.getPublicTrackerState();
+    recoverTrackerResources(publicRsrc, trackerState);
+
+    for (Map.Entry<String, RecoveredUserResources> userEntry :
+         state.getUserResources().entrySet()) {
+      String user = userEntry.getKey();
+      RecoveredUserResources userResources = userEntry.getValue();
+      trackerState = userResources.getPrivateTrackerState();
+      if (!trackerState.isEmpty()) {
+        LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+            null, dispatcher, true, super.getConfig(), stateStore);
+        LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
+            tracker);
+        if (oldTracker != null) {
+          tracker = oldTracker;
+        }
+        recoverTrackerResources(tracker, trackerState);
+      }
+
+      for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+           userResources.getAppTrackerStates().entrySet()) {
+        trackerState = appEntry.getValue();
+        if (!trackerState.isEmpty()) {
+          ApplicationId appId = appEntry.getKey();
+          String appIdStr = ConverterUtils.toString(appId);
+          LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+              appId, dispatcher, false, super.getConfig(), stateStore);
+          LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+              tracker);
+          if (oldTracker != null) {
+            tracker = oldTracker;
+          }
+          recoverTrackerResources(tracker, trackerState);
+        }
+      }
+    }
+  }
+
+  private void recoverTrackerResources(LocalResourcesTracker tracker,
+      LocalResourceTrackerState state) throws URISyntaxException {
+    for (LocalizedResourceProto proto : state.getLocalizedResources()) {
+      LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+      LocalResourceRequest req = new LocalResourceRequest(rsrc);
+      LOG.info("Recovering localized resource " + req + " at "
+          + proto.getLocalPath());
+      tracker.handle(new ResourceRecoveredEvent(req,
+          new Path(proto.getLocalPath()), proto.getSize()));
+    }
+
+    for (Map.Entry<LocalResourceProto, Path> entry :
+         state.getInProgressResources().entrySet()) {
+      LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
+      LocalResourceRequest req = new LocalResourceRequest(rsrc);
+      Path localPath = entry.getValue();
+      tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
+
+      // delete any in-progress localizations, containers will request again
+      LOG.info("Deleting in-progress localization for " + req + " at "
+          + localPath);
+      tracker.remove(tracker.getLocalizedResource(req), delService);
+    }
+
+    // TODO: remove untracked directories in local filesystem
+  }
+
   @Override
   public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
     return localizerTracker.processHeartbeat(status);
@@ -337,17 +417,10 @@ public class ResourceLocalizationService
     // 0) Create application tracking structs
     String userName = app.getUser();
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
-      dispatcher, true, super.getConfig()));
-    if (null != appRsrc.putIfAbsent(
-      ConverterUtils.toString(app.getAppId()),
-      new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
-        .getConfig()))) {
-      LOG.warn("Initializing application " + app + " already present");
-      assert false; // TODO: FIXME assert doesn't help
-                    // ^ The condition is benign. Tests should fail and it
-                    // should appear in logs, but it's an internal error
-                    // that should have no effect on applications
-    }
+        null, dispatcher, true, super.getConfig(), stateStore));
+    String appIdStr = ConverterUtils.toString(app.getAppId());
+    appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
+        app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
     // 1) Signal container init
     //
     // This is handled by the ApplicationImpl state machine and allows
@@ -446,18 +519,28 @@ public class ResourceLocalizationService
 
   @SuppressWarnings({"unchecked"})
   private void handleDestroyApplicationResources(Application application) {
-    String userName;
-    String appIDStr;
+    String userName = application.getUser();
+    ApplicationId appId = application.getAppId();
+    String appIDStr = application.toString();
     LocalResourcesTracker appLocalRsrcsTracker =
-      appRsrc.remove(ConverterUtils.toString(application.getAppId()));
-    if (null == appLocalRsrcsTracker) {
+      appRsrc.remove(ConverterUtils.toString(appId));
+    if (appLocalRsrcsTracker != null) {
+      for (LocalizedResource rsrc : appLocalRsrcsTracker ) {
+        Path localPath = rsrc.getLocalPath();
+        if (localPath != null) {
+          try {
+            stateStore.removeLocalizedResource(userName, appId, localPath);
+          } catch (IOException e) {
+            LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
+                + " from state store", e);
+          }
+        }
+      }
+    } else {
       LOG.warn("Removing uninitialized application " + application);
     }
-    // TODO: What to do with appLocalRsrcsTracker?
 
     // Delete the application directories
-    userName = application.getUser();
-    appIDStr = application.toString();
     for (String localDir : dirsHandler.getLocalDirs()) {
 
       // Delete the user-owned app-dir
@@ -668,19 +751,15 @@ public class ResourceLocalizationService
         if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
           LocalResource resource = request.getResource().getRequest();
           try {
-            Path publicDirDestPath =
+            Path publicRootPath =
                 dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
                     + ContainerLocalizer.FILECACHE,
                   ContainerLocalizer.getEstimatedSize(resource), true);
-            Path hierarchicalPath =
-                publicRsrc.getPathForLocalization(key, publicDirDestPath);
-            if (!hierarchicalPath.equals(publicDirDestPath)) {
-              publicDirDestPath = hierarchicalPath;
+            Path publicDirDestPath =
+                publicRsrc.getPathForLocalization(key, publicRootPath);
+            if (!publicDirDestPath.getParent().equals(publicRootPath)) {
               DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
             }
-            publicDirDestPath =
-                new Path(publicDirDestPath, Long.toString(publicRsrc
-                  .nextUniqueNumber()));
             // explicitly synchronize pending here to avoid future task
             // completing and being dequeued before pending updated
             synchronized (pending) {
@@ -968,9 +1047,8 @@ public class ResourceLocalizationService
       Path dirPath =
           dirsHandler.getLocalPathForWrite(cacheDirectory,
             ContainerLocalizer.getEstimatedSize(rsrc), false);
-      dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
-        dirPath);
-      return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+      return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+          dirPath);
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Sat Jun  7 16:29:10 2014
@@ -31,5 +31,7 @@ public enum ResourceEventType {
   /** See {@link ResourceReleaseEvent} */
   RELEASE,
   /** See {@link ResourceFailedLocalizationEvent} */
-  LOCALIZATION_FAILED
+  LOCALIZATION_FAILED,
+  /** See {@link ResourceRecoveredEvent} */
+  RECOVERED
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Sat Jun  7 16:29:10 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
 public class DummyContainerManager extends ContainerManagerImpl {
@@ -75,7 +76,7 @@ public class DummyContainerManager exten
   protected ResourceLocalizationService createResourceLocalizationService(
       ContainerExecutor exec, DeletionService deletionContext) {
     return new ResourceLocalizationService(super.dispatcher, exec,
-        deletionContext, super.dirsHandler) {
+        deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
       @Override
       public void handle(LocalizationEvent event) {
         switch (event.getType()) {

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Sat Jun  7 16:29:10 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -79,7 +80,8 @@ public class TestEventFlow {
     YarnConfiguration conf = new YarnConfiguration();
     
     Context context = new NMContext(new NMContainerTokenSecretManager(conf),
-        new NMTokenSecretManagerInNM(), null, null) {
+        new NMTokenSecretManagerInNM(), null, null,
+        new NMNullStateStoreService()) {
       @Override
       public int getHttpPort() {
         return 1234;

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java Sat Jun  7 16:29:10 2014
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -27,17 +30,18 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.junit.Assert;
-
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -46,6 +50,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -53,10 +58,12 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -185,6 +192,9 @@ public class TestNodeManagerResync {
         TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE);
     final Container container =
         TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
+    NMContainerStatus report =
+        createNMContainerStatus(2, ContainerState.COMPLETE);
+    when(container.getNMContainerStatus()).thenReturn(report);
     NodeManager nm = new NodeManager() {
       int registerCount = 0;
 
@@ -203,7 +213,7 @@ public class TestNodeManagerResync {
                 if (registerCount == 0) {
                   // first register, no containers info.
                   try {
-                    Assert.assertEquals(0, request.getContainerStatuses()
+                    Assert.assertEquals(0, request.getNMContainerStatuses()
                       .size());
                   } catch (AssertionError error) {
                     error.printStackTrace();
@@ -214,8 +224,8 @@ public class TestNodeManagerResync {
                     testCompleteContainer.getContainerId(), container);
                 } else {
                   // second register contains the completed container info.
-                  List<ContainerStatus> statuses =
-                      request.getContainerStatuses();
+                  List<NMContainerStatus> statuses =
+                      request.getNMContainerStatuses();
                   try {
                     Assert.assertEquals(1, statuses.size());
                     Assert.assertEquals(testCompleteContainer.getContainerId(),
@@ -510,4 +520,16 @@ public class TestNodeManagerResync {
         }
       }
     }}
+
+  public static NMContainerStatus createNMContainerStatus(int id,
+      ContainerState containerState) {
+    ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 1);
+    ContainerId containerId = ContainerId.newInstance(applicationAttemptId, id);
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, containerState,
+          Resource.newInstance(1024, 1), "recover container", 0);
+    return containerReport;
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Sat Jun  7 16:29:10 2014
@@ -109,6 +109,36 @@ public class TestNodeManagerShutdown {
   }
   
   @Test
+  public void testStateStoreRemovalOnDecommission() throws IOException {
+    final File recoveryDir = new File(basedir, "nm-recovery");
+    nm = new TestNodeManager();
+    YarnConfiguration conf = createNMConfig();
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.NM_RECOVERY_DIR, recoveryDir.getAbsolutePath());
+
+    // verify state store is not removed on normal shutdown
+    nm.init(conf);
+    nm.start();
+    Assert.assertTrue(recoveryDir.exists());
+    Assert.assertTrue(recoveryDir.isDirectory());
+    nm.stop();
+    nm = null;
+    Assert.assertTrue(recoveryDir.exists());
+    Assert.assertTrue(recoveryDir.isDirectory());
+
+    // verify state store is removed on decommissioned shutdown
+    nm = new TestNodeManager();
+    nm.init(conf);
+    nm.start();
+    Assert.assertTrue(recoveryDir.exists());
+    Assert.assertTrue(recoveryDir.isDirectory());
+    nm.getNMContext().setDecommissioned(true);
+    nm.stop();
+    nm = null;
+    Assert.assertFalse(recoveryDir.exists());
+  }
+
+  @Test
   public void testKillContainersOnShutdown() throws IOException,
       YarnException {
     nm = new TestNodeManager();

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Sat Jun  7 16:29:10 2014
@@ -91,6 +91,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 
 @SuppressWarnings("rawtypes")
 public class TestNodeStatusUpdater {
@@ -1159,7 +1161,8 @@ public class TestNodeStatusUpdater {
       @Override
       protected NMContext createNMContext(
           NMContainerTokenSecretManager containerTokenSecretManager,
-          NMTokenSecretManagerInNM nmTokenSecretManager) {
+          NMTokenSecretManagerInNM nmTokenSecretManager,
+          NMStateStoreService store) {
         return new MyNMContext(containerTokenSecretManager,
           nmTokenSecretManager);
       }
@@ -1268,7 +1271,8 @@ public class TestNodeStatusUpdater {
     public MyNMContext(
         NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager) {
-      super(containerTokenSecretManager, nmTokenSecretManager, null, null);
+      super(containerTokenSecretManager, nmTokenSecretManager, null, null,
+          new NMNullStateStoreService());
     }
 
     @Override

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Sat Jun  7 16:29:10 2014
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -103,7 +104,8 @@ public abstract class BaseContainerManag
   protected static final int HTTP_PORT = 5412;
   protected Configuration conf = new YarnConfiguration();
   protected Context context = new NMContext(new NMContainerTokenSecretManager(
-    conf), new NMTokenSecretManagerInNM(), null, new ApplicationACLsManager(conf)) {
+    conf), new NMTokenSecretManagerInNM(), null,
+    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java Sat Jun  7 16:29:10 2014
@@ -23,6 +23,7 @@ import org.junit.Assert;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
 import org.junit.Test;
 
 public class TestLocalCacheDirectoryManager {
@@ -73,7 +74,7 @@ public class TestLocalCacheDirectoryMana
     conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
     Exception e = null;
     ResourceLocalizationService service =
-        new ResourceLocalizationService(null, null, null, null);
+        new ResourceLocalizationService(null, null, null, null, null);
     try {
       service.init(conf);
     } catch (Exception e1) {
@@ -109,4 +110,49 @@ public class TestLocalCacheDirectoryMana
     // first sub directory
     Assert.assertEquals(firstSubDir, dir.getRelativePathForLocalization());
   }
+
+  @Test
+  public void testDirectoryConversion() {
+    for (int i = 0; i < 10000; ++i) {
+      String path = Directory.getRelativePath(i);
+      Assert.assertEquals("Incorrect conversion for " + i, i,
+          Directory.getDirectoryNumber(path));
+    }
+  }
+
+  @Test
+  public void testIncrementFileCountForPath() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY,
+        LocalCacheDirectoryManager.DIRECTORIES_PER_LEVEL + 2);
+    LocalCacheDirectoryManager mgr = new LocalCacheDirectoryManager(conf);
+    final String rootPath = "";
+    mgr.incrementFileCountForPath(rootPath);
+    Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
+    Assert.assertFalse("root dir should be full",
+        rootPath.equals(mgr.getRelativePathForLocalization()));
+    // finish filling the other directory
+    mgr.getRelativePathForLocalization();
+    // free up space in the root dir
+    mgr.decrementFileCountForPath(rootPath);
+    mgr.decrementFileCountForPath(rootPath);
+    Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
+    Assert.assertEquals(rootPath, mgr.getRelativePathForLocalization());
+    String otherDir = mgr.getRelativePathForLocalization();
+    Assert.assertFalse("root dir should be full", otherDir.equals(rootPath));
+
+    final String deepDir0 = "d/e/e/p/0";
+    final String deepDir1 = "d/e/e/p/1";
+    final String deepDir2 = "d/e/e/p/2";
+    final String deepDir3 = "d/e/e/p/3";
+    mgr.incrementFileCountForPath(deepDir0);
+    Assert.assertEquals(otherDir, mgr.getRelativePathForLocalization());
+    Assert.assertEquals(deepDir0, mgr.getRelativePathForLocalization());
+    Assert.assertEquals("total dir count incorrect after increment",
+        deepDir1, mgr.getRelativePathForLocalization());
+    mgr.incrementFileCountForPath(deepDir2);
+    mgr.incrementFileCountForPath(deepDir1);
+    mgr.incrementFileCountForPath(deepDir2);
+    Assert.assertEquals(deepDir3, mgr.getRelativePathForLocalization());
+  }
 }

Modified: hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1601151&r1=1601150&r2=1601151&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-5442/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Sat Jun  7 16:29:10 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -34,13 +35,17 @@ import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
@@ -52,10 +57,14 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 public class TestLocalResourcesTrackerImpl {
 
@@ -92,8 +101,8 @@ public class TestLocalResourcesTrackerIm
       localrsrc.put(req1, lr1);
       localrsrc.put(req2, lr2);
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
-            conf);
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              false, conf, new NMNullStateStoreService());
 
       ResourceEvent req11Event =
           new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -176,7 +185,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       localrsrc.put(req1, lr1);
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc, false, conf);
+          null, dispatcher, localrsrc, false, conf,
+          new NMNullStateStoreService());
 
       ResourceEvent req11Event = new ResourceRequestEvent(req1,
           LocalResourceVisibility.PUBLIC, lc1);
@@ -246,7 +256,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              true, conf, new NMNullStateStoreService());
 
       LocalResourceRequest lr =
           createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -264,6 +275,7 @@ public class TestLocalResourcesTrackerIm
 
       // Container-1 requesting local resource.
       tracker.handle(reqEvent1);
+      dispatcher.await();
 
       // New localized Resource should have been added to local resource map
       // and the requesting container will be added to its waiting queue.
@@ -280,6 +292,7 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent2 =
           new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
       tracker.handle(reqEvent2);
+      dispatcher.await();
 
       // Container 2 should have been added to the waiting queue of the local
       // resource
@@ -295,6 +308,7 @@ public class TestLocalResourcesTrackerIm
       LocalizedResource localizedResource = localrsrc.get(lr);
       
       tracker.handle(resourceFailedEvent);
+      dispatcher.await();
 
       // After receiving failed resource event; all waiting containers will be
       // notified with Container Resource Failed Event.
@@ -308,6 +322,7 @@ public class TestLocalResourcesTrackerIm
       // exception.
       ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
       tracker.handle(relEvent1);
+      dispatcher.await();
 
       // Container-3 now requests for the same resource. This request call
       // is coming prior to Container-2's release call.
@@ -316,6 +331,7 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent3 =
           new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
       tracker.handle(reqEvent3);
+      dispatcher.await();
 
       // Local resource cache now should have the requested resource and the
       // number of waiting containers should be 1.
@@ -327,6 +343,7 @@ public class TestLocalResourcesTrackerIm
       // Container-2 Releases the resource
       ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
       tracker.handle(relEvent2);
+      dispatcher.await();
 
       // Making sure that there is no change in the cache after the release.
       Assert.assertEquals(1, localrsrc.size());
@@ -340,6 +357,7 @@ public class TestLocalResourcesTrackerIm
       ResourceLocalizedEvent localizedEvent =
           new ResourceLocalizedEvent(lr, localizedPath, 123L);
       tracker.handle(localizedEvent);
+      dispatcher.await();
       
       // Verifying ContainerResourceLocalizedEvent .
       verify(containerEventHandler, times(1)).handle(
@@ -351,6 +369,7 @@ public class TestLocalResourcesTrackerIm
       // Container-3 releasing the resource.
       ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
       tracker.handle(relEvent3);
+      dispatcher.await();
       
       Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
       
@@ -384,7 +403,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc, true, conf);
+          null, dispatcher, localrsrc, true, conf,
+          new NMNullStateStoreService());
 
       // This is a random path. NO File creation will take place at this place.
       Path localDir = new Path("/tmp");
@@ -401,7 +421,9 @@ public class TestLocalResourcesTrackerIm
       tracker.handle(reqEvent1);
 
       // Simulate the process of localization of lr1
-      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+      // NOTE: Localization path from tracker has resource ID at end
+      Path hierarchicalPath1 =
+          tracker.getPathForLocalization(lr1, localDir).getParent();
       // Simulate lr1 getting localized
       ResourceLocalizedEvent rle1 =
           new ResourceLocalizedEvent(lr1,
@@ -417,7 +439,8 @@ public class TestLocalResourcesTrackerIm
           new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent2);
 
-      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      Path hierarchicalPath2 =
+          tracker.getPathForLocalization(lr2, localDir).getParent();
       // localization failed.
       ResourceFailedLocalizationEvent rfe2 =
           new ResourceFailedLocalizationEvent(
@@ -435,7 +458,8 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
           LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent3);
-      Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+      Path hierarchicalPath3 =
+          tracker.getPathForLocalization(lr3, localDir).getParent();
       // localization successful
       ResourceLocalizedEvent rle3 =
           new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@@ -479,6 +503,284 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStateStoreSuccessfulLocalization() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // Arbitrary base directory; no file is actually created under this path.
+    final Path localDir = new Path("/tmp");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    DeletionService mockDelService = mock(DeletionService.class);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container cId1 needs the APPLICATION-visibility resource lr1
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // cId1 asks the tracker to localize lr1
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.APPLICATION, lc1);
+      tracker.handle(reqEvent1);
+      dispatcher.await();
+
+      // Tracker's path for lr1; must equal the parent of the stored path
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+      ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+          ArgumentCaptor.forClass(LocalResourceProto.class);
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+          localResourceCaptor.capture(), pathCaptor.capture());
+      LocalResourceProto lrProto = localResourceCaptor.getValue();
+      Path localizedPath1 = pathCaptor.getValue();
+      Assert.assertEquals(lr1,
+          new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+      Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+      // Report lr1 as localized at the path captured from the state store
+      ResourceLocalizedEvent rle1 =
+          new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120);
+      tracker.handle(rle1);
+      dispatcher.await();
+
+      ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor =
+          ArgumentCaptor.forClass(LocalizedResourceProto.class);
+      verify(stateStore).finishResourceLocalization(eq(user), eq(appId),
+          localizedProtoCaptor.capture());
+      LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue();
+      Assert.assertEquals(lr1, new LocalResourceRequest(
+          new LocalResourcePBImpl(localizedProto.getResource())));
+      Assert.assertEquals(localizedPath1.toString(),
+          localizedProto.getLocalPath());
+      LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1);
+      Assert.assertNotNull(localizedRsrc1);
+
+      // Release lr1 from cId1, then remove it as retention processing would
+      tracker.handle(new ResourceReleaseEvent(lr1, cId1));
+      dispatcher.await();
+      boolean removeResult = tracker.remove(localizedRsrc1, mockDelService);
+
+      Assert.assertTrue(removeResult);
+      verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+          eq(localizedPath1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStateStoreFailedLocalization() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // Arbitrary base directory; no file is actually created under this path.
+    final Path localDir = new Path("/tmp");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container cId1 needs the APPLICATION-visibility resource lr1
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // cId1 asks the tracker to localize lr1
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.APPLICATION, lc1);
+      tracker.handle(reqEvent1);
+      dispatcher.await();
+
+      // Tracker's path for lr1; must equal the parent of the stored path
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+      ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+          ArgumentCaptor.forClass(LocalResourceProto.class);
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+          localResourceCaptor.capture(), pathCaptor.capture());
+      LocalResourceProto lrProto = localResourceCaptor.getValue();
+      Path localizedPath1 = pathCaptor.getValue();
+      Assert.assertEquals(lr1,
+          new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+      Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+      ResourceFailedLocalizationEvent rfe1 =
+          new ResourceFailedLocalizationEvent(
+              lr1, new Exception("Test").toString());
+      tracker.handle(rfe1);
+      dispatcher.await();
+      verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+          eq(localizedPath1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveredResource() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // Arbitrary base directory; no file is actually created under this path.
+    final Path localDir = new Path("/tmp/localdir");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // lr1 is not requested by a container; it is injected via recovery below
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      Assert.assertNull(tracker.getLocalizedResource(lr1));
+      final long localizedId1 = 52;
+      Path hierarchicalPath1 = new Path(localDir,
+          Long.toString(localizedId1));
+      Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr1));
+
+      // the next requested resource must get the ID following the recovered 52
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc2 = new LocalizerContext(user, cId1, null);
+      ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2,
+          LocalResourceVisibility.APPLICATION, lc2);
+      tracker.handle(reqEvent2);
+      dispatcher.await();
+      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
+      Assert.assertEquals(localizedId1 + 1, localizedId2);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveredResourceWithDirCacheMgr() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // Arbitrary root directory; no file is actually created under this path.
+    final Path localDirRoot = new Path("/tmp/localdir");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, true, conf, stateStore);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr1));
+      final long localizedId1 = 52;
+      Path hierarchicalPath1 = new Path(localDirRoot + "/4/2",
+          Long.toString(localizedId1));
+      Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr1));
+      LocalCacheDirectoryManager dirMgrRoot =
+          tracker.getDirectoryManager(localDirRoot);
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/2").getCount());
+
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr2));
+      final long localizedId2 = localizedId1 + 1;
+      Path hierarchicalPath2 = new Path(localDirRoot + "/4/2",
+          Long.toString(localizedId2));
+      Path localizedPath2 = new Path(hierarchicalPath2, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr2, localizedPath2, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr2));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+
+      LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr3));
+      final long localizedId3 = 128;
+      Path hierarchicalPath3 = new Path(localDirRoot + "/4/3",
+          Long.toString(localizedId3));
+      Path localizedPath3 = new Path(hierarchicalPath3, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr3, localizedPath3, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr3));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
+
+      LocalResourceRequest lr4 = createLocalResourceRequest(user, 4, 4,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr4));
+      final long localizedId4 = 256;
+      Path hierarchicalPath4 = new Path(localDirRoot + "/4",
+          Long.toString(localizedId4));
+      Path localizedPath4 = new Path(hierarchicalPath4, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr4, localizedPath4, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr4));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   private boolean createdummylocalizefile(Path path) {
     boolean ret = false;
     File file = new File(path.toUri().getRawPath().toString());



Mime
View raw message