hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1367352 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ hadoop-yarn/hadoop-yarn-server...
Date Mon, 30 Jul 2012 22:56:15 GMT
Author: tucu
Date: Mon Jul 30 22:56:15 2012
New Revision: 1367352

URL: http://svn.apache.org/viewvc?rev=1367352&view=rev
Log:
MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files get deleted from
tasktracker. (mayank_bansal via tucu)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1367352&r1=1367351&r2=1367352&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Mon Jul 30 22:56:15 2012
@@ -156,6 +156,9 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-4465. Update description of yarn.nodemanager.address property. 
     (bowang via tucu)
 
+    MAPREDUCE-4342. Distributed Cache gives inconsistent result if cache files 
+    get deleted from tasktracker. (mayank_bansal via tucu)
+
 Release 2.1.0-alpha - Unreleased 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1367352&r1=1367351&r2=1367352&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Mon Jul 30 22:56:15 2012
@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
 
 /**
  * A collection of {@link LocalizedResource}s all of same
@@ -67,6 +69,12 @@ class LocalResourcesTrackerImpl implemen
     switch (event.getType()) {
     case REQUEST:
     case LOCALIZED:
+      if (rsrc != null && (!isResourcePresent(rsrc))) {
+        LOG.info("Resource " + rsrc.getLocalPath()
+            + " is missing, localizing it again");
+        localrsrc.remove(req);
+        rsrc = null;
+      }
       if (null == rsrc) {
         rsrc = new LocalizedResource(req, dispatcher);
         localrsrc.put(req, rsrc);
@@ -82,6 +90,24 @@ class LocalResourcesTrackerImpl implemen
     rsrc.handle(event);
   }
 
+  /**
+   * Checks whether a resource in the LOCALIZED state still has its file on
+   * the local disk; a resource in any other state is reported as present.
+   * 
+   * @param rsrc the tracked resource to check
+   * @return false only if rsrc is LOCALIZED and its local file does not exist
+   */
+  public boolean isResourcePresent(LocalizedResource rsrc) {
+    boolean ret = true;
+    if (rsrc.getState() == ResourceState.LOCALIZED) {
+      File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+      if (!file.exists()) {
+        ret = false;
+      }
+    }
+    return ret;
+  }
+  
   @Override
   public boolean contains(LocalResourceRequest resource) {
     return localrsrc.containsKey(resource);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1367352&r1=1367351&r2=1367352&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Mon Jul 30 22:56:15 2012
@@ -5,6 +5,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mortbay.log.Log;
 
 public class TestLocalResourcesTrackerImpl {
 
@@ -131,6 +134,86 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testConsistency() {
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      dispatcher = createDispatcher(new Configuration());
+      EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
+      EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      localrsrc.put(req1, lr1);
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          dispatcher, localrsrc);
+
+      ResourceEvent req11Event = new ResourceRequestEvent(req1,
+          LocalResourceVisibility.PUBLIC, lc1);
+
+      ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1);
+
+      // Request R1 for C1
+      tracker.handle(req11Event);
+
+      dispatcher.await();
+
+      // Verify refCount for R1 is 1
+      Assert.assertEquals(1, lr1.getRefCount());
+
+      dispatcher.await();
+      verifyTrackedResourceCount(tracker, 1);
+
+      // Mark R1 localized at file:///tmp/r1, then create and delete its file
+      ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path(
+          "file:///tmp/r1"), 1);
+      lr1.handle(rle);
+      Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+      Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1")));
+      LocalizedResource rsrcbefore = tracker.iterator().next();
+      File resFile = new File(lr1.getLocalPath().toUri().getRawPath()
+          .toString());
+      Assert.assertTrue(resFile.exists());
+      Assert.assertTrue(resFile.delete());
+
+      // Request R1 for C1 again, now that its local file has been deleted
+      tracker.handle(req11Event);
+
+      dispatcher.await();
+      lr1.handle(rle);
+      Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+      LocalizedResource rsrcafter = tracker.iterator().next();
+      if (rsrcbefore == rsrcafter) {
+        Assert.fail("Localized resource should not be equal");
+      }
+      // Release resource1
+      tracker.handle(rel11Event);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  private boolean createdummylocalizefile(Path path) { // test helper: creates an empty local file at path
+    boolean ret = false;
+    File file = new File(path.toUri().getRawPath().toString());
+    try {
+      ret = file.createNewFile(); // false when a file with this name already exists
+    } catch (IOException e) {
+      e.printStackTrace(); // not rethrown; the caller sees the failure as ret == false
+    }
+    return ret;
+  }
+  
   private void verifyTrackedResourceCount(LocalResourcesTracker tracker,
       int expected) {
     int count = 0;



Mime
View raw message