hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1203015 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
Date Thu, 17 Nov 2011 02:17:00 GMT
Author: nspiegelberg
Date: Thu Nov 17 02:17:00 2011
New Revision: 1203015

URL: http://svn.apache.org/viewvc?rev=1203015&view=rev
Log:
Refactored FSUtils locality checking to support future reuse

Summary:
Locality checking in FSUtils would produce incorrect results if invoked
concurrently by multiple callers in the same JVM, due to the use of shared
static variables. This is not a problem at present, but reuse of the
locality checking increases the chance of this happening.

The use of the ThreadPoolExecutor for locality checking was also made
more conventional by having a single region path per task rather than
bucketing the paths into a couple of large tasks.

Test Plan:
Ran the HBase locality check tool on TITANMIGRATE001 and verified the
results.

Reviewers: liyintang, kannan, kranganathan
Reviewed By: kranganathan

Differential Revision: 360763

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1203015&r1=1203014&r2=1203015&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Nov 17 02:17:00 2011
@@ -26,9 +26,7 @@ import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -792,46 +790,21 @@ public class FSUtils {
 
     FileStatus[] statusList = fs.globStatus(queryPath, pathFilter);
 
-    LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
-        statusList.length);
-
     if (null == statusList) {
       return regionToBestLocalityRSMapping;
+    } else {
+      LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
+          statusList.length);
     }
 
+    // lower the number of threads in case we have very few expected regions
+    threadPoolSize = Math.min(threadPoolSize, statusList.length);
+
     // run in multiple threads
-    int totalGoodRegionFiles = 0;
-    ThreadPoolExecutor tpe = null;
-    FSRegionScanner[] parallelTasks = null;
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
+        threadPoolSize, 60, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<Runnable>(statusList.length));
     try {
-      // lower the number of threads in case we have very few expected regions
-      int maxRegions = statusList.length;
-
-      if (maxRegions < threadPoolSize) {
-        threadPoolSize = maxRegions;
-      }
-
-      // initialize executor service
-      tpe = new ThreadPoolExecutor(threadPoolSize,
-          threadPoolSize, 60, TimeUnit.SECONDS,
-          new ArrayBlockingQueue<Runnable>(
-              threadPoolSize));
-
-      // set defaults
-      FSRegionScanner.setFileSystem(fs);
-      FSRegionScanner
-          .setRegionToBestLocalityRSMapping(regionToBestLocalityRSMapping);
-
-      // start initializing "thread pool" and threads
-      parallelTasks = new FSRegionScanner[threadPoolSize];
-      ArrayList<LinkedList<Path>> buckets = new ArrayList<LinkedList<Path>>();
-      for (int i = 0; i < threadPoolSize; ++i) {
-        // create buckets for each thread
-        LinkedList<Path> bucket = new LinkedList<Path>();
-        buckets.add(bucket);
-        parallelTasks[i] = new FSRegionScanner(bucket);
-      }
-
       // ignore all file status items that are not of interest
       for (FileStatus regionStatus : statusList) {
         if (null == regionStatus) {
@@ -847,43 +820,26 @@ public class FSUtils {
           continue;
         }
 
-        // add to respective bucket, do round-robin additions to make sure all
-        // threads get things to do; can create empty buckets in the rare case
-        // in which we end up getting less region paths than we have threads to
-        // handle them
-        buckets.get(totalGoodRegionFiles % threadPoolSize).add(regionPath);
-        ++totalGoodRegionFiles;
-      }
-
-      // start each thread in executor service
-      for (FSRegionScanner task : parallelTasks) {
-        tpe.execute(task);
+        tpe.execute(new FSRegionScanner(fs, regionPath,
+            regionToBestLocalityRSMapping));
       }
     } finally {
-      if (null != tpe && null != parallelTasks) {
-        tpe.shutdown();
-        int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
-            60 * 1000);
-        try {
-          // here we wait until TPE terminates, which is either naturally or by
-          // exceptions in the execution of the threads
-          while (!tpe.awaitTermination(threadWakeFrequency,
-              TimeUnit.MILLISECONDS)) {
-            int filesDone = 0;
-            for (FSRegionScanner rs : parallelTasks) {
-              filesDone += rs.getNumberOfFinishedRegions();
-            }
-
-            // printing out rough estimate, so as to not introduce
-            // AtomicInteger
-            LOG.info("Locality checking is underway: { Threads completed : "
-                + tpe.getCompletedTaskCount() + "/" + parallelTasks.length
-                + " , Scanned Regions : " + filesDone + "/"
-                + totalGoodRegionFiles + " }");
-          }
-        } catch (InterruptedException e) {
-          throw new IOException(e);
+      tpe.shutdown();
+      int threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
+          60 * 1000);
+      try {
+        // here we wait until TPE terminates, which is either naturally or by
+        // exceptions in the execution of the threads
+        while (!tpe.awaitTermination(threadWakeFrequency,
+            TimeUnit.MILLISECONDS)) {
+          // printing out rough estimate, so as to not introduce
+          // AtomicInteger
+          LOG.info("Locality checking is underway: { Scanned Regions : "
+              + tpe.getCompletedTaskCount() + "/"
+              + tpe.getTaskCount() + " }");
         }
+      } catch (InterruptedException e) {
+        throw new IOException(e);
       }
     }
 
@@ -902,139 +858,102 @@ public class FSUtils {
 class FSRegionScanner implements Runnable {
   static private final Log LOG = LogFactory.getLog(FSRegionScanner.class);
 
-  /**
-   * The shared block count map
-   */
-  private HashMap<String, AtomicInteger> blockCountMap;
+  private Path regionPath;
 
   /**
    * The file system used
    */
-  static private FileSystem fs;
-
-  static void setFileSystem(FileSystem fs) {
-    FSRegionScanner.fs = fs;
-  }
+  private FileSystem fs;
 
   /**
    * The locality mapping returned by the above getRegionLocalityMappingFromFS
    * method
    */
-  static private MapWritable regionToBestLocalityRSMapping;
+  private MapWritable regionToBestLocalityRSMapping;
 
-  static void setRegionToBestLocalityRSMapping(
+  FSRegionScanner(FileSystem fs, Path regionPath,
       MapWritable regionToBestLocalityRSMapping) {
-    FSRegionScanner.regionToBestLocalityRSMapping =
-      regionToBestLocalityRSMapping;
-  }
-
-  /**
-   * The respective paths to analyze for each thread
-   */
-  private LinkedList<Path> paths;
-
-  /**
-   * Number of finished blocks by now
-   */
-  private int numerOfFinishedRegions;
-
-  public int getNumberOfFinishedRegions() {
-    return numerOfFinishedRegions;
-  }
-
-  FSRegionScanner(LinkedList<Path> paths) {
-    this.paths = paths;
-    this.numerOfFinishedRegions = 0;
-    this.blockCountMap = new HashMap<String, AtomicInteger>();
+    this.fs = fs;
+    this.regionPath = regionPath;
+    this.regionToBestLocalityRSMapping = regionToBestLocalityRSMapping;
   }
 
   @Override
   public void run() {
     try {
-      for (Path regionPath : paths) {
-        try {
-          // empty the map for each region
-          blockCountMap.clear();
-
-          if (null == regionPath) {
-            continue;
-          }
-          //get table name
-          String tableName = regionPath.getParent().getName();
-          int totalBlkCount = 0;
-
-          // ignore null
-          FileStatus[] cfList = fs.listStatus(regionPath);
-          if (null == cfList) {
-            continue;
-          }
+      // empty the map for each region
+      Map<String, AtomicInteger> blockCountMap = new HashMap<String, AtomicInteger>();
 
-          // for each cf, get all the blocks information
-          for (FileStatus cfStatus : cfList) {
-            if (!cfStatus.isDir()) {
-              // skip because this is not a CF directory
-              continue;
-            }
-            FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
-            if (null == storeFileLists) {
-              continue;
-            }
+      //get table name
+      String tableName = regionPath.getParent().getName();
+      int totalBlkCount = 0;
+
+      // ignore null
+      FileStatus[] cfList = fs.listStatus(regionPath);
+      if (null == cfList) {
+        return;
+      }
 
-            for (FileStatus storeFile : storeFileLists) {
-              BlockLocation[] blkLocations =
-                fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
-              if (null == blkLocations) {
-                continue;
-              }
+      // for each cf, get all the blocks information
+      for (FileStatus cfStatus : cfList) {
+        if (!cfStatus.isDir()) {
+          // skip because this is not a CF directory
+          continue;
+        }
+        FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
+        if (null == storeFileLists) {
+          continue;
+        }
 
-              totalBlkCount += blkLocations.length;
-              for(BlockLocation blk: blkLocations) {
-                for (String host: blk.getHosts()) {
-                  AtomicInteger count = blockCountMap.get(host);
-                  if (count == null) {
-                    count = new AtomicInteger(0);
-                    blockCountMap.put(host, count);
-                  }
-                 count.incrementAndGet();
-                }
-              }
-            }
+        for (FileStatus storeFile : storeFileLists) {
+          BlockLocation[] blkLocations =
+            fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
+          if (null == blkLocations) {
+            continue;
           }
 
-          int largestBlkCount = 0;
-          String hostToRun = null;
-          for (String host: blockCountMap.keySet()) {
-            int tmp = blockCountMap.get(host).get();
-            if (tmp > largestBlkCount) {
-              largestBlkCount = tmp;
-              hostToRun = host;
+          totalBlkCount += blkLocations.length;
+          for(BlockLocation blk: blkLocations) {
+            for (String host: blk.getHosts()) {
+              AtomicInteger count = blockCountMap.get(host);
+              if (count == null) {
+                count = new AtomicInteger(0);
+                blockCountMap.put(host, count);
+              }
+             count.incrementAndGet();
             }
           }
+        }
+      }
 
-          // empty regions could make this null
-          if (null == hostToRun) {
-            continue;
-          }
+      int largestBlkCount = 0;
+      String hostToRun = null;
+      for (Map.Entry<String, AtomicInteger> entry : blockCountMap.entrySet()) {
+        String host = entry.getKey();
 
-          if (hostToRun.endsWith(".")) {
-            hostToRun = hostToRun.substring(0, hostToRun.length()-1);
-          }
-          String name = tableName + ":" + regionPath.getName();
-          synchronized (regionToBestLocalityRSMapping) {
-            regionToBestLocalityRSMapping.put(new Text(name), new Text(hostToRun));
-          }
-          this.numerOfFinishedRegions++;
-        } catch (IOException e) {
-          LOG.warn("Problem scanning file system", e);
-          continue;
-        } catch (RuntimeException e) {
-          LOG.warn("Problem scanning file system", e);
-          continue;
+        int tmp = entry.getValue().get();
+        if (tmp > largestBlkCount) {
+          largestBlkCount = tmp;
+          hostToRun = host;
         }
       }
-    } finally {
-      // sanity check
-      assert this.numerOfFinishedRegions <= this.paths.size();
+
+      // empty regions could make this null
+      if (null == hostToRun) {
+        return;
+      }
+
+      if (hostToRun.endsWith(".")) {
+        hostToRun = hostToRun.substring(0, hostToRun.length()-1);
+      }
+      String name = tableName + ":" + regionPath.getName();
+      synchronized (regionToBestLocalityRSMapping) {
+        regionToBestLocalityRSMapping.put(new Text(name), new Text(hostToRun));
+      }
+    } catch (IOException e) {
+      LOG.warn("Problem scanning file system", e);
+    } catch (RuntimeException e) {
+      LOG.warn("Problem scanning file system", e);
     }
   }
 }



Mime
View raw message