hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cl...@apache.org
Subject svn commit: r1598785 [2/2] - in /hadoop/common/branches/fs-encryption/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/view/ hadoop...
Date Sat, 31 May 2014 00:01:26 GMT
Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Sat May 31 00:01:20 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -34,13 +35,17 @@ import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
@@ -52,10 +57,14 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 public class TestLocalResourcesTrackerImpl {
 
@@ -92,8 +101,8 @@ public class TestLocalResourcesTrackerIm
       localrsrc.put(req1, lr1);
       localrsrc.put(req2, lr2);
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, false,
-            conf);
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              false, conf, new NMNullStateStoreService());
 
       ResourceEvent req11Event =
           new ResourceRequestEvent(req1, LocalResourceVisibility.PUBLIC, lc1);
@@ -176,7 +185,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       localrsrc.put(req1, lr1);
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc, false, conf);
+          null, dispatcher, localrsrc, false, conf,
+          new NMNullStateStoreService());
 
       ResourceEvent req11Event = new ResourceRequestEvent(req1,
           LocalResourceVisibility.PUBLIC, lc1);
@@ -246,7 +256,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker =
-          new LocalResourcesTrackerImpl(user, dispatcher, localrsrc, true, conf);
+          new LocalResourcesTrackerImpl(user, null, dispatcher, localrsrc,
+              true, conf, new NMNullStateStoreService());
 
       LocalResourceRequest lr =
           createLocalResourceRequest(user, 1, 1, LocalResourceVisibility.PUBLIC);
@@ -264,6 +275,7 @@ public class TestLocalResourcesTrackerIm
 
       // Container-1 requesting local resource.
       tracker.handle(reqEvent1);
+      dispatcher.await();
 
       // New localized Resource should have been added to local resource map
       // and the requesting container will be added to its waiting queue.
@@ -280,6 +292,7 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent2 =
           new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc2);
       tracker.handle(reqEvent2);
+      dispatcher.await();
 
       // Container 2 should have been added to the waiting queue of the local
       // resource
@@ -295,6 +308,7 @@ public class TestLocalResourcesTrackerIm
       LocalizedResource localizedResource = localrsrc.get(lr);
       
       tracker.handle(resourceFailedEvent);
+      dispatcher.await();
 
       // After receiving failed resource event; all waiting containers will be
       // notified with Container Resource Failed Event.
@@ -308,6 +322,7 @@ public class TestLocalResourcesTrackerIm
       // exception.
       ResourceReleaseEvent relEvent1 = new ResourceReleaseEvent(lr, cId1);
       tracker.handle(relEvent1);
+      dispatcher.await();
 
       // Container-3 now requests for the same resource. This request call
       // is coming prior to Container-2's release call.
@@ -316,6 +331,7 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent3 =
           new ResourceRequestEvent(lr, LocalResourceVisibility.PRIVATE, lc3);
       tracker.handle(reqEvent3);
+      dispatcher.await();
 
       // Local resource cache now should have the requested resource and the
       // number of waiting containers should be 1.
@@ -327,6 +343,7 @@ public class TestLocalResourcesTrackerIm
       // Container-2 Releases the resource
       ResourceReleaseEvent relEvent2 = new ResourceReleaseEvent(lr, cId2);
       tracker.handle(relEvent2);
+      dispatcher.await();
 
       // Making sure that there is no change in the cache after the release.
       Assert.assertEquals(1, localrsrc.size());
@@ -340,6 +357,7 @@ public class TestLocalResourcesTrackerIm
       ResourceLocalizedEvent localizedEvent =
           new ResourceLocalizedEvent(lr, localizedPath, 123L);
       tracker.handle(localizedEvent);
+      dispatcher.await();
       
       // Verifying ContainerResourceLocalizedEvent .
       verify(containerEventHandler, times(1)).handle(
@@ -351,6 +369,7 @@ public class TestLocalResourcesTrackerIm
       // Container-3 releasing the resource.
       ResourceReleaseEvent relEvent3 = new ResourceReleaseEvent(lr, cId3);
       tracker.handle(relEvent3);
+      dispatcher.await();
       
       Assert.assertEquals(0, localrsrc.get(lr).getRefCount());
       
@@ -384,7 +403,8 @@ public class TestLocalResourcesTrackerIm
       ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc =
           new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
       LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
-          dispatcher, localrsrc, true, conf);
+          null, dispatcher, localrsrc, true, conf,
+          new NMNullStateStoreService());
 
       // This is a random path. NO File creation will take place at this place.
       Path localDir = new Path("/tmp");
@@ -401,7 +421,9 @@ public class TestLocalResourcesTrackerIm
       tracker.handle(reqEvent1);
 
       // Simulate the process of localization of lr1
-      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+      // NOTE: Localization path from tracker has resource ID at end
+      Path hierarchicalPath1 =
+          tracker.getPathForLocalization(lr1, localDir).getParent();
       // Simulate lr1 getting localized
       ResourceLocalizedEvent rle1 =
           new ResourceLocalizedEvent(lr1,
@@ -417,7 +439,8 @@ public class TestLocalResourcesTrackerIm
           new ResourceRequestEvent(lr2, LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent2);
 
-      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      Path hierarchicalPath2 =
+          tracker.getPathForLocalization(lr2, localDir).getParent();
       // localization failed.
       ResourceFailedLocalizationEvent rfe2 =
           new ResourceFailedLocalizationEvent(
@@ -435,7 +458,8 @@ public class TestLocalResourcesTrackerIm
       ResourceEvent reqEvent3 = new ResourceRequestEvent(lr3,
           LocalResourceVisibility.PUBLIC, lc1);
       tracker.handle(reqEvent3);
-      Path hierarchicalPath3 = tracker.getPathForLocalization(lr3, localDir);
+      Path hierarchicalPath3 =
+          tracker.getPathForLocalization(lr3, localDir).getParent();
       // localization successful
       ResourceLocalizedEvent rle3 =
           new ResourceLocalizedEvent(lr3, new Path(hierarchicalPath3.toUri()
@@ -479,6 +503,284 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStateStoreSuccessfulLocalization() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDir = new Path("/tmp");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    DeletionService mockDelService = mock(DeletionService.class);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // Container 1 requests lr1 to be localized
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.APPLICATION, lc1);
+      tracker.handle(reqEvent1);
+      dispatcher.await();
+
+      // Simulate the process of localization of lr1
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+      ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+          ArgumentCaptor.forClass(LocalResourceProto.class);
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+          localResourceCaptor.capture(), pathCaptor.capture());
+      LocalResourceProto lrProto = localResourceCaptor.getValue();
+      Path localizedPath1 = pathCaptor.getValue();
+      Assert.assertEquals(lr1,
+          new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+      Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+      // Simulate lr1 getting localized
+      ResourceLocalizedEvent rle1 =
+          new ResourceLocalizedEvent(lr1, pathCaptor.getValue(), 120);
+      tracker.handle(rle1);
+      dispatcher.await();
+
+      ArgumentCaptor<LocalizedResourceProto> localizedProtoCaptor =
+          ArgumentCaptor.forClass(LocalizedResourceProto.class);
+      verify(stateStore).finishResourceLocalization(eq(user), eq(appId),
+          localizedProtoCaptor.capture());
+      LocalizedResourceProto localizedProto = localizedProtoCaptor.getValue();
+      Assert.assertEquals(lr1, new LocalResourceRequest(
+          new LocalResourcePBImpl(localizedProto.getResource())));
+      Assert.assertEquals(localizedPath1.toString(),
+          localizedProto.getLocalPath());
+      LocalizedResource localizedRsrc1 = tracker.getLocalizedResource(lr1);
+      Assert.assertNotNull(localizedRsrc1);
+
+      // simulate release and retention processing
+      tracker.handle(new ResourceReleaseEvent(lr1, cId1));
+      dispatcher.await();
+      boolean removeResult = tracker.remove(localizedRsrc1, mockDelService);
+
+      Assert.assertTrue(removeResult);
+      verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+          eq(localizedPath1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStateStoreFailedLocalization() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDir = new Path("/tmp");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+
+      // Container 1 requests lr1 to be localized
+      ResourceEvent reqEvent1 = new ResourceRequestEvent(lr1,
+          LocalResourceVisibility.APPLICATION, lc1);
+      tracker.handle(reqEvent1);
+      dispatcher.await();
+
+      // Simulate the process of localization of lr1
+      Path hierarchicalPath1 = tracker.getPathForLocalization(lr1, localDir);
+
+      ArgumentCaptor<LocalResourceProto> localResourceCaptor =
+          ArgumentCaptor.forClass(LocalResourceProto.class);
+      ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(stateStore).startResourceLocalization(eq(user), eq(appId),
+          localResourceCaptor.capture(), pathCaptor.capture());
+      LocalResourceProto lrProto = localResourceCaptor.getValue();
+      Path localizedPath1 = pathCaptor.getValue();
+      Assert.assertEquals(lr1,
+          new LocalResourceRequest(new LocalResourcePBImpl(lrProto)));
+      Assert.assertEquals(hierarchicalPath1, localizedPath1.getParent());
+
+      ResourceFailedLocalizationEvent rfe1 =
+          new ResourceFailedLocalizationEvent(
+              lr1, new Exception("Test").toString());
+      tracker.handle(rfe1);
+      dispatcher.await();
+      verify(stateStore).removeLocalizedResource(eq(user), eq(appId),
+          eq(localizedPath1));
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveredResource() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDir = new Path("/tmp/localdir");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, false, conf, stateStore);
+      // Container 1 needs lr1 resource
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.APPLICATION);
+      Assert.assertNull(tracker.getLocalizedResource(lr1));
+      final long localizedId1 = 52;
+      Path hierarchicalPath1 = new Path(localDir,
+          Long.toString(localizedId1));
+      Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr1));
+
+      // verify new paths reflect recovery of previous resources
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.APPLICATION);
+      LocalizerContext lc2 = new LocalizerContext(user, cId1, null);
+      ResourceEvent reqEvent2 = new ResourceRequestEvent(lr2,
+          LocalResourceVisibility.APPLICATION, lc2);
+      tracker.handle(reqEvent2);
+      dispatcher.await();
+      Path hierarchicalPath2 = tracker.getPathForLocalization(lr2, localDir);
+      long localizedId2 = Long.parseLong(hierarchicalPath2.getName());
+      Assert.assertEquals(localizedId1 + 1, localizedId2);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRecoveredResourceWithDirCacheMgr() throws Exception {
+    final String user = "someuser";
+    final ApplicationId appId = ApplicationId.newInstance(1, 1);
+    // This is a random path. NO File creation will take place at this place.
+    final Path localDirRoot = new Path("/tmp/localdir");
+    Configuration conf = new YarnConfiguration();
+    DrainDispatcher dispatcher = null;
+    dispatcher = createDispatcher(conf);
+    EventHandler<LocalizerEvent> localizerEventHandler =
+        mock(EventHandler.class);
+    EventHandler<LocalizerEvent> containerEventHandler =
+        mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+    dispatcher.register(ContainerEventType.class, containerEventHandler);
+    NMStateStoreService stateStore = mock(NMStateStoreService.class);
+
+    try {
+      LocalResourcesTrackerImpl tracker = new LocalResourcesTrackerImpl(user,
+          appId, dispatcher, true, conf, stateStore);
+      LocalResourceRequest lr1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr1));
+      final long localizedId1 = 52;
+      Path hierarchicalPath1 = new Path(localDirRoot + "/4/2",
+          Long.toString(localizedId1));
+      Path localizedPath1 = new Path(hierarchicalPath1, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr1, localizedPath1, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr1));
+      LocalCacheDirectoryManager dirMgrRoot =
+          tracker.getDirectoryManager(localDirRoot);
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/2").getCount());
+
+      LocalResourceRequest lr2 = createLocalResourceRequest(user, 2, 2,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr2));
+      final long localizedId2 = localizedId1 + 1;
+      Path hierarchicalPath2 = new Path(localDirRoot + "/4/2",
+          Long.toString(localizedId2));
+      Path localizedPath2 = new Path(hierarchicalPath2, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr2, localizedPath2, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr2));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+
+      LocalResourceRequest lr3 = createLocalResourceRequest(user, 3, 3,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr3));
+      final long localizedId3 = 128;
+      Path hierarchicalPath3 = new Path(localDirRoot + "/4/3",
+          Long.toString(localizedId3));
+      Path localizedPath3 = new Path(hierarchicalPath3, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr3, localizedPath3, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr3));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
+
+      LocalResourceRequest lr4 = createLocalResourceRequest(user, 4, 4,
+          LocalResourceVisibility.PUBLIC);
+      Assert.assertNull(tracker.getLocalizedResource(lr4));
+      final long localizedId4 = 256;
+      Path hierarchicalPath4 = new Path(localDirRoot + "/4",
+          Long.toString(localizedId4));
+      Path localizedPath4 = new Path(hierarchicalPath4, "resource.jar");
+      tracker.handle(new ResourceRecoveredEvent(lr4, localizedPath4, 120));
+      dispatcher.await();
+      Assert.assertNotNull(tracker.getLocalizedResource(lr4));
+      Assert.assertEquals(0, dirMgrRoot.getDirectory("").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4").getCount());
+      Assert.assertEquals(2, dirMgrRoot.getDirectory("4/2").getCount());
+      Assert.assertEquals(1, dirMgrRoot.getDirectory("4/3").getCount());
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
   private boolean createdummylocalizefile(Path path) {
     boolean ret = false;
     File file = new File(path.toUri().getRawPath().toString());

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Sat May 31 00:01:20 2014
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
@@ -120,6 +122,10 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
@@ -188,7 +194,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService locService =
       spy(new ResourceLocalizationService(dispatcher, exec, delService,
-                                          diskhandler));
+                                          diskhandler,
+                                          new NMNullStateStoreService()));
     doReturn(lfs)
       .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -253,7 +260,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler);
+                                      dirsHandler,
+                                      new NMNullStateStoreService());
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -287,7 +295,7 @@ public class TestResourceLocalizationSer
               user, appId);
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
       
       // init resources
       Random r = new Random();
@@ -402,6 +410,233 @@ public class TestResourceLocalizationSer
     }
   }
   
+  @Test
+  @SuppressWarnings("unchecked") // mocked generics
+  public void testRecovery() throws Exception {
+    final String user1 = "user1";
+    final String user2 = "user2";
+    final ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+    final ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+
+    List<Path> localDirs = new ArrayList<Path>();
+    String[] sDirs = new String[4];
+    for (int i = 0; i < 4; ++i) {
+      localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
+      sDirs[i] = localDirs.get(i).toString();
+    }
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+
+    NMMemoryStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    //Ignore actual localization
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+    dirsHandler.init(conf);
+
+    ResourceLocalizationService spyService =
+        createSpyService(dispatcher, dirsHandler, stateStore);
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      final Application app1 = mock(Application.class);
+      when(app1.getUser()).thenReturn(user1);
+      when(app1.getAppId()).thenReturn(appId1);
+      final Application app2 = mock(Application.class);
+      when(app2.getUser()).thenReturn(user2);
+      when(app2.getAppId()).thenReturn(appId2);
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app1));
+      spyService.handle(new ApplicationLocalizationEvent(
+          LocalizationEventType.INIT_APPLICATION_RESOURCES, app2));
+      dispatcher.await();
+
+      //Get a handle on the trackers once they're set up with INIT_APP_RESOURCES
+      LocalResourcesTracker appTracker1 =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user1, appId1);
+      LocalResourcesTracker privTracker1 =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+              user1, null);
+      LocalResourcesTracker appTracker2 =
+          spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user2, appId2);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+              null, null);
+
+      // init containers
+      final Container c1 = getMockContainer(appId1, 1, user1);
+      final Container c2 = getMockContainer(appId2, 2, user2);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      System.out.println("SEED: " + seed);
+      r.setSeed(seed);
+
+      // Send localization requests of each type.
+      final LocalResource privResource1 = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq1 =
+          new LocalResourceRequest(privResource1);
+      final LocalResource privResource2 = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq2 =
+          new LocalResourceRequest(privResource2);
+
+      final LocalResource pubResource1 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq1 =
+          new LocalResourceRequest(pubResource1);
+      final LocalResource pubResource2 = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq2 =
+          new LocalResourceRequest(pubResource2);
+
+      final LocalResource appResource1 = getAppMockedResource(r);
+      final LocalResourceRequest appReq1 =
+          new LocalResourceRequest(appResource1);
+      final LocalResource appResource2 = getAppMockedResource(r);
+      final LocalResourceRequest appReq2 =
+          new LocalResourceRequest(appResource2);
+      final LocalResource appResource3 = getAppMockedResource(r);
+      final LocalResourceRequest appReq3 =
+          new LocalResourceRequest(appResource3);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req1 =
+          new HashMap<LocalResourceVisibility,
+                      Collection<LocalResourceRequest>>();
+      req1.put(LocalResourceVisibility.PRIVATE,
+          Arrays.asList(new LocalResourceRequest[] { privReq1, privReq2 }));
+      req1.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq1));
+      req1.put(LocalResourceVisibility.APPLICATION,
+          Collections.singletonList(appReq1));
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+        new HashMap<LocalResourceVisibility,
+                    Collection<LocalResourceRequest>>();
+      req2.put(LocalResourceVisibility.APPLICATION,
+          Arrays.asList(new LocalResourceRequest[] { appReq2, appReq3 }));
+      req2.put(LocalResourceVisibility.PUBLIC,
+          Collections.singletonList(pubReq2));
+
+      // Send Request event
+      spyService.handle(new ContainerLocalizationRequestEvent(c1, req1));
+      spyService.handle(new ContainerLocalizationRequestEvent(c2, req2));
+      dispatcher.await();
+
+      // Simulate start of localization for all resources
+      privTracker1.getPathForLocalization(privReq1,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.USERCACHE + user1));
+      privTracker1.getPathForLocalization(privReq2,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.USERCACHE + user1));
+      LocalizedResource privLr1 = privTracker1.getLocalizedResource(privReq1);
+      LocalizedResource privLr2 = privTracker1.getLocalizedResource(privReq2);
+      appTracker1.getPathForLocalization(appReq1,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId1));
+      LocalizedResource appLr1 = appTracker1.getLocalizedResource(appReq1);
+      appTracker2.getPathForLocalization(appReq2,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId2));
+      LocalizedResource appLr2 = appTracker2.getLocalizedResource(appReq2);
+      appTracker2.getPathForLocalization(appReq3,
+          dirsHandler.getLocalPathForWrite(
+              ContainerLocalizer.APPCACHE + appId2));
+      LocalizedResource appLr3 = appTracker2.getLocalizedResource(appReq3);
+      pubTracker.getPathForLocalization(pubReq1,
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+      LocalizedResource pubLr1 = pubTracker.getLocalizedResource(pubReq1);
+      pubTracker.getPathForLocalization(pubReq2,
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.FILECACHE));
+      LocalizedResource pubLr2 = pubTracker.getLocalizedResource(pubReq2);
+
+      // Simulate completion of localization for most resources with
+      // possibly different sizes than in the request
+      assertNotNull("Localization not started", privLr1.getLocalPath());
+      privTracker1.handle(new ResourceLocalizedEvent(privReq1,
+          privLr1.getLocalPath(), privLr1.getSize() + 5));
+      assertNotNull("Localization not started", privLr2.getLocalPath());
+      privTracker1.handle(new ResourceLocalizedEvent(privReq2,
+          privLr2.getLocalPath(), privLr2.getSize() + 10));
+      assertNotNull("Localization not started", appLr1.getLocalPath());
+      appTracker1.handle(new ResourceLocalizedEvent(appReq1,
+          appLr1.getLocalPath(), appLr1.getSize()));
+      assertNotNull("Localization not started", appLr3.getLocalPath());
+      appTracker2.handle(new ResourceLocalizedEvent(appReq3,
+          appLr3.getLocalPath(), appLr3.getSize() + 7));
+      assertNotNull("Localization not started", pubLr1.getLocalPath());
+      pubTracker.handle(new ResourceLocalizedEvent(pubReq1,
+          pubLr1.getLocalPath(), pubLr1.getSize() + 1000));
+      assertNotNull("Localization not started", pubLr2.getLocalPath());
+      pubTracker.handle(new ResourceLocalizedEvent(pubReq2,
+          pubLr2.getLocalPath(), pubLr2.getSize() + 99999));
+
+      dispatcher.await();
+      assertEquals(ResourceState.LOCALIZED, privLr1.getState());
+      assertEquals(ResourceState.LOCALIZED, privLr2.getState());
+      assertEquals(ResourceState.LOCALIZED, appLr1.getState());
+      assertEquals(ResourceState.DOWNLOADING, appLr2.getState());
+      assertEquals(ResourceState.LOCALIZED, appLr3.getState());
+      assertEquals(ResourceState.LOCALIZED, pubLr1.getState());
+      assertEquals(ResourceState.LOCALIZED, pubLr2.getState());
+
+      // restart and recover
+      spyService = createSpyService(dispatcher, dirsHandler, stateStore);
+      spyService.init(conf);
+      spyService.recoverLocalizedResources(
+          stateStore.loadLocalizationState());
+      dispatcher.await();
+
+      appTracker1 = spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user1, appId1);
+      privTracker1 = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PRIVATE, user1, null);
+      appTracker2 = spyService.getLocalResourcesTracker(
+              LocalResourceVisibility.APPLICATION, user2, appId2);
+      pubTracker = spyService.getLocalResourcesTracker(
+          LocalResourceVisibility.PUBLIC, null, null);
+
+      LocalizedResource recoveredRsrc =
+          privTracker1.getLocalizedResource(privReq1);
+      assertEquals(privReq1, recoveredRsrc.getRequest());
+      assertEquals(privLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(privLr1.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = privTracker1.getLocalizedResource(privReq2);
+      assertEquals(privReq2, recoveredRsrc.getRequest());
+      assertEquals(privLr2.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(privLr2.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = appTracker1.getLocalizedResource(appReq1);
+      assertEquals(appReq1, recoveredRsrc.getRequest());
+      assertEquals(appLr1.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(appLr1.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+      recoveredRsrc = appTracker2.getLocalizedResource(appReq2);
+      assertNull("in-progress resource should not be present", recoveredRsrc);
+      recoveredRsrc = appTracker2.getLocalizedResource(appReq3);
+      assertEquals(appReq3, recoveredRsrc.getRequest());
+      assertEquals(appLr3.getLocalPath(), recoveredRsrc.getLocalPath());
+      assertEquals(appLr3.getSize(), recoveredRsrc.getSize());
+      assertEquals(ResourceState.LOCALIZED, recoveredRsrc.getState());
+    } finally {
+      dispatcher.stop();
+      stateStore.close();
+    }
+  }
+
   @Test( timeout = 10000)
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
@@ -436,7 +671,8 @@ public class TestResourceLocalizationSer
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler);
+                                      dirsHandler,
+                                      new NMNullStateStoreService());
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -469,7 +705,7 @@ public class TestResourceLocalizationSer
       long seed = r.nextLong();
       System.out.println("SEED: " + seed);
       r.setSeed(seed);
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, "user0");
       FSDataOutputStream out =
         new FSDataOutputStream(new DataOutputBuffer(), null);
       doReturn(out).when(spylfs).createInternal(isA(Path.class),
@@ -616,7 +852,8 @@ public class TestResourceLocalizationSer
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-                                        dirsHandler);
+                                        dirsHandler,
+                                        new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -637,7 +874,7 @@ public class TestResourceLocalizationSer
       dispatcher.await();
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
 
       // init resources
       Random r = new Random();
@@ -725,7 +962,7 @@ public class TestResourceLocalizationSer
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-            dirsHandlerSpy);
+            dirsHandlerSpy, new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -758,7 +995,7 @@ public class TestResourceLocalizationSer
         .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq));
 
       // init container.
-      final Container c = getMockContainer(appId, 42);
+      final Container c = getMockContainer(appId, 42, user);
 
       // first test ioexception
       Mockito
@@ -838,7 +1075,7 @@ public class TestResourceLocalizationSer
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler);
+            localDirHandler, new NMNullStateStoreService());
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -991,7 +1228,7 @@ public class TestResourceLocalizationSer
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler);
+            localDirHandler, new NMNullStateStoreService());
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1157,7 +1394,7 @@ public class TestResourceLocalizationSer
       // it as otherwise it will remove requests from pending queue.
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            dirsHandler);
+            dirsHandler, new NMNullStateStoreService());
       ResourceLocalizationService spyService = spy(rawService);
       dispatcher1.register(LocalizationEventType.class, spyService);
       spyService.init(conf);
@@ -1424,12 +1661,13 @@ public class TestResourceLocalizationSer
     return getMockedResource(r, LocalResourceVisibility.PRIVATE);
   }
 
-  private static Container getMockContainer(ApplicationId appId, int id) {
+  private static Container getMockContainer(ApplicationId appId, int id,
+      String user) {
     Container c = mock(Container.class);
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(appId, 1);
     ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id);
-    when(c.getUser()).thenReturn("user0");
+    when(c.getUser()).thenReturn(user);
     when(c.getContainerId()).thenReturn(cId);
     Credentials creds = new Credentials();
     creds.addToken(new Text("tok" + id), getToken(id));
@@ -1438,6 +1676,24 @@ public class TestResourceLocalizationSer
     return c;
   }
 
+  private ResourceLocalizationService createSpyService(
+      DrainDispatcher dispatcher, LocalDirsHandlerService dirsHandler,
+      NMStateStoreService stateStore) {
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+    DeletionService delService = mock(DeletionService.class);
+    ResourceLocalizationService rawService =
+      new ResourceLocalizationService(dispatcher, exec, delService,
+                                      dirsHandler, stateStore);
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+        isA(Configuration.class));
+    doReturn(lfs).when(spyService)
+        .getLocalFileContext(isA(Configuration.class));
+    return spyService;
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   static Token<? extends TokenIdentifier> getToken(int id) {
     return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Sat May 31 00:01:20 2014
@@ -26,11 +26,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
-
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 
 import org.mockito.ArgumentCaptor;
+
 import static org.mockito.Mockito.*;
 
 public class TestResourceRetention {
@@ -81,7 +83,7 @@ public class TestResourceRetention {
     ConcurrentMap<LocalResourceRequest,LocalizedResource> trackerResources =
       new ConcurrentHashMap<LocalResourceRequest,LocalizedResource>();
     LocalResourcesTracker ret = spy(new LocalResourcesTrackerImpl(user, null,
-          trackerResources, false, conf));
+      null, trackerResources, false, conf, new NMNullStateStoreService()));
     for (int i = 0; i < nRsrcs; ++i) {
       final LocalResourceRequest req = new LocalResourceRequest(
           new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java Sat May 31 00:01:20 2014
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -77,7 +78,8 @@ public class TestContainerLogsPage {
     NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
-    NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, new ApplicationACLsManager(conf));
+    NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+        new ApplicationACLsManager(conf), new NMNullStateStoreService());
     // Add an application and the corresponding containers
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     String user = "nobody";

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Sat May 31 00:01:20 2014
@@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -77,7 +79,8 @@ public class TestNMWebServer {
   }
   
   private int startNMWebAppServer(String webAddr) {
-    Context nmContext = new NodeManager.NMContext(null, null, null, null);
+    Context nmContext = new NodeManager.NMContext(null, null, null, null,
+        null);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -135,7 +138,8 @@ public class TestNMWebServer {
 
   @Test
   public void testNMWebApp() throws IOException, YarnException {
-    Context nmContext = new NodeManager.NMContext(null, null, null, null);
+    Context nmContext = new NodeManager.NMContext(null, null, null, null,
+        null);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -185,6 +189,7 @@ public class TestNMWebServer {
     ContainerId container2 =
         BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 1);
     NodeManagerMetrics metrics = mock(NodeManagerMetrics.class);
+    NMStateStoreService stateStore = new NMNullStateStoreService();
     for (ContainerId containerId : new ContainerId[] { container1,
         container2}) {
       // TODO: Use builder utils

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java Sat May 31 00:01:20 2014
@@ -107,7 +107,8 @@ public class TestNMWebServices extends J
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java Sat May 31 00:01:20 2014
@@ -99,7 +99,8 @@ public class TestNMWebServicesApps exten
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager);
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java Sat May 31 00:01:20 2014
@@ -122,7 +122,8 @@ public class TestNMWebServicesContainers
       healthChecker.init(conf);
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
-      nmContext = new NodeManager.NMContext(null, null, dirsHandler, aclsManager) {
+      nmContext = new NodeManager.NMContext(null, null, dirsHandler,
+          aclsManager, null) {
         public NodeId getNodeId() {
           return NodeId.newInstance("testhost.foo.com", 8042);
         };

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Sat May 31 00:01:20 2014
@@ -90,7 +90,9 @@ public class ZKRMStateStore extends RMSt
 
   private String zkHostPort = null;
   private int zkSessionTimeout;
-  private long zkRetryInterval;
+
+  @VisibleForTesting
+  long zkRetryInterval;
   private List<ACL> zkAcl;
   private List<ZKUtil.ZKAuthInfo> zkAuths;
 
@@ -199,9 +201,14 @@ public class ZKRMStateStore extends RMSt
     zkSessionTimeout =
         conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
             YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-    zkRetryInterval =
-        conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+
+    if (HAUtil.isHAEnabled(conf)) {
+      zkRetryInterval = zkSessionTimeout / numRetries;
+    } else {
+      zkRetryInterval =
+          conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
+              YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
+    }
 
     zkAcl = RMZKUtils.getZKAcls(conf);
     zkAuths = RMZKUtils.getZKAuths(conf);

Modified: hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1598785&r1=1598784&r2=1598785&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Sat May 31 00:01:20 2014
@@ -41,6 +41,7 @@ import java.security.NoSuchAlgorithmExce
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -203,7 +204,7 @@ public class TestZKRMStateStoreZKClientC
       LOG.error(error, e);
       fail(error);
     }
-    Assert.assertEquals("newBytes", new String(ret));
+    assertEquals("newBytes", new String(ret));
   }
 
   @Test(timeout = 20000)
@@ -232,7 +233,7 @@ public class TestZKRMStateStoreZKClientC
 
     try {
       byte[] ret = store.getDataWithRetries(path, false);
-      Assert.assertEquals("bytes", new String(ret));
+      assertEquals("bytes", new String(ret));
     } catch (Exception e) {
       String error = "New session creation failed";
       LOG.error(error, e);
@@ -281,4 +282,24 @@ public class TestZKRMStateStoreZKClientC
 
     zkClientTester.getRMStateStore(conf);
   }
+
+  @Test
+  public void testZKRetryInterval() throws Exception {
+    TestZKClient zkClientTester = new TestZKClient();
+    YarnConfiguration conf = new YarnConfiguration();
+
+    ZKRMStateStore store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS,
+        store.zkRetryInterval);
+    store.stop();
+
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    assertEquals(YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS /
+            YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES,
+        store.zkRetryInterval);
+    store.stop();
+  }
 }



Mime
View raw message