Author: cdouglas
Date: Mon Jun 6 13:44:06 2011
New Revision: 1132638
URL: http://svn.apache.org/viewvc?rev=1132638&view=rev
Log:
Add deletion of distributed cache resources.
Added:
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon Jun 6 13:44:06 2011
@@ -4,6 +4,9 @@ Trunk (unreleased changes)
MAPREDUCE-279
+
+ Add deletion of distributed cache resources. (cdouglas)
+
Fix class cast exception in Task abort for old mapreduce apis. (sharad)
Fix various issues with Web UI's. (Luke Lu)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Mon Jun 6 13:44:06 2011
@@ -92,6 +92,7 @@ public class ConverterUtils {
}
// TODO: Why thread local?
+ // ^ NumberFormat instances are not threadsafe
private static final ThreadLocal<NumberFormat> appIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
@@ -104,6 +105,7 @@ public class ConverterUtils {
};
// TODO: Why thread local?
+ // ^ NumberFormat instances are not threadsafe
private static final ThreadLocal<NumberFormat> containerIdFormat =
new ThreadLocal<NumberFormat>() {
@Override
@@ -159,4 +161,4 @@ public class ConverterUtils {
containerId.setId(Integer.parseInt(it.next()));
return containerId;
}
-}
\ No newline at end of file
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Mon Jun 6 13:44:06 2011
@@ -89,4 +89,15 @@ public class NMConfig {
NM_PREFIX + "container.manager.threads";
public static final int DEFAULT_NM_CONTAINER_MGR_THREADS = 5;
+
+ /**
+ * Configuration key for the target size of the NodeManager resource cache.
+ * ResourceLocalizationService reads the value with getLong and shifts it
+ * left by 20, i.e. the configured number is interpreted as megabytes.
+ */
+ public static final String NM_TARGET_CACHE_MB =
+ NM_PREFIX + "target.cache.size";
+
+ /** Default cache target in megabytes (10 * 1024 MB). */
+ public static final long DEFAULT_NM_TARGET_CACHE_MB = 10 * 1024;
+
+ /**
+ * Configuration key for the cache cleanup period in milliseconds.
+ * ResourceLocalizationService uses the value as both the initial delay and
+ * the delay between runs of its scheduled cleanup task.
+ */
+ public static final String NM_CACHE_CLEANUP_MS =
+ NM_PREFIX + "target.cache.cleanup.period.ms";
+
+ /** Default cleanup period: 10 minutes, expressed in milliseconds. */
+ public static final long DEFAULT_NM_CACHE_CLEANUP_MS = 10 * 60 * 1000;
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Mon Jun 6 13:44:06 2011
@@ -140,6 +140,9 @@ public class ApplicationImpl implements
// Transitions from APPLICATION_RESOURCES_CLEANINGUP state
.addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+ ApplicationEventType.APPLICATION_CONTAINER_FINISHED)
+ .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
new AppCompletelyDoneTransition())
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Mon Jun 6 13:44:06 2011
@@ -194,6 +194,10 @@ public class ContainerImpl implements Co
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new ContainerKilledTransition())
+ .addTransition(ContainerState.KILLING,
+ ContainerState.KILLING,
+ ContainerEventType.RESOURCE_LOCALIZED,
+ new LocalizedResourceDuringKillTransition())
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
@@ -205,6 +209,10 @@ public class ContainerImpl implements Co
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition())
+ .addTransition(ContainerState.KILLING,
+ ContainerState.DONE,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
+ CONTAINER_DONE_TRANSITION)
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -568,6 +576,24 @@ public class ContainerImpl implements Co
}
@SuppressWarnings("unchecked") // dispatcher not typed
+  // Registered for RESOURCE_LOCALIZED events that arrive while the container
+  // is in the KILLING state; the container stays in KILLING.
+  static class LocalizedResourceDuringKillTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    /**
+     * Moves the reported resource from the container's pending set to its
+     * localized set. A resource the container never asked for is logged and
+     * otherwise ignored.
+     *
+     * @param container container receiving the event
+     * @param event a {@link ContainerResourceLocalizedEvent}
+     */
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      final ContainerResourceLocalizedEvent localized =
+          (ContainerResourceLocalizedEvent) event;
+      final String link =
+          container.pendingResources.remove(localized.getResource());
+      if (null != link) {
+        // expected case: remember where the resource was placed
+        container.localizedResources.put(localized.getLocation(), link);
+        return;
+      }
+      LOG.warn("Localized unknown resource " + localized.getResource() +
+               " for container " + container.getContainerID());
+      assert false;
+      // fail container?
+    }
+  }
+
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Mon Jun 6 13:44:06 2011
@@ -41,9 +41,15 @@ public class LocalResourceRequest
*/
public LocalResourceRequest(LocalResource resource)
throws URISyntaxException {
- this.loc = ConverterUtils.getPathFromYarnURL(resource.getResource());
- this.timestamp = resource.getTimestamp();
- this.type = resource.getType();
+ this(ConverterUtils.getPathFromYarnURL(resource.getResource()),
+ resource.getTimestamp(),
+ resource.getType());
+ }
+
+ /**
+ * Package-private constructor that stores the given values directly.
+ * The public constructor delegates here after converting the resource URL
+ * to a Path.
+ * @param loc path of the requested resource
+ * @param timestamp timestamp of the requested resource
+ * @param type type of the requested resource
+ */
+ LocalResourceRequest(Path loc, long timestamp, LocalResourceType type) {
+ this.loc = loc;
+ this.timestamp = timestamp;
+ this.type = type;
}
@Override
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Mon Jun 6 13:44:06 2011
@@ -20,15 +20,21 @@ package org.apache.hadoop.yarn.server.no
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
/**
* Component tracking resources all of the same {@link LocalResourceVisibility}
*
*/
-interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
+interface LocalResourcesTracker
+ extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> {
// TODO: Not used at all!!
boolean contains(LocalResourceRequest resource);
+ boolean remove(LocalizedResource req, DeletionService delService);
+
+ String getUser();
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Jun 6 13:44:06 2011
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
/**
@@ -34,12 +37,20 @@ class LocalResourcesTrackerImpl implemen
static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
+ private final String user;
private final Dispatcher dispatcher;
- private final ConcurrentHashMap<LocalResourceRequest,LocalizedResource>
- localrsrc = new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
+ private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
- public LocalResourcesTrackerImpl(Dispatcher dispatcher) {
+ public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher) {
+ this(user, dispatcher,
+ new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>());
+ }
+
+ LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+ ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc) {
+ this.user = user;
this.dispatcher = dispatcher;
+ this.localrsrc = localrsrc;
}
@Override
@@ -66,7 +77,40 @@ class LocalResourcesTrackerImpl implemen
@Override
public boolean contains(LocalResourceRequest resource) {
- return localrsrc.contains(resource);
+ return localrsrc.containsKey(resource);
+ }
+
+ @Override
+ public boolean remove(LocalizedResource rem, DeletionService delService) {
+ // current synchronization guaranteed by crude RLS event for cleanup
+ LocalizedResource rsrc = localrsrc.remove(rem.getRequest());
+ if (null == rsrc) {
+ LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
+ " from " + getUser());
+ return true;
+ }
+ if (rsrc.getRefCount() > 0
+ || ResourceState.DOWNLOADING.equals(rsrc.getState())
+ || rsrc != rem) {
+ // internal error
+ LOG.error("Attempt to remove resource with non-zero refcount");
+ assert false;
+ return false;
+ }
+ if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
+ delService.delete(getUser(), rsrc.getLocalPath());
+ }
+ return true;
+ }
+
+ /**
+ * Returns the user name this tracker was constructed with. The value is
+ * used in log messages and passed to the deletion service by remove().
+ * NOTE(review): ResourceLocalizationService builds its public tracker with
+ * a null user, so callers should expect null here.
+ */
+ @Override
+ public String getUser() {
+ return user;
+ }
+
+ /**
+ * Iterates over the tracked resources, i.e. the values of the backing map.
+ * NOTE(review): iteration semantics under concurrent modification depend on
+ * the ConcurrentMap supplied to the constructor - confirm before relying
+ * on them.
+ */
+ @Override
+ public Iterator<LocalizedResource> iterator() {
+ return localrsrc.values().iterator();
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Mon Jun 6 13:44:06 2011
@@ -94,7 +94,7 @@ public class LocalizedResource implement
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.REQUEST, new LocalizedResourceTransition())
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
- ResourceEventType.LOCALIZED, new LocalizedResourceTransition())
+ ResourceEventType.LOCALIZED)
.addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
ResourceEventType.RELEASE, new ReleaseTransition())
.installTopology();
@@ -115,12 +115,12 @@ public class LocalizedResource implement
StringBuilder sb = new StringBuilder();
sb.append("{ ").append(rsrc.toString()).append(",")
.append(getState() == ResourceState.LOCALIZED
- ? localPath
+ ? getLocalPath() + "," + getSize()
: "pending").append(",[");
for (ContainerId c : ref) {
sb.append("(").append(c.toString()).append(")");
}
- sb.append("],").append(timestamp.get()).append("}");
+ sb.append("],").append(getTimestamp()).append("}");
return sb.toString();
}
@@ -150,6 +150,22 @@ public class LocalizedResource implement
return rsrc;
}
+ /**
+ * Returns the local path recorded for this resource.
+ * NOTE(review): toString() only prints it in the LOCALIZED state, so it is
+ * presumably unset before localization completes - confirm.
+ */
+ public Path getLocalPath() {
+ return localPath;
+ }
+
+ /**
+ * Returns the current value of this resource's timestamp.
+ * ResourceRetentionSet's LRU comparator orders eviction candidates by it.
+ */
+ public long getTimestamp() {
+ return timestamp.get();
+ }
+
+ /**
+ * Returns the recorded size of this resource. ResourceRetentionSet sums
+ * these values and compares the total against the cache target, which
+ * ResourceLocalizationService derives as megabytes shifted left by 20;
+ * presumably the unit is bytes - TODO confirm.
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * Returns the number of entries in the reference collection, which holds
+ * the ContainerIds printed by toString(). A resource with a count above
+ * zero is always retained by the cache cleanup.
+ */
+ public int getRefCount() {
+ return ref.size();
+ }
+
public boolean tryAcquire() {
return sem.tryAcquire();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Mon Jun 6 13:44:06 2011
@@ -34,24 +34,32 @@ import java.util.concurrent.ExecutorComp
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_MAX_PUBLIC_FETCH_THREADS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_CACHE_CLEANUP_MS;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCALIZER_BIND_ADDRESS;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_TARGET_CACHE_MB;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_CACHE_CLEANUP_MS;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCALIZER_BIND_ADDRESS;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_MAX_PUBLIC_FETCH_THREADS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_TARGET_CACHE_MB;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -99,6 +107,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
@@ -118,6 +127,8 @@ public class ResourceLocalizationService
private Server server;
private InetSocketAddress localizationServerAddress;
+ private long cacheTargetSize;
+ private long cacheCleanupPeriod;
private List<Path> logDirs;
private List<Path> localDirs;
private List<Path> sysDirs;
@@ -127,6 +138,7 @@ public class ResourceLocalizationService
private LocalizerTracker localizerTracker;
private RecordFactory recordFactory;
private final LocalDirAllocator localDirsSelector;
+ private final ScheduledExecutorService cacheCleanup;
private final LocalResourcesTracker publicRsrc;
private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
@@ -141,7 +153,8 @@ public class ResourceLocalizationService
this.dispatcher = dispatcher;
this.delService = delService;
this.localDirsSelector = new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
- this.publicRsrc = new LocalResourcesTrackerImpl(dispatcher);
+ this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
+ this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
}
FileContext getLocalFileContext(Configuration conf) {
@@ -190,10 +203,16 @@ public class ResourceLocalizationService
localDirs = Collections.unmodifiableList(localDirs);
logDirs = Collections.unmodifiableList(logDirs);
sysDirs = Collections.unmodifiableList(sysDirs);
+ cacheTargetSize =
+ conf.getLong(NM_TARGET_CACHE_MB, DEFAULT_NM_TARGET_CACHE_MB) << 20;
+ cacheCleanupPeriod =
+ conf.getLong(NM_CACHE_CLEANUP_MS, DEFAULT_NM_CACHE_CLEANUP_MS);
localizationServerAddress = NetUtils.createSocketAddr(
conf.get(NM_LOCALIZER_BIND_ADDRESS, DEFAULT_NM_LOCALIZER_BIND_ADDRESS));
localizerTracker = new LocalizerTracker(conf);
dispatcher.register(LocalizerEventType.class, localizerTracker);
+ cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
+ cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
super.init(conf);
}
@@ -250,12 +269,16 @@ public class ResourceLocalizationService
Application app =
((ApplicationLocalizationEvent)event).getApplication();
// 0) Create application tracking structs
- privateRsrc.putIfAbsent(app.getUser(),
- new LocalResourcesTrackerImpl(dispatcher));
+ userName = app.getUser();
+ privateRsrc.putIfAbsent(userName,
+ new LocalResourcesTrackerImpl(userName, dispatcher));
if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
- new LocalResourcesTrackerImpl(dispatcher))) {
+ new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
LOG.warn("Initializing application " + app + " already present");
assert false; // TODO: FIXME assert doesn't help
+ // ^ The condition is benign. Tests should fail and it
+ // should appear in logs, but it's an internal error
+ // that should have no effect on applications
}
// 1) Signal container init
dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
@@ -288,6 +311,16 @@ public class ResourceLocalizationService
tracker.handle(new ResourceRequestEvent(req, vis, ctxt));
}
break;
+ case CACHE_CLEANUP:
+ ResourceRetentionSet retain =
+ new ResourceRetentionSet(delService, cacheTargetSize);
+ retain.addResources(publicRsrc);
+ LOG.debug("Resource cleanup (public) " + retain);
+ for (LocalResourcesTracker t : privateRsrc.values()) {
+ retain.addResources(t);
+ LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
+ }
+ break;
case CLEANUP_CONTAINER_RESOURCES:
Container container =
((ContainerLocalizationEvent)event).getContainer();
@@ -536,8 +569,9 @@ public class ResourceLocalizationService
final Map<LocalResourceRequest,LocalizerResourceRequestEvent> scheduled;
final List<LocalizerResourceRequestEvent> pending;
+ // TODO: threadsafe, use outer?
private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
+ RecordFactoryProvider.getRecordFactory(getConfig());
LocalizerRunner(LocalizerContext context, String localizerId) {
this.context = context;
@@ -702,17 +736,37 @@ public class ResourceLocalizationService
context.getUser(),
ConverterUtils.toString(context.getContainerId().getAppId()),
localizerId, localDirs);
+ // TODO handle ExitCodeException separately?
} catch (Exception e) {
+ LOG.info("Localizer failed", e);
// 3) on error, report failure to Container and signal ABORT
// 3.1) notify resource of failed localization
+ ContainerId cId = context.getContainerId();
+ dispatcher.getEventHandler().handle(
+ new ContainerResourceFailedEvent(cId, null, e));
+ } finally {
for (LocalizerResourceRequestEvent event : scheduled.values()) {
event.getResource().unlock();
}
- //dispatcher.getEventHandler().handle(
- // new ContainerResourceFailedEvent(current.getContainer(),
- // current.getResource().getRequest(), e));
}
}
}
+
+  /**
+   * Periodic task that asks the localization service to trim its cache.
+   * It does no cleanup itself: each run only posts a CACHE_CLEANUP event
+   * through the dispatcher.
+   */
+  static class CacheCleanup extends Thread {
+
+    private final Dispatcher dispatcher;
+
+    /**
+     * @param dispatcher dispatcher whose event handler receives the
+     *        cleanup event
+     */
+    public CacheCleanup(Dispatcher dispatcher) {
+      this.dispatcher = dispatcher;
+    }
+
+    /** Posts a single CACHE_CLEANUP localization event. */
+    @Override
+    public void run() {
+      final LocalizationEvent cleanupRequest =
+          new LocalizationEvent(LocalizationEventType.CACHE_CLEANUP);
+      dispatcher.getEventHandler().handle(cleanupRequest);
+    }
+
+  }
+
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java?rev=1132638&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceRetentionSet.java Mon Jun 6 13:44:06 2011
@@ -0,0 +1,78 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+public class ResourceRetentionSet {
+
+ private long delSize;
+ private long currentSize;
+ private final long targetSize;
+ private final DeletionService delService;
+ private final SortedMap<LocalizedResource,LocalResourcesTracker> retain;
+
+ ResourceRetentionSet(DeletionService delService, long targetSize) {
+ this(delService, targetSize, new LRUComparator());
+ }
+
+ ResourceRetentionSet(DeletionService delService, long targetSize,
+ Comparator<? super LocalizedResource> cmp) {
+ this(delService, targetSize,
+ new TreeMap<LocalizedResource,LocalResourcesTracker>(cmp));
+ }
+
+ ResourceRetentionSet(DeletionService delService, long targetSize,
+ SortedMap<LocalizedResource,LocalResourcesTracker> retain) {
+ this.retain = retain;
+ this.delService = delService;
+ this.targetSize = targetSize;
+ }
+
+ public void addResources(LocalResourcesTracker newTracker) {
+ for (LocalizedResource resource : newTracker) {
+ currentSize += resource.getSize();
+ if (resource.getRefCount() > 0) {
+ // always retain resources in use
+ continue;
+ }
+ retain.put(resource, newTracker);
+ }
+ for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
+ retain.entrySet().iterator();
+ currentSize - delSize > targetSize && i.hasNext();) {
+ Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
+ LocalizedResource resource = rsrc.getKey();
+ LocalResourcesTracker tracker = rsrc.getValue();
+ if (tracker.remove(resource, delService)) {
+ delSize += resource.getSize();
+ i.remove();
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Cache: ").append(currentSize).append(", ");
+ sb.append("Deleted: ").append(delSize);
+ return sb.toString();
+ }
+
+ static class LRUComparator implements Comparator<LocalizedResource> {
+ public int compare(LocalizedResource r1, LocalizedResource r2) {
+ long ret = r1.getTimestamp() - r2.getTimestamp();
+ if (0 == ret) {
+ return System.identityHashCode(r1) - System.identityHashCode(r2);
+ }
+ return ret > 0 ? 1 : -1;
+ }
+ public boolean equals(Object other) {
+ return this == other;
+ }
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Mon Jun 6 13:44:06 2011
@@ -19,9 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.yarn.event.Event;
-public abstract class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
+public class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
public LocalizationEvent(LocalizationEventType event) {
super(event);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEventType.java Mon Jun 6 13:44:06 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
public enum LocalizationEventType {
INIT_APPLICATION_RESOURCES,
INIT_CONTAINER_RESOURCES,
+ CACHE_CLEANUP,
CLEANUP_CONTAINER_RESOURCES,
DESTROY_APPLICATION_RESOURCES,
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java Mon Jun 6 13:44:06 2011
@@ -37,7 +37,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -281,7 +280,8 @@ public class TestContainerLocalizer {
return rsrc;
}
- static DataInputBuffer createFakeCredentials(Random r, int nTok)
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+static DataInputBuffer createFakeCredentials(Random r, int nTok)
throws IOException {
Credentials creds = new Credentials();
byte[] password = new byte[20];
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1132638&r1=1132637&r2=1132638&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Mon Jun 6 13:44:06 2011
@@ -1,19 +1,57 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+import java.net.InetSocketAddress;
+
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Random;
+import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
+import static org.junit.Assert.*;
+import org.mockito.ArgumentMatcher;
import static org.mockito.Mockito.*;
import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
@@ -75,4 +113,178 @@ public class TestResourceLocalizationSer
}
}
+ /**
+ * Drives a ResourceLocalizationService through one private-resource
+ * localization: initializes an application, requests a single resource for
+ * a container, checks that the executor is asked to start a localizer, and
+ * then issues three heartbeats, expecting LIVE with the requested resource,
+ * LIVE with no resources, and finally DIE. Ends by verifying that a
+ * RESOURCE_LOCALIZED event for the container reached the container bus.
+ */
+ @Test
+ @SuppressWarnings("unchecked") // mocked generics
+ public void testLocalizationHeartbeat() throws Exception {
+ Configuration conf = new Configuration();
+ AbstractFileSystem spylfs =
+ spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
+ final FileContext lfs = FileContext.getFileContext(spylfs, conf);
+ // mkdir on the spied local filesystem is stubbed to do nothing
+ doNothing().when(spylfs).mkdir(
+ isA(Path.class), isA(FsPermission.class), anyBoolean());
+
+ List<Path> localDirs = new ArrayList<Path>();
+ String[] sDirs = new String[4];
+ for (int i = 0; i < 4; ++i) {
+ localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+ sDirs[i] = localDirs.get(i).toString();
+ }
+ conf.setStrings(NM_LOCAL_DIR, sDirs);
+
+ Server ignore = mock(Server.class);
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, applicationBus);
+ EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+ dispatcher.register(ContainerEventType.class, containerBus);
+
+ ContainerExecutor exec = mock(ContainerExecutor.class);
+ DeletionService delService = new DeletionService(exec);
+ delService.init(null);
+ delService.start();
+
+ ResourceLocalizationService rawService =
+ new ResourceLocalizationService(dispatcher, exec, delService);
+ ResourceLocalizationService spyService = spy(rawService);
+ // replace the service's server and local FileContext with the test doubles
+ doReturn(ignore).when(spyService).createServer();
+ doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+ try {
+ spyService.init(conf);
+ spyService.start();
+
+ // init application
+ final Application app = mock(Application.class);
+ final ApplicationId appId = mock(ApplicationId.class);
+ when(appId.getClusterTimestamp()).thenReturn(314159265358979L);
+ when(appId.getId()).thenReturn(3);
+ when(app.getUser()).thenReturn("user0");
+ when(app.getAppId()).thenReturn(appId);
+ spyService.handle(new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+ ArgumentMatcher<ApplicationEvent> matchesAppInit =
+ new ArgumentMatcher<ApplicationEvent>() {
+ @Override
+ public boolean matches(Object o) {
+ ApplicationEvent evt = (ApplicationEvent) o;
+ return evt.getType() == ApplicationEventType.APPLICATION_INITED
+ && appId == evt.getApplicationID();
+ }
+ };
+ dispatcher.await();
+ verify(applicationBus).handle(argThat(matchesAppInit));
+
+ // init container rsrc, localizer
+ // the seed is printed so the generated resource can be reproduced
+ Random r = new Random();
+ long seed = r.nextLong();
+ System.out.println("SEED: " + seed);
+ r.setSeed(seed);
+ final Container c = getMockContainer(appId, 42);
+ FSDataOutputStream out =
+ new FSDataOutputStream(new DataOutputBuffer(), null);
+ doReturn(out).when(spylfs).createInternal(isA(Path.class),
+ isA(EnumSet.class), isA(FsPermission.class), anyInt(), anyShort(),
+ anyLong(), isA(Progressable.class), anyInt(), anyBoolean());
+ final LocalResource resource = getMockResource(r);
+ final LocalResourceRequest req = new LocalResourceRequest(resource);
+ spyService.handle(new ContainerLocalizationRequestEvent(
+ c, Collections.singletonList(req),
+ LocalResourceVisibility.PRIVATE));
+ // Sigh. Thread init of private localizer not accessible
+ Thread.sleep(500);
+ dispatcher.await();
+ String appStr = ConverterUtils.toString(appId);
+ String ctnrStr = ConverterUtils.toString(c.getContainerID());
+ verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
+ eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+
+ // heartbeat from localizer
+ LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
+ LocalizerStatus stat = mock(LocalizerStatus.class);
+ when(stat.getLocalizerId()).thenReturn(ctnrStr);
+ when(rsrcStat.getResource()).thenReturn(resource);
+ when(rsrcStat.getLocalSize()).thenReturn(4344L);
+ URL locPath = getPath("/cache/private/blah");
+ when(rsrcStat.getLocalPath()).thenReturn(locPath);
+ when(rsrcStat.getStatus()).thenReturn(ResourceStatusType.FETCH_SUCCESS);
+ // successive getResources() calls yield: empty, the fetched resource, empty
+ when(stat.getResources())
+ .thenReturn(Collections.<LocalResourceStatus>emptyList())
+ .thenReturn(Collections.singletonList(rsrcStat))
+ .thenReturn(Collections.<LocalResourceStatus>emptyList());
+
+ // get rsrc
+ LocalizerHeartbeatResponse response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+ assertEquals(req, new LocalResourceRequest(response.getLocalResource(0)));
+
+ // empty rsrc
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.LIVE, response.getLocalizerAction());
+ assertEquals(0, response.getAllResources().size());
+
+ // get shutdown
+ response = spyService.heartbeat(stat);
+ assertEquals(LocalizerAction.DIE, response.getLocalizerAction());
+
+ // verify container notification
+ ArgumentMatcher<ContainerEvent> matchesContainerLoc =
+ new ArgumentMatcher<ContainerEvent>() {
+ @Override
+ public boolean matches(Object o) {
+ ContainerEvent evt = (ContainerEvent) o;
+ return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED
+ && c.getContainerID() == evt.getContainerID();
+ }
+ };
+ dispatcher.await();
+ verify(containerBus).handle(argThat(matchesContainerLoc));
+ } finally {
+ // NOTE(review): if an earlier stop() throws, the later ones are skipped
+ delService.stop();
+ dispatcher.stop();
+ spyService.stop();
+ }
+ }
+
+ /**
+  * Creates a mocked YARN {@code URL} record whose scheme is {@code "file"},
+  * whose host is {@code null}, and whose file component is {@code path}.
+  */
+ static URL getPath(String path) {
+   final URL mockUrl = mock(org.apache.hadoop.yarn.api.records.URL.class);
+   when(mockUrl.getFile()).thenReturn(path);
+   when(mockUrl.getHost()).thenReturn(null);
+   when(mockUrl.getScheme()).thenReturn("file");
+   return mockUrl;
+ }
+
+ /**
+  * Creates a mocked PRIVATE, FILE-typed {@code LocalResource} located under
+  * {@code /local/PRIVATE/} with a random hex name, a size in
+  * {@code [1024, 2048)} and a timestamp in {@code [2048, 3072)}.
+  *
+  * @param r source of randomness; one long and two ints are drawn from it,
+  *          in that order
+  */
+ static LocalResource getMockResource(Random r) {
+   // Draw in the fixed order name, size, timestamp so a seed reproduces
+   // the same resource.
+   final String hexName = Long.toHexString(r.nextLong());
+   final long size = r.nextInt(1024) + 1024L;
+   final long timestamp = r.nextInt(1024) + 2048L;
+   final URL location = getPath("/local/PRIVATE/" + hexName);
+
+   final LocalResource mockResource = mock(LocalResource.class);
+   when(mockResource.getVisibility())
+       .thenReturn(LocalResourceVisibility.PRIVATE);
+   when(mockResource.getType()).thenReturn(LocalResourceType.FILE);
+   when(mockResource.getTimestamp()).thenReturn(timestamp);
+   when(mockResource.getSize()).thenReturn(size);
+   when(mockResource.getResource()).thenReturn(location);
+   return mockResource;
+ }
+
+ /**
+  * Creates a mocked {@code Container} owned by {@code "user0"} whose mocked
+  * container id reports the given application id and numeric id, and whose
+  * credentials hold a single token stored under the alias
+  * {@code "tok" + id}.
+  */
+ static Container getMockContainer(ApplicationId appId, int id) {
+   final ContainerId containerId = mock(ContainerId.class);
+   when(containerId.getId()).thenReturn(id);
+   when(containerId.getAppId()).thenReturn(appId);
+
+   final Credentials credentials = new Credentials();
+   credentials.addToken(new Text("tok" + id), getToken(id));
+
+   final Container container = mock(Container.class);
+   when(container.getContainerID()).thenReturn(containerId);
+   when(container.getCredentials()).thenReturn(credentials);
+   when(container.getUser()).thenReturn("user0");
+   return container;
+ }
+
+ /**
+  * Builds a token whose identifier, password, kind and service are the
+  * strings {@code "ident"}, {@code "passwd"}, {@code "kind"} and
+  * {@code "service"}, each suffixed with {@code id}.
+  */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ static Token<? extends TokenIdentifier> getToken(int id) {
+   final byte[] identifier = ("ident" + id).getBytes();
+   final byte[] password = ("passwd" + id).getBytes();
+   final Text kind = new Text("kind" + id);
+   final Text service = new Text("service" + id);
+   return new Token(identifier, password, kind, service);
+ }
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1132638&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Mon Jun 6 13:44:06 2011
@@ -0,0 +1,83 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Mockito.*;
+
+ /**
+  * Checks that {@link ResourceRetentionSet} asks trackers to remove
+  * unreferenced resources once the tracked cache exceeds its target size.
+  */
+ public class TestResourceRetention {
+
+   /**
+    * Adds 20MB of unreferenced resources across four trackers to a retention
+    * set with a 10MB target, then sums the sizes of every resource passed to
+    * {@code remove} and expects at least 10MB but less than 15MB.
+    */
+   @Test
+   public void testRsrcUnused() {
+     DeletionService delService = mock(DeletionService.class);
+     // retention target in bytes (10MB)
+     long targetBytes = 10 << 20;
+     ResourceRetentionSet rss =
+         new ResourceRetentionSet(delService, targetBytes);
+     // 3MB files @{10, 15}
+     LocalResourcesTracker pubTracker =
+         createMockTracker(null, 3 * 1024 * 1024, 2, 10, 5);
+     // 1MB files @{3, 6, 9, 12}
+     LocalResourcesTracker trackerA =
+         createMockTracker("A", 1 * 1024 * 1024, 4, 3, 3);
+     // 4MB file @{10}
+     LocalResourcesTracker trackerB =
+         createMockTracker("B", 4 * 1024 * 1024, 1, 10, 5);
+     // 2MB files @{7, 9, 11}
+     LocalResourcesTracker trackerC =
+         createMockTracker("C", 2 * 1024 * 1024, 3, 7, 2);
+     // Total cache: 20MB; verify removed at least 10MB
+     rss.addResources(pubTracker);
+     rss.addResources(trackerA);
+     rss.addResources(trackerB);
+     rss.addResources(trackerC);
+     long deleted = 0L;
+     // one captor collects the resources passed to remove() on all trackers
+     ArgumentCaptor<LocalizedResource> captor =
+         ArgumentCaptor.forClass(LocalizedResource.class);
+     verify(pubTracker, atMost(2))
+         .remove(captor.capture(), isA(DeletionService.class));
+     verify(trackerA, atMost(4))
+         .remove(captor.capture(), isA(DeletionService.class));
+     verify(trackerB, atMost(1))
+         .remove(captor.capture(), isA(DeletionService.class));
+     verify(trackerC, atMost(3))
+         .remove(captor.capture(), isA(DeletionService.class));
+     for (LocalizedResource rem : captor.getAllValues()) {
+       deleted += rem.getSize();
+     }
+     assertTrue(deleted >= 10 * 1024 * 1024);
+     assertTrue(deleted < 15 * 1024 * 1024);
+   }
+
+   /**
+    * Builds a spied {@code LocalResourcesTrackerImpl} pre-populated with
+    * {@code nRsrcs} resources that report themselves as LOCALIZED with a
+    * reference count of zero.
+    *
+    * @param user      user passed to the tracker and embedded in the resource
+    *                  paths; {@code null} appears in paths as the text "null"
+    * @param rsrcSize  size reported by every resource
+    * @param nRsrcs    number of resources to create
+    * @param timestamp timestamp of the first resource
+    * @param tsstep    timestamp increment between consecutive resources
+    * @return the spied tracker
+    */
+   LocalResourcesTracker createMockTracker(String user, final long rsrcSize,
+       long nRsrcs, long timestamp, long tsstep) {
+     ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
+         new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
+     LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
+         trackerResources));
+     for (int i = 0; i < nRsrcs; ++i) {
+       // the request and the localized resource share one timestamp
+       final long ts = timestamp + i * tsstep;
+       final LocalResourceRequest req = new LocalResourceRequest(
+           new Path("file:///" + user + "/rsrc" + i), ts,
+           LocalResourceType.FILE);
+       final Path p = new Path("file:///local/" + user + "/rsrc" + i);
+       LocalizedResource rsrc = new LocalizedResource(req, null) {
+         @Override public int getRefCount() { return 0; }
+         @Override public long getSize() { return rsrcSize; }
+         @Override public Path getLocalPath() { return p; }
+         @Override public long getTimestamp() { return ts; }
+         @Override
+         public ResourceState getState() { return ResourceState.LOCALIZED; }
+       };
+       trackerResources.put(req, rsrc);
+     }
+     return ret;
+   }
+
+ }
|