hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1157346 - in /hadoop/common/trunk/mapreduce: CHANGES.txt src/java/org/apache/hadoop/mapred/IndexCache.java src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
Date Sat, 13 Aug 2011 08:16:29 GMT
Author: acmurthy
Date: Sat Aug 13 08:16:28 2011
New Revision: 1157346

URL: http://svn.apache.org/viewvc?rev=1157346&view=rev
Log:
MAPREDUCE-2541. Fixed a race condition in IndexCache.removeMap. Contributed by Binglin Chang.

Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java
    hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1157346&r1=1157345&r2=1157346&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Sat Aug 13 08:16:28 2011
@@ -396,6 +396,9 @@ Trunk (unreleased changes)
     MAPREDUCE-2837. Ported bug fixes from y-merge to prepare for MAPREDUCE-279
     merge. (acmurthy) 
 
+    MAPREDUCE-2541. Fixed a race condition in IndexCache.removeMap. (Binglin
+    Chang via acmurthy) 
+
 
 Release 0.22.0 - Unreleased
 

Modified: hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java?rev=1157346&r1=1157345&r2=1157346&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/common/trunk/mapreduce/src/java/org/apache/hadoop/mapred/IndexCache.java Sat Aug
13 08:16:28 2011
@@ -130,12 +130,19 @@ class IndexCache {
   }
 
   /**
-   * This method removes the map from the cache. It should be called when
-   * a map output on this tracker is discarded.
+   * This method removes the map from the cache if index information for this
+   * map is loaded(size>0), index information entry in cache will not be 
+   * removed if it is in the loading phrase(size=0), this prevents corruption  
+   * of totalMemoryUsed. It should be called when a map output on this tracker 
+   * is discarded.
    * @param mapId The taskID of this map.
    */
   public void removeMap(String mapId) {
-    IndexInformation info = cache.remove(mapId);
+    IndexInformation info = cache.get(mapId);
+    if ((info != null) && (info.getSize() == 0)) {
+      return;
+    }
+    info = cache.remove(mapId);
     if (info != null) {
       totalMemoryUsed.addAndGet(-info.getSize());
       if (!queue.remove(mapId)) {
@@ -147,6 +154,19 @@ class IndexCache {
   }
 
   /**
+   * This method checks if cache and totolMemoryUsed is consistent.
+   * It is only used for unit test.
+   * @return True if cache and totolMemoryUsed is consistent
+   */
+  boolean checkTotalMemoryUsed() {
+    int totalSize = 0;
+    for (IndexInformation info : cache.values()) {
+      totalSize += info.getSize();
+    }
+    return totalSize == totalMemoryUsed.get();
+  }
+
+  /**
    * Bring memory usage below totalMemoryAllowed.
    */
   private synchronized void freeIndexInformation() {

Modified: hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java?rev=1157346&r1=1157345&r2=1157346&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
(original)
+++ hadoop/common/trunk/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestIndexCache.java
Sat Aug 13 08:16:28 2011
@@ -193,6 +193,60 @@ public class TestIndexCache extends Test
     }
   }
 
+  public void testRemoveMap() throws Exception {
+    // This test case use two thread to call getIndexInformation and 
+    // removeMap concurrently, in order to construct race condition.
+    // This test case may not repeatable. But on my macbook this test 
+    // fails with probability of 100% on code before MAPREDUCE-2541,
+    // so it is repeatable in practice.
+    JobConf conf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(conf).getRaw();
+    Path p = new Path(System.getProperty("test.build.data", "/tmp"),
+                      "cache").makeQualified(fs);
+    fs.delete(p, true);
+    conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
+    // Make a big file so removeMapThread almost surely runs faster than 
+    // getInfoThread 
+    final int partsPerMap = 100000;
+    final int bytesPerFile = partsPerMap * 24;
+    final IndexCache cache = new IndexCache(conf);
+
+    final Path big = new Path(p, "bigIndex");
+    final String user = 
+      UserGroupInformation.getCurrentUser().getShortUserName();
+    writeFile(fs, big, bytesPerFile, partsPerMap);
+    
+    // run multiple times
+    for (int i = 0; i < 20; ++i) {
+      Thread getInfoThread = new Thread() {
+        @Override
+        public void run() {
+          try {
+            cache.getIndexInformation("bigIndex", partsPerMap, big, user);
+          } catch (Exception e) {
+            // should not be here
+          }
+        }
+      };
+      Thread removeMapThread = new Thread() {
+        @Override
+        public void run() {
+          cache.removeMap("bigIndex");
+        }
+      };
+      if (i%2==0) {
+        getInfoThread.start();
+        removeMapThread.start();        
+      } else {
+        removeMapThread.start();        
+        getInfoThread.start();
+      }
+      getInfoThread.join();
+      removeMapThread.join();
+      assertEquals(true, cache.checkTotalMemoryUsed());
+    }      
+  }
+  
   private static void checkRecord(IndexRecord rec, long fill) {
     assertEquals(fill, rec.startOffset);
     assertEquals(fill, rec.rawLength);



Mime
View raw message